| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Tests for higher-level trace file parser""" |
| |
| # pylint: disable=missing-docstring |
| |
| import logging |
| from contextlib import contextmanager |
| from os.path import join as pjoin |
| from cytoolz import valmap, first |
| import numpy as np |
| import pytest |
| from .util import dctv_dir, INT64, all_same |
| from .model import TraceAnalysisSession, FileTraceContext |
| from .query import QuerySchema, STRING_SCHEMA |
| from .sql import quote |
| from .test_util import pict_parameterize |
| |
# Directory that the trace file names used by the tests below are
# resolved against.
test_data_dir = pjoin(dctv_dir(), "test-data")

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
| |
@contextmanager
def _open_trace_session(trace_file_name):
    """Yield a TraceAnalysisSession with one test trace mounted.

    `trace_file_name` is resolved relative to `test_data_dir`, and the
    resulting file is mounted at the path ["mytrace"]; the queries in
    this module address it as `mytrace`.  The session's own context
    manager is exited when the caller's `with` block ends.
    """
    trace_path = pjoin(test_data_dir, trace_file_name)
    with TraceAnalysisSession() as session:
        session.mount_trace(
            ["mytrace"],
            FileTraceContext(trace_path),
            temp_hack_lenient_metadata=True,
        )
        yield session
| |
def _do_test_file_parser(trace_file_name, expected_sched_switch_count):
    """Assert that a trace file yields the expected sched_switch row count.

    Mounts `trace_file_name` via `_open_trace_session` and compares the
    scalar result of a COUNT(*) over `mytrace.raw_events.sched_switch`
    with `expected_sched_switch_count`.
    """
    count_query = "SELECT COUNT(*) AS res FROM mytrace.raw_events.sched_switch"
    with _open_trace_session(trace_file_name) as session:
        actual_count = session.sql_query_scalar(count_query)
        assert actual_count == expected_sched_switch_count
| |
def test_parse_style_1():
    """ftrace1.trace must yield exactly 8 sched_switch rows."""
    _do_test_file_parser(
        trace_file_name="ftrace1.trace",
        expected_sched_switch_count=8,
    )
| |
def test_parse_style_2():
    """dragonball.mini.trace must yield exactly 32370 sched_switch rows."""
    _do_test_file_parser(
        trace_file_name="dragonball.mini.trace",
        expected_sched_switch_count=32370,
    )
| |
def test_last_ts_query():
    """The scalar read from `mytrace.last_ts` must be 647183000."""
    last_ts_query = "SELECT EVENT * FROM mytrace.last_ts"
    with _open_trace_session("dragonball.mini.trace") as session:
        last_ts = session.sql_query_scalar(last_ts_query)
        assert last_ts == 647183000
| |
def test_event_types_query():
    """`mytrace.found_event_types`, ordered by type, must match a fixed list."""
    expected_types = [
        "E_atrace",
        "clock_set_rate",
        "cpu_frequency",
        "cpu_frequency_limits",
        "cpu_idle",
        "sched_blocked_reason",
        "sched_switch",
        "sched_wakeup",
        "sched_waking",
        "trace_marker_write",
        "workqueue_activate_work",
        "workqueue_execute_end",
        "workqueue_execute_start",
        "workqueue_queue_work",
    ]
    with _open_trace_session("dragonball.mini.trace") as session:
        type_column = session.sql_query1(
            "SELECT * FROM mytrace.found_event_types ORDER BY type")
        found_types = type_column.tolist()
        assert found_types == expected_types
| |
@pict_parameterize({
    "block_size": [1, 2, 128],
    "want_schema": ["want_schema", "no_want_schema"],
})
def test_sql_query_columns(block_size, want_schema):
    """Check the default (column) output of a two-column VALUES query.

    The result of `sql_query` is converted to a dict keyed by column
    name.  When schemas are requested, each value is unpacked as an
    (array, schema) pair and the schema is checked as well.
    """
    with_schema = want_schema == "want_schema"
    query = "(VALUES (1, 'foo'), (3, 'bar'), (5, 'qux'))"
    with TraceAnalysisSession() as session:
        session.reconfigure_cache(block_size=block_size)
        columns = dict(session.sql_query(query, want_schema=with_schema))

        # Integer column.
        int_column = columns["col0"]
        if with_schema:
            int_column, int_schema = int_column
            assert int_schema == QuerySchema(INT64)
        assert isinstance(int_column, np.ndarray)
        assert int_column.tolist() == [1, 3, 5]

        # String column.
        str_column = columns["col1"]
        if with_schema:
            str_column, str_schema = str_column
            assert str_schema == STRING_SCHEMA
        assert str_column.tolist() == ["foo", "bar", "qux"]
| |
def _mkvalues(*seqs):
    """Build a parenthesized SQL VALUES expression from column sequences.

    Each positional argument is one column; the columns are zipped into
    rows, so the row count is that of the shortest column.  Strings are
    rendered through `quote`, ints through `str`, and None as NULL.

    Raises ValueError for a value of any other type.
    """
    def _render(item):
        if item is None:
            return "NULL"
        if isinstance(item, str):
            return quote(item)
        if isinstance(item, int):
            return str(item)
        raise ValueError("can't format {!r}".format(item))

    rendered_rows = []
    for row in zip(*seqs):
        rendered_rows.append("(" + ", ".join(map(_render, row)) + ")")
    return "(VALUES " + ", ".join(rendered_rows) + ")"
| |
@pict_parameterize(
    dict(block_size=[1, 2, 128],
         str_type=[str, bytes, None],
         str_none=["str_none", "str_not_none"],
         int_none=["int_none", "int_not_none"],
         empty=["empty", "not_empty"],
         # Must match the string compared against in the body below;
         # a mismatch silently disables the schema branch.
         want_schema=["want_schema", "no_want_schema"]))
def test_sql_query_rows(
        block_size,
        str_type,
        str_none,
        int_none,
        empty,
        want_schema):
    """Check `sql_query(..., output_format="rows")` chunk by chunk.

    Parameters select the cache block size, the `strings=` mode passed
    to `sql_query` (str, bytes, or None), whether a NULL is placed in
    the string and/or integer column, whether the query is limited to
    zero rows, and whether schemas are requested.

    Every chunk must be a (data_block, eof) pair, with exactly one eof
    flag set and it must be on the last chunk.  Each data block is a
    dict with keys "col0" and "col1" whose arrays all have the same
    length, and the chunks concatenated must reproduce the expected
    rows in order.
    """
    str_none = (str_none == "str_none")
    int_none = (int_none == "int_none")
    empty = (empty == "empty")
    want_schema = (want_schema == "want_schema")
    with TraceAnalysisSession() as session:
        session.reconfigure_cache(block_size=block_size)
        vals = {
            "col0": [1, 3, 5],
            "col1": ["foo", "bar", "qux"],
        }
        expected_schemas = {
            "col0": QuerySchema(INT64),
            "col1": STRING_SCHEMA,
        }

        if int_none:
            vals["col0"][-1] = None
        if str_none:
            vals["col1"][1] = None
        sql = _mkvalues(vals["col0"], vals["col1"])
        if empty:
            sql = "SELECT * FROM {} LIMIT 0".format(sql)
            # No rows are expected back, so empty the expectations too.
            for vlist in vals.values():
                vlist[:] = []
        expected_row_count = len(vals["col0"])

        # Convert the expected strings to the representation requested
        # through `strings=`.
        if str_type is bytes:
            vals["col1"] = [None if x is None else x.encode("UTF-8")
                            for x in vals["col1"]]
        elif str_type is None:
            # With strings=None the expectation is whatever
            # `session.st.intern` returns for the encoded string.
            vals["col1"] = [(None if x is None else session.st.intern(
                x.encode("UTF-8"))) for x in vals["col1"]]
        else:
            assert str_type is str

        chunks = list(session.sql_query(sql,
                                        want_schema=want_schema,
                                        strings=str_type,
                                        output_format="rows"))
        data_blocks, eof = zip(*chunks)
        assert sum(eof) == 1
        assert eof[-1]

        pos = 0  # Number of expected rows consumed so far
        for data_block in data_blocks:
            assert isinstance(data_block, dict)
            assert sorted(data_block) == ["col0", "col1"]
            if want_schema:
                # Each value is an (array, schema) pair.
                arrays = valmap(lambda x: x[0], data_block)
                schemas = valmap(lambda x: x[1], data_block)
            else:
                arrays = data_block
                schemas = expected_schemas
            # Compare the lengths of the arrays, not of the dict keys.
            assert all_same(map(len, arrays.values()))
            clen = len(first(arrays.values()))
            for key, array in arrays.items():
                assert isinstance(array, np.ndarray)
                assert array.tolist() == vals[key][pos : pos + clen]
            # Pair schemas by column name so the check does not depend
            # on the key order of the returned block.
            for key, schema in schemas.items():
                assert schema.is_a(expected_schemas[key])
            pos += clen
        # Without this a truncated result would pass unnoticed.
        assert pos == expected_row_count
| |
@pytest.mark.parametrize("output_format", ["rows", "columns"])
def test_sql_query_no_columns(output_format):
    """Run a query over `dctv.filled(10)` in each output format.

    There are no assertions on the data; the test passes if the result
    iterator can be consumed to the end without raising.
    """
    query = "SELECT * FROM dctv.filled(10)"
    with TraceAnalysisSession() as session:
        for _ in session.sql_query(query, output_format=output_format):
            pass