# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the higher-level trace file parser."""

# pylint: disable=missing-docstring

import logging
from contextlib import contextmanager
from os.path import join as pjoin

from cytoolz import valmap, first
import numpy as np
import pytest

from .util import dctv_dir, INT64, all_same
from .model import TraceAnalysisSession, FileTraceContext
from .query import QuerySchema, STRING_SCHEMA
from .sql import quote
from .test_util import pict_parameterize

log = logging.getLogger(__name__)

test_data_dir = pjoin(dctv_dir(), "test-data")

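# Mounts a trace file from test-data/ into a fresh TraceAnalysisSession
# under the "mytrace" namespace and yields the session.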
@contextmanager
def _open_trace_session(trace_file_name):
  with TraceAnalysisSession() as trace_session:
    test_file_name = pjoin(test_data_dir, trace_file_name)
    trace_session.mount_trace(
        ["mytrace"],
        FileTraceContext(test_file_name),
        temp_hack_lenient_metadata=True)
    yield trace_session

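# Parses a trace file and checks that the expected number of
# sched_switch events is visible through the SQL layer.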
def _do_test_file_parser(trace_file_name, expected_sched_switch_count):
  with _open_trace_session(trace_file_name) as trace_session:
    assert trace_session.sql_query_scalar(
        "SELECT COUNT(*) AS res FROM mytrace.raw_events.sched_switch") \
        == expected_sched_switch_count

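# Two trace files captured in different styles; the expected
# sched_switch counts are specific to each test-data file.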
def test_parse_style_1():
  _do_test_file_parser("ftrace1.trace", 8)


def test_parse_style_2():
  _do_test_file_parser("dragonball.mini.trace", 32370)

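# last_ts presumably reports the timestamp of the final event in the
# trace; the expected value is specific to dragonball.mini.trace.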
def test_last_ts_query():
  with _open_trace_session("dragonball.mini.trace") as trace_session:
    assert trace_session.sql_query_scalar(
        "SELECT EVENT * FROM mytrace.last_ts") == 647183000

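# found_event_types enumerates each distinct event type seen in the
# trace, queried here in sorted order.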
def test_event_types_query():
  with _open_trace_session("dragonball.mini.trace") as trace_session:
    event_types = trace_session.sql_query1(
        "SELECT * FROM mytrace.found_event_types ORDER BY type").tolist()
    assert event_types == [
        "E_atrace",
        "clock_set_rate",
        "cpu_frequency",
        "cpu_frequency_limits",
        "cpu_idle",
        "sched_blocked_reason",
        "sched_switch",
        "sched_wakeup",
        "sched_waking",
        "trace_marker_write",
        "workqueue_activate_work",
        "workqueue_execute_end",
        "workqueue_execute_start",
        "workqueue_queue_work",
    ]

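# Column-format output: each column comes back as a numpy array, or as
# an (array, schema) pair when want_schema is set.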
@pict_parameterize(dict(block_size=[1, 2, 128],
                        want_schema=["want_schema", "no_want_schema"]))
def test_sql_query_columns(block_size, want_schema):
  want_schema = (want_schema == "want_schema")
  with TraceAnalysisSession() as session:
    session.reconfigure_cache(block_size=block_size)
    result = dict(session.sql_query(
        "(VALUES (1, 'foo'), (3, 'bar'), (5, 'qux'))",
        want_schema=want_schema,
    ))
    v0 = result["col0"]
    if want_schema:
      v0, schema0 = v0
      assert schema0 == QuerySchema(INT64)
    assert isinstance(v0, np.ndarray)
    assert v0.tolist() == [1, 3, 5]
    v1 = result["col1"]
    if want_schema:
      v1, schema1 = v1
      assert schema1 == STRING_SCHEMA
    assert v1.tolist() == ["foo", "bar", "qux"]

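# Builds a SQL VALUES literal from parallel per-column sequences, e.g.
# _mkvalues([1, 3], ["a", "b"]) -> "(VALUES (1, 'a'), (3, 'b'))"
# (assuming quote() renders ordinary single-quoted SQL strings).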
def _mkvalues(*seqs):
  def _fmt(val):
    if isinstance(val, str):
      return quote(val)
    if isinstance(val, int):
      return str(val)
    if val is None:
      return "NULL"
    raise ValueError("can't format {!r}".format(val))
  return "(VALUES {})".format(
      ", ".join(
          "({})".format(", ".join(_fmt(val) for val in chunk))
          for chunk in zip(*seqs)))

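# Row-format output, exercised across cache block sizes, string
# representations (str, bytes, or interned), NULLs in either column,
# empty result sets, and schema reporting.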
@pict_parameterize(
    dict(block_size=[1, 2, 128],
         str_type=[str, bytes, None],
         str_none=["str_none", "str_not_none"],
         int_none=["int_none", "int_not_none"],
         empty=["empty", "not_empty"],
         want_schema=["want_schema", "no_want_schema"]))
def test_sql_query_rows(
    block_size,
    str_type,
    str_none,
    int_none,
    empty,
    want_schema):
  str_none = (str_none == "str_none")
  int_none = (int_none == "int_none")
  empty = (empty == "empty")
  want_schema = (want_schema == "want_schema")
  with TraceAnalysisSession() as session:
    session.reconfigure_cache(block_size=block_size)
    vals = {
        "col0": [1, 3, 5],
        "col1": ["foo", "bar", "qux"],
    }
    expected_schemas = {
        "col0": QuerySchema(INT64),
        "col1": STRING_SCHEMA,
    }
    if int_none:
      vals["col0"][-1] = None
    if str_none:
      vals["col1"][1] = None
    sql = _mkvalues(vals["col0"], vals["col1"])
    if empty:
      sql = "SELECT * FROM {} LIMIT 0".format(sql)
      for vlist in vals.values():
        vlist[:] = []
    if str_type is bytes:
      vals["col1"] = [None if x is None else x.encode("UTF-8")
                      for x in vals["col1"]]
    elif str_type is None:
      vals["col1"] = [(None if x is None else session.st.intern(
          x.encode("UTF-8"))) for x in vals["col1"]]
    else:
      assert str_type is str
    chunks = list(session.sql_query(sql,
                                    want_schema=want_schema,
                                    strings=str_type,
                                    output_format="rows"))
    data_blocks, eof = zip(*chunks)
    assert sum(eof) == 1
    assert eof[-1]
    pos = 0
    for data_block in data_blocks:
      assert isinstance(data_block, dict)
      assert sorted(data_block) == ["col0", "col1"]
      if want_schema:
        arrays = valmap(lambda x: x[0], data_block)
        schemas = valmap(lambda x: x[1], data_block)
      else:
        arrays = data_block
        schemas = expected_schemas
      assert all_same(map(len, arrays.values()))
      clen = len(first(arrays.values()))
      for key, array in arrays.items():
        assert isinstance(array, np.ndarray)
        assert array.tolist() == vals[key][pos : pos + clen]
      for schema, expected_schema in zip(schemas.values(),
                                         expected_schemas.values()):
        assert schema.is_a(expected_schema)
      pos += clen

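# Smoke test: draining a query that yields no columns should work in
# both output formats.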
@pytest.mark.parametrize("output_format", ["rows", "columns"])
def test_sql_query_no_columns(output_format):
  with TraceAnalysisSession() as session:
    list(session.sql_query(
        "SELECT * FROM dctv.filled(10)",
        output_format=output_format))