blob: 4f82e0c781b549b3a52f3172e67327fc0b66359f [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test thread analysis core routines"""
# pylint: disable=missing-docstring
import logging
from textwrap import dedent
from contextlib import contextmanager
from collections import defaultdict
import pytest
from modernmp.util import the
from .thread_analysis import (
CPU_NUMBER_SCHEMA,
MEMORY_SIZE_SCHEMA,
OOM_SCORE_ADJ_SCHEMA,
QueryBundle,
RAW_TGID_SCHEMA,
RAW_THREAD_STATE_SCHEMA,
RAW_TID_SCHEMA,
ThreadAnalysis,
UTGID_SCHEMA,
UTID_SCHEMA,
)
from .query import (
DURATION_SCHEMA,
EVENT_UNPARTITIONED_TIME_MAJOR,
QueryWarning,
STRING_SCHEMA,
TS_SCHEMA,
TableKind,
TableSchema,
TableSorting,
)
from .util import (
ExplicitInheritance,
final,
override,
)
from ._native import (
PID_MAX_LIMIT,
SYSTEM_UTID_BASE,
Snapshot,
)
from .test_query import (
TestQueryTable,
_execute_qt,
)
from .test_sql import ps2qt, _qerror
from .sql import TableValuedFunction
from .test_util import (
assertrepr_compare_hooks,
)
log = logging.getLogger(__name__)
# clone_flags bit used by start_thread() to mark a task_newtask event as
# creating a thread inside an existing group (value matches the kernel's
# CLONE_THREAD flag).
CLONE_THREAD = 0x00010000
# Thread-state codes as they appear in the "state" analysis subtable.
RUNNING = 0
SLEEPING = 1
# Infrastructure
def _compare_snapshots_hook(config, op, ss1, ss2):
    """pytest assertrepr hook: render Snapshot mismatches via format()."""
    if op != "==":
        return None
    if not isinstance(ss1, Snapshot) or not isinstance(ss2, Snapshot):
        return None
    if ss1 != ss2:
        # Diff the human-readable renderings instead of the raw objects.
        from _pytest.assertion.util import assertrepr_compare
        return assertrepr_compare(config, op, ss1.format(), ss2.format())
    return None
assertrepr_compare_hooks.append(_compare_snapshots_hook)
class SchedSwitchQueryBundle(QueryBundle):
    """QueryBundle for sched_switch events"""
    # Just the fields stdlib needs
    # NOTE: declaration order is semantic: TraceMaker fills event rows
    # positionally from QueryBundle.fields and expects "_ts" first.
    _ts = TS_SCHEMA.qn_iattr()
    cpu = CPU_NUMBER_SCHEMA.qn_iattr()
    prev_tid = RAW_TID_SCHEMA.qn_iattr()
    prev_state = RAW_THREAD_STATE_SCHEMA.qn_iattr()
    next_tid = RAW_TID_SCHEMA.qn_iattr()
    _required_table_schema = EVENT_UNPARTITIONED_TIME_MAJOR
    __inherit__ = dict(_required_table_schema=override)
class SchedWakingQueryBundle(QueryBundle):
    """QueryBundle for sched_waking"""
    # NOTE: declaration order is semantic: TraceMaker fills event rows
    # positionally from QueryBundle.fields and expects "_ts" first.
    _ts = TS_SCHEMA.qn_iattr()
    cpu = CPU_NUMBER_SCHEMA.qn_iattr()
    woken_tid = RAW_TID_SCHEMA.qn_iattr()
    _required_table_schema = EVENT_UNPARTITIONED_TIME_MAJOR
    __inherit__ = dict(_required_table_schema=override)
@final
class TraceMaker(ExplicitInheritance):
    """Makes a series of fake trace events"""
    @override
    def __init__(self, *, nr_cpus=None):
        # Fake trace clock, advanced only by gap().
        self.__ts = 0
        # Event name -> QueryBundle class describing that event's fields.
        self.__query_bundles = dict(
            task_newtask=ThreadAnalysis.TaskNewTask,
            sched_process_exit=ThreadAnalysis.SchedProcessExit,
            snapshot_start=ThreadAnalysis.SnapshotStart,
            snapshot_end=ThreadAnalysis.SnapshotEnd,
            sched_switch=SchedSwitchQueryBundle,
            sched_waking=SchedWakingQueryBundle,
        )
        # Event name -> list of accumulated event rows.
        self.__rows_by_type = defaultdict(list)
        # Snapshot currently being recorded, or None outside
        # snapshot_start()/snapshot_end().
        self.__snapshot = None
        # Completed snapshots, in recording order.
        self.__snapshots = []
        self.__trace_end_ts = None
        if nr_cpus:
            # Per-CPU tid currently on-CPU; starts as 0 everywhere.
            self.__cpu_state = [0] * nr_cpus
        else:
            self.__cpu_state = None
        # Name -> timestamp for interesting points in the fake trace.
        self.markers = {
            "trace_start": self.__ts,
        }
    def make_runnable(self, tid):
        """Emit a sched_waking event for TID at the current time."""
        self.event("sched_waking", woken_tid=tid)
    def switch_in(self, *, cpu, next_tid, prev_tid, prev_state):
        """Emit a sched_switch moving CPU from PREV_TID to NEXT_TID."""
        # The outgoing task must actually be on this CPU, and the
        # incoming task must not already be running on any CPU.
        assert prev_tid == self.__cpu_state[cpu]
        assert next_tid not in self.__cpu_state
        self.__cpu_state[cpu] = next_tid
        return self.event("sched_switch",
                          cpu=cpu,
                          next_tid=next_tid,
                          prev_tid=prev_tid,
                          prev_state=prev_state)
    @property
    def nr_cpus(self):
        """Number of CPUs (cores) we're simulating"""
        return len(self.__cpu_state)
    def get_known_tables(self):
        """Return events we're simulating"""
        return list(self.__query_bundles)
    def __make_event_row(self, name, args, kwargs):
        """Build one row for event NAME from ARGS and KWARGS.

        Fields are filled in QueryBundle declaration order; the leading
        "_ts" field defaults to the current fake clock unless it is
        supplied explicitly in KWARGS.
        """
        qbcls = self.__query_bundles[name]
        assert issubclass(qbcls, QueryBundle)
        row = []
        # Reverse so pop() consumes positional arguments left-to-right.
        args = list(reversed(args))
        kwargs = dict(kwargs)
        for i, (field_name, _finfo) in enumerate(qbcls.fields.items()):
            # pylint: disable=compare-to-zero
            if i == 0 and "_ts" not in kwargs:
                assert field_name == "_ts"
                field_value = self.__ts
            else:
                if args:  # pylint: disable=else-if-used
                    field_value = args.pop()
                else:
                    field_value = kwargs.pop(field_name)
            row.append(field_value)
        # Every supplied argument must have been consumed.
        assert not args
        assert not kwargs
        return row
    def event(self, name, *args, **kwargs):
        """Record one event row for NAME; returns self for chaining."""
        self.__rows_by_type[name].append(
            self.__make_event_row(name, args, kwargs))
        return self
    def gap(self, amount_of_time, mark=None):
        """Advance the fake clock; optionally drop marker MARK after."""
        assert the(int, amount_of_time) >= 0
        self.__ts += amount_of_time
        if mark:
            self.mark(mark)
        return self
    def mark(self, name):
        """Remember the current timestamp under NAME (must be unused)."""
        assert name not in self.markers
        self.markers[the(str, name)] = self.__ts
        return self
    def __make_qb_table(self, qb_name):
        """Wrap the accumulated rows for QB_NAME in a TestQueryTable."""
        # pylint: disable=protected-access
        qbcls = self.__query_bundles[qb_name]
        rows = self.__rows_by_type[qb_name]
        def _qs_from_iattr(ia):
            return ia.required_schema
        tbl = TestQueryTable(
            names=tuple(qbcls.fields),
            schemas=[_qs_from_iattr(ia) for
                     ia in qbcls.fields.values()],
            rows=rows,
            table_schema=qbcls._required_table_schema)
        return tbl
    def snapshot_start(self):
        """Begin recording a snapshot; emits a snapshot_start event."""
        assert self.__snapshot is None
        self.event("snapshot_start",
                   _ts=self.__ts,
                   number=len(self.__snapshots))
        self.__snapshot = Snapshot()
        return self
    def snapshot_end(self):
        """Finish the open snapshot; emits a snapshot_end event."""
        assert self.__snapshot is not None
        self.event("snapshot_end", _ts=self.__ts)
        self.__snapshots.append(self.__snapshot)
        self.__snapshot = None
        return self
    def get_snapshot_data(self, nr):
        """Return a copy of completed snapshot number NR."""
        return self.__snapshots[nr].copy()
    def _forward_to_snapshot(fn):  # pylint: disable=no-self-argument
        # Build a method that forwards FN onto the snapshot currently
        # being recorded and returns self to keep the fluent chain.
        # pylint: disable=protected-access,not-callable
        def _fn(self, *args, **kwargs):
            fn(self.__snapshot, *args, **kwargs)
            return self
        return _fn
    note_scanned_oom_score_adj = \
        _forward_to_snapshot(Snapshot.note_scanned_oom_score_adj)
    note_scanned_parent = \
        _forward_to_snapshot(Snapshot.note_scanned_parent)
    note_scanned_tgid = \
        _forward_to_snapshot(Snapshot.note_scanned_tgid)
    note_scanned_state = \
        _forward_to_snapshot(Snapshot.note_scanned_state)
    note_scanned_comm = \
        _forward_to_snapshot(Snapshot.note_scanned_comm)
    note_scanned_rss_anon = \
        _forward_to_snapshot(Snapshot.note_scanned_rss_anon)
    note_scanned_rss_file = \
        _forward_to_snapshot(Snapshot.note_scanned_rss_file)
    # The factory is only needed while building the class body.
    del _forward_to_snapshot
    def __note_scanned_process_or_thread(self, *,
                                         tid,
                                         tgid,
                                         state="S",
                                         comm=None):
        """Common scanned-task notes shared by processes and threads."""
        self.note_scanned_tgid(tid, tgid)
        self.note_scanned_state(tid, state)
        if comm is None:
            # Default comm derives from the tid for easy eyeballing.
            comm = "#{}".format(tid)
        self.note_scanned_comm(tid, comm)
        return self
    def note_scanned_process(self, *,
                             tgid,
                             parent,
                             oom_score_adj=0,
                             **kwargs):
        """Record a scanned thread-group leader in the open snapshot."""
        self.note_scanned_oom_score_adj(tgid, oom_score_adj)
        if parent is not None:
            self.note_scanned_parent(tgid, parent)
        return self.__note_scanned_process_or_thread(
            tid=tgid, tgid=tgid, **kwargs)
    def note_scanned_thread(self, *,
                            tgid,
                            tid,
                            **kwargs):
        """Record a scanned non-leader thread in the open snapshot."""
        assert tgid != tid
        return self.__note_scanned_process_or_thread(
            tid=tid, tgid=tgid, **kwargs)
    def start_process(self, *, tgid, parent=1):
        """Emit a task_newtask creating a new thread-group leader."""
        return self.event("task_newtask",
                          tid=parent,
                          new_tid=tgid,
                          clone_flags=0)
    def start_thread(self, *, tid, creating_tid):
        """Emit a task_newtask creating a thread in CREATING_TID's group."""
        return self.event("task_newtask",
                          tid=creating_tid,
                          new_tid=tid,
                          clone_flags=CLONE_THREAD)
    def exit_thread(self, *, tid):
        """Emit a sched_process_exit event for TID."""
        return self.event("sched_process_exit", tid=tid)
    def end_trace(self):
        """Close the trace once, recording the "trace_end" marker."""
        assert self.__trace_end_ts is None
        self.__trace_end_ts = self.__ts
        return self.mark("trace_end")
    def make_tables(self):
        """Return the table environment the analysis query consumes."""
        d = {qb_name: self.__make_qb_table(qb_name)
             for qb_name in self.__query_bundles}
        if self.__trace_end_ts is not None:
            # Single-row table carrying the trace's final timestamp.
            d["last_ts"] = TestQueryTable(
                names=["_ts"],
                schemas=[TS_SCHEMA],
                rows=[
                    [self.__trace_end_ts],
                ],
                table_schema=EVENT_UNPARTITIONED_TIME_MAJOR,
            )
        d["get_snapshot_data"] = self.get_snapshot_data
        return d
    def analyze(self,
                wanted_subtable,
                *,
                kernel_threads_as_threads=True,
                k="SPAN"):
        """Run thread_analysis over the fake trace.

        Returns the executed WANTED_SUBTABLE result; K selects whether
        the output is coerced to a SPAN or an EVENT table.
        """
        tables = self.make_tables()
        nr_cpus = len(self.__cpu_state or ()) or "NULL"
        q = dedent("""\
        WITH ta AS thread_analysis(
        sched_process_exit=>sched_process_exit,
        task_newtask=>task_newtask,
        snapshot_start=>snapshot_start,
        snapshot_end=>snapshot_end,
        get_snapshot_data=>get_snapshot_data,
        first_ts=>0ns,
        last_ts=>{last_ts},
        kernel_threads_as_threads=>{ktat},
        nr_cpus=>{nr_cpus},
        ),
        SELECT {k} * FROM ta.{wanted_subtable}
        """).format(
            k=k,
            last_ts="last_ts" if "last_ts" in tables else "NULL",
            wanted_subtable=wanted_subtable,
            ktat="TRUE" if kernel_threads_as_threads else "FALSE",
            nr_cpus=nr_cpus)
        ta_tvf = TableValuedFunction(ThreadAnalysis)
        return _execute_qt(
            ps2qt(q, thread_analysis=ta_tvf, **tables)
            .to_schema(kind=TableKind.SPAN if k.lower() == "span"
                       else TableKind.EVENT,
                       sorting=TableSorting.TIME_MAJOR))
def _test_qt_threads(*rows):
    """Build the expected "threads" span table from expected-row dicts."""
    columns = [
        ("_ts", TS_SCHEMA),
        ("_end", DURATION_SCHEMA),
        ("raw_tid", RAW_TID_SCHEMA),
        ("live_duration", DURATION_SCHEMA),
        ("utid", UTID_SCHEMA),
        ("utgid", UTGID_SCHEMA),
    ]
    return TestQueryTable(
        names=[name for name, _ in columns],
        schemas=[schema for _, schema in columns],
        rows=rows,
        table_schema=TableSchema(TableKind.SPAN, "raw_tid",
                                 TableSorting.TIME_MAJOR),
    )
def _test_qt_processes(*rows):
    """Build the expected "processes" span table from expected-row dicts."""
    columns = [
        ("_ts", TS_SCHEMA),
        ("_end", DURATION_SCHEMA),
        ("raw_tgid", RAW_TGID_SCHEMA),
        ("live_duration", DURATION_SCHEMA),
        ("utgid", UTGID_SCHEMA),
        ("parent_utgid", UTGID_SCHEMA),
    ]
    return TestQueryTable(
        names=[name for name, _ in columns],
        schemas=[schema for _, schema in columns],
        rows=rows,
        table_schema=TableSchema(TableKind.SPAN, "raw_tgid",
                                 TableSorting.TIME_MAJOR))
def _thread(m, tid, *,
_ts=None,
utid=None,
utgid=None,
_end=None,
_live_end=None):
return {
"_ts": m[_ts or "trace_start"],
"_end": m[_end or "trace_end"],
"raw_tid": tid,
"_live_end": m[_live_end or "trace_end"],
"utid": utid or -tid,
"utgid": utgid or -tid,
}
def _init_thread(m):
    """Expected row for the init task (tid 1) spanning the whole trace."""
    init_tid = 1
    return _thread(m, init_tid)
def _process(m, tgid, *,
_ts=None,
utgid=None,
parent=-1,
_end=None,
_live_end=None):
return {
"_ts": m[_ts or "trace_start"],
"_end": m[_end or "trace_end"],
"raw_tgid": tgid,
"_live_end": m[_live_end or "trace_end"],
"utgid": utgid or -tgid,
"parent_utgid": parent,
}
def _init_process(m):
    """Expected row for init's process (tgid 1, synthetic parent 0)."""
    init_tgid = 1
    return _process(m, init_tgid, parent=0)
# Positive tests
def test_basic():
    """One snapshot plus a later fork produce the expected spans."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=125, parent=1)
    tm.gap(10, "initial_snapshot_end")
    tm.snapshot_end()
    tm.gap(10, "tgid_101_start")
    tm.start_process(tgid=101)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 125),
        _thread(marks, 101, _ts="tgid_101_start"),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 125),
        _process(marks, 101, _ts="tgid_101_start"),
    )
def test_tid_reuse():
    """A tid that dies and is reborn yields two distinct utids."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10, "tgid_101v0_start")
    tm.start_process(tgid=101)
    tm.gap(10, "tgid_101v0_exit")
    tm.exit_thread(tid=101)
    tm.gap(10, "tgid_101v1_start")
    tm.start_process(tgid=101)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 101,
                _ts="tgid_101v0_start",
                _end="tgid_101v1_start",
                _live_end="tgid_101v0_exit"),
        _thread(marks, 101,
                _ts="tgid_101v1_start",
                utid=-10000101,
                utgid=-10000101),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 101,
                 _ts="tgid_101v0_start",
                 _end="tgid_101v1_start",
                 _live_end="tgid_101v0_exit"),
        _process(marks, 101,
                 _ts="tgid_101v1_start",
                 utgid=-10000101),
    )
def test_threads_non_snapshot():
    """Threads created by live events join their creator's group."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10)
    tm.snapshot_end()
    tm.gap(10, "tid_101_start")
    tm.start_process(tgid=101)
    tm.gap(10, "tid_201_start")
    tm.start_thread(tid=201, creating_tid=101)
    tm.gap(10, "tid_102_start")
    tm.start_thread(tid=102, creating_tid=201)
    tm.gap(10, "tid_102_exit")
    tm.exit_thread(tid=102)
    tm.gap(10, "tid_101_exit")
    tm.exit_thread(tid=101)
    tm.gap(10, "tid_201_exit")
    tm.exit_thread(tid=201)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 101,
                _ts="tid_101_start",
                _live_end="tid_101_exit"),
        _thread(marks, 201,
                _ts="tid_201_start",
                _live_end="tid_201_exit",
                utgid=-101),
        _thread(marks, 102,
                _ts="tid_102_start",
                _live_end="tid_102_exit",
                utgid=-101),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 101, _ts="tid_101_start",
                 _live_end="tid_201_exit"),
    )
def test_threads_snapshot():
    """Threads scanned inside a snapshot attach to the scanned group."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=5, parent=1)
    tm.note_scanned_thread(tgid=5, tid=51)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 5),
        _thread(marks, 51, utgid=-5),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 5),
    )
def test_zombie_in_snapshot():
    """A zombie scanned in a snapshot ends its live span at trace start."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=5, parent=1, state="Z")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 5, _live_end="trace_start"),
    )
def test_process_start_on_snapshot_boundary():
    """A fork exactly at snapshot end is still accounted for."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.snapshot_end()
    tm.start_process(tgid=5, parent=1)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 5),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 5),
    )
def test_simultaneous_start_exit():
    """Start and exit at the same timestamp produce a point-like span."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.snapshot_end()
    tm.gap(10, "tgid5_start_stop")
    tm.start_process(tgid=5, parent=1)
    tm.exit_thread(tid=5)
    tm.gap(10)
    # Start another process to make sure we haven't screwed up
    # the event queue with our trawling through it.
    tm.mark("tgid10_start")
    tm.start_process(tgid=10, parent=1)
    tm.gap(10, "tgid10_exit")
    tm.exit_thread(tid=10)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 5,
                _ts="tgid5_start_stop",
                _live_end="tgid5_start_stop"),
        _thread(marks, 10,
                _ts="tgid10_start",
                _live_end="tgid10_exit"),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 5,
                 _ts="tgid5_start_stop",
                 _live_end="tgid5_start_stop"),
        _process(marks, 10,
                 _ts="tgid10_start",
                 _live_end="tgid10_exit"),
    )
def test_elide_due_to_proc_race():
    """An exit racing the snapshot scan is elided with a warning."""
    tm = TraceMaker()
    tm.mark("snapshot_start")
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10, "snapshot_middle")
    tm.exit_thread(tid=100)
    tm.start_process(tgid=200)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    with pytest.warns(QueryWarning,
                      match=r"pretending that RawTid\(100\) "
                      "never happened"):
        assert tm.analyze("threads") == _test_qt_threads(
            _init_thread(marks),
            _thread(marks, 200, _ts="snapshot_middle"),
        )
def test_process_start_and_died_in_snapshot():
    """A full birth-to-death lifetime inside a snapshot is tracked."""
    tm = TraceMaker()
    tm.mark("snapshot_start")
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10, "tgid_200_start")
    tm.start_process(tgid=200)
    tm.gap(10, "tgid_200_end")
    tm.exit_thread(tid=200)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 200,
                _ts="tgid_200_start",
                _live_end="tgid_200_end"),
    )
def test_process_start_in_snapshot_parent_died():
    """Forks from a parent that died mid-snapshot reparent to init."""
    tm = TraceMaker()
    tm.mark("snapshot_start")
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10, "tgid_200_start")
    tm.start_process(tgid=200, parent=100)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    warning_re = (r"pretending that RawTid\(200\) was created "
                  "by init")
    with pytest.warns(QueryWarning, match=warning_re):
        assert tm.analyze("threads") == _test_qt_threads(
            _init_thread(marks),
            _thread(marks, 200, _ts="tgid_200_start"),
        )
    with pytest.warns(QueryWarning, match=warning_re):
        assert tm.analyze("processes") == _test_qt_processes(
            _init_process(marks),
            _process(marks, 200, _ts="tgid_200_start"),
        )
def test_process_scanned_in_snapshot_parent_died():
    """A scanned process whose parent vanished is reparented to init."""
    tm = TraceMaker()
    tm.mark("snapshot_start")
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=200, parent=100)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    warning_re = (r"[pP]retending that the parent of RawTid\(200\) "
                  r"is init")
    with pytest.warns(QueryWarning, match=warning_re):
        assert tm.analyze("threads") == _test_qt_threads(
            _init_thread(marks),
            _thread(marks, 200),
        )
    with pytest.warns(QueryWarning, match=warning_re):
        assert tm.analyze("processes") == _test_qt_processes(
            _init_process(marks),
            _process(marks, 200),
        )
def test_process_created_in_snapshot():
    """Forks during and after the snapshot window both show up."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10, "tgid_100_start")
    tm.start_process(tgid=100, parent=1)
    tm.gap(10)
    tm.snapshot_end()
    tm.gap(10, "tgid_101_start")
    tm.start_process(tgid=101, parent=1)
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 100, _ts="tgid_100_start"),
        _thread(marks, 101, _ts="tgid_101_start"),
    )
def test_trace_ends_on_process_creation():
    """A fork at the trace's final timestamp is not reported."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.start_process(tgid=100, parent=1)
    tm.snapshot_end()
    tm.gap(10)
    tm.start_process(tgid=101, parent=100)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 100),
    )
def test_snapshot_floats():
    """Pre-snapshot events get folded into the floating start snapshot."""
    # Note how the events before the ostensible snapshot begin
    # come out-of-order and have dangling references: we "fix"
    # these problems by stuffing these events in the implicit
    # floating start-of-trace snapshot.
    tm = TraceMaker()
    tm.gap(10, "start_tgid_10")
    tm.start_process(tgid=10, parent=5)
    tm.gap(10, "start_tid_6")
    tm.start_thread(tid=6, creating_tid=5)
    tm.gap(10, "snapshot_start")
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=5, parent=1)
    tm.gap(10)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 5),
        _thread(marks, 10, _ts="start_tgid_10"),
        _thread(marks, 6, _ts="start_tid_6", utgid=-5),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 5),
        _process(marks, 10, _ts="start_tgid_10", parent=-5),
    )
def test_multiple_snapshots():
    """A second consistent snapshot doesn't duplicate known tasks."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=125, parent=1)
    tm.gap(10, "initial_snapshot_end")
    tm.snapshot_end()
    tm.gap(10, "tgid_101_start")
    tm.start_process(tgid=101)
    tm.gap(10)
    tm.gap(10)
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=101, parent=1)
    tm.note_scanned_process(tgid=125, parent=1)
    tm.gap(10)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 125),
        _thread(marks, 101, _ts="tgid_101_start"),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 125),
        _process(marks, 101, _ts="tgid_101_start"),
    )
def test_kthreadd_root():
    """kthreadd is allowed as a second parentless root process."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=2, comm="kthreadd", parent=None)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 2),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 2, parent=0),
    )
def test_kthreadd_tgid_rewrite():
    """Kernel threads fold into kthreadd's group by default."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=2, comm="kthreadd", parent=None)
    tm.note_scanned_process(tgid=3, comm="some-kthread", parent=2)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 2),
        _thread(marks, 3, utgid=-2),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _init_process(marks),
        _process(marks, 2, parent=0),
    )
def test_kthreadd_tgid_rewrite_disable():
    """kernel_threads_as_threads=False keeps kthreads as processes."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=2, comm="kthreadd", parent=None)
    tm.note_scanned_process(tgid=3, comm="some-kthread", parent=2)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    expected_threads = _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 2),
        _thread(marks, 3),
    )
    assert tm.analyze("threads", kernel_threads_as_threads=False) \
        == expected_threads
    expected_processes = _test_qt_processes(
        _init_process(marks),
        _process(marks, 2, parent=0),
        _process(marks, 3, parent=-2),
    )
    assert tm.analyze("processes", kernel_threads_as_threads=False) \
        == expected_processes
def test_samples():
    """Snapshot sample streams: state, comm, rss and oom_score_adj."""
    # pylint: disable=bad-whitespace
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=100, parent=1, state="R",
                            oom_score_adj=200)
    tm.note_scanned_thread(tid=101, tgid=100, comm="bleg")
    tm.note_scanned_rss_file(tid=100, bytes=12345)
    tm.note_scanned_rss_anon(tid=101, bytes=54321)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    snap_end = marks["snapshot_end"]
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 100),
        _thread(marks, 101, utgid=-100)
    )
    assert tm.analyze("state", k="event") == TestQueryTable(
        names=["_ts", "utid", "state"],
        schemas=[TS_SCHEMA, UTID_SCHEMA, RAW_THREAD_STATE_SCHEMA],
        rows=[
            [snap_end, -1, SLEEPING],
            [snap_end, -100, RUNNING],
            [snap_end, -101, SLEEPING],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
    assert tm.analyze("comm", k="event") == TestQueryTable(
        names=["_ts", "utid", "comm"],
        schemas=[TS_SCHEMA, UTID_SCHEMA, STRING_SCHEMA],
        rows=[
            [snap_end, -1, "init"],
            [snap_end, -100, "#100"],
            [snap_end, -101, "bleg"],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
    # Note that the RSS stats attach to the utgid!
    assert tm.analyze("rss_anon", k="event") == TestQueryTable(
        names=["_ts", "utgid", "rss_anon"],
        schemas=[TS_SCHEMA, UTGID_SCHEMA, MEMORY_SIZE_SCHEMA],
        rows=[
            [snap_end, -100, 54321],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
    assert tm.analyze("rss_file", k="event") == TestQueryTable(
        names=["_ts", "utgid", "rss_file"],
        schemas=[TS_SCHEMA, UTGID_SCHEMA, MEMORY_SIZE_SCHEMA],
        rows=[
            [snap_end, -100, 12345],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
    assert tm.analyze("oom_score_adj", k="event") == TestQueryTable(
        names=["_ts", "utgid", "oom_score_adj"],
        schemas=[TS_SCHEMA, UTGID_SCHEMA, OOM_SCORE_ADJ_SCHEMA],
        rows=[
            [snap_end, -1, 0],
            [snap_end, -100, 200],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
def test_samples_death():
    """A task dying mid-snapshot emits its samples at death time."""
    # pylint: disable=bad-whitespace
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.note_scanned_process(tgid=100, parent=1, state="R",
                            oom_score_adj=200)
    tm.gap(10, "tgid_100_exit")
    tm.exit_thread(tid=100)
    tm.gap(10, "snapshot_end")
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _init_thread(marks),
        _thread(marks, 100, _live_end="tgid_100_exit"),
    )
    # N.B. When a process dies in a snapshot, we emit snapshot samples
    # at death time so that backfilling can figure out the right state
    # pre-death. The thread isn't actually running at death time.
    assert tm.analyze("state", k="event") == TestQueryTable(
        names=["_ts", "utid", "state"],
        schemas=[TS_SCHEMA, UTID_SCHEMA, RAW_THREAD_STATE_SCHEMA],
        rows=[
            [marks["snapshot_end"], -1, SLEEPING],
            [marks["tgid_100_exit"], -100, RUNNING],
        ], table_schema=EVENT_UNPARTITIONED_TIME_MAJOR)
def test_core_threads():
    """With nr_cpus set, synthetic per-core system threads appear."""
    tm = TraceMaker(nr_cpus=2)
    tm.snapshot_start()
    tm.note_scanned_process(tgid=1, comm="init", parent=None)
    tm.gap(10)
    tm.snapshot_end()
    tm.end_trace()
    marks = tm.markers
    assert tm.analyze("threads") == _test_qt_threads(
        _thread(marks, SYSTEM_UTID_BASE - 1,
                utid=SYSTEM_UTID_BASE - 1,
                utgid=SYSTEM_UTID_BASE),
        _thread(marks, SYSTEM_UTID_BASE - 0,
                utid=SYSTEM_UTID_BASE - 0,
                utgid=SYSTEM_UTID_BASE),
        _init_thread(marks),
    )
    assert tm.analyze("processes") == _test_qt_processes(
        _process(marks, SYSTEM_UTID_BASE,
                 parent=0, utgid=SYSTEM_UTID_BASE),
        _init_process(marks),
    )
# Negative tests
def test_bad_snapshot_no_init():
    """A snapshot lacking init (tid 1) is rejected as incomplete."""
    tm = TraceMaker()
    tm.snapshot_start()
    tm.note_scanned_process(tgid=5, parent=1)
    tm.snapshot_end()
    tm.gap(10)
    tm.end_trace()
    with _qerror("incomplete snapshot"):
        tm.analyze("processes")
def test_bad_snapshot_invalid_tid():
    """tid 0 is not a valid task id in snapshots."""
    with _qerror("invalid value for Raw"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_process(tgid=0, parent=1)
        tm.snapshot_end()
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_snapshot_tid_reuse():
    """A tid cannot die and be reborn within one snapshot."""
    with _qerror("tid reuse"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.exit_thread(tid=5)
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.snapshot_end()
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_snapshot_circular_reference():
    """Mutually-parented processes in a snapshot are rejected."""
    with _qerror("circular snapshot reference"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.start_process(tgid=5, parent=4)
        tm.start_process(tgid=4, parent=5)
        tm.snapshot_end()
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_snapshot_inconsistent_parent():
    """Contradictory parent notes for one tid raise a warning."""
    with pytest.warns(QueryWarning, match="mismatch on field parent"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_process(tgid=5, parent=1)
        tm.note_scanned_process(tgid=7, parent=5)
        tm.note_scanned_parent(tid=5, parent=7)
        tm.snapshot_end()
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_thread_died_before_birth():
    """An exit for a never-seen tid followed by its birth is an error."""
    with _qerror("RawTid(5) is dying"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.exit_thread(tid=5)
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_died_twice():
    """A second exit for an already-dead tid is an error."""
    with _qerror("observed state is dead"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.exit_thread(tid=5)
        tm.gap(10)
        tm.exit_thread(tid=5)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_joining_dead_tgid():
    """Creating a thread inside a dead thread group is an error."""
    with _qerror("thread group is not alive"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.exit_thread(tid=5)
        tm.gap(10)
        tm.start_thread(tid=7, creating_tid=5)
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_tid_reuse_without_death():
    """Re-creating a still-live tid is an error."""
    with _qerror("finalized while still alive"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.start_thread(tid=6, creating_tid=5)
        tm.gap(10)
        tm.start_thread(tid=6, creating_tid=5)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_reused_tg_leader():
    """Reusing a group-leader tid while a member thread lives is an error."""
    with _qerror("tgid state should be dead"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.start_thread(tid=6, creating_tid=5)
        tm.gap(10)
        tm.exit_thread(tid=5)
        tm.gap(10)
        tm.start_process(tgid=5, parent=1)
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_snapshot_dangling_reference():
    """A scanned thread pointing at a never-scanned tgid is an error."""
    with _qerror("outlives the snapshot"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_thread(tid=3, tgid=2)
        tm.snapshot_end()
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_huge_tid():
    """A tid beyond PID_MAX_LIMIT is rejected."""
    with _qerror("unexpectedly huge"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_process(tgid=PID_MAX_LIMIT + 10, parent=1)
        tm.snapshot_end()
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_zero_duration_process():
    """A tid whose whole reused lifetime is one instant is rejected."""
    with _qerror("zero-duration lifetime"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.snapshot_end()
        tm.gap(10)
        tm.start_process(tgid=10)
        tm.exit_thread(tid=10)
        tm.start_process(tgid=10)
        tm.gap(10)
        tm.end_trace()
        tm.analyze("processes")
def test_bad_trace_snapshot_only():
    """A task alive only at the trace's single instant is rejected."""
    with _qerror("zero-duration lifetime"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.start_process(tgid=100, parent=1)
        tm.snapshot_end()
        tm.end_trace()
        tm.analyze("threads")
def test_bad_trace_unknown_thread_state():
    """An unrecognized scanned state letter is rejected."""
    with _qerror("unknown thread state"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init",
                                parent=None, state="?")
        tm.snapshot_end()
        tm.end_trace()
        tm.analyze("threads")
def test_bad_snapshot_inconsistent_tgid():
    """A later snapshot disagreeing with the taskdb tgid is rejected."""
    with _qerror("but in taskdb it has"):
        tm = TraceMaker()
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_process(tgid=125, parent=1)
        tm.gap(10, "initial_snapshot_end")
        tm.snapshot_end()
        tm.gap(10, "tgid_101_start")
        tm.start_process(tgid=101)
        tm.gap(10)
        tm.gap(10)
        tm.snapshot_start()
        tm.note_scanned_process(tgid=1, comm="init", parent=None)
        tm.note_scanned_process(tgid=101, parent=1)
        tm.note_scanned_thread(tid=125, tgid=101)
        tm.gap(10)
        tm.snapshot_end()
        tm.gap(10)
        tm.end_trace()
        tm.analyze("threads")
# Parse testing
def test_snapshot_equality():
    """__eq__ and __ne__ agree and track note_scanned_tgid changes."""
    # pylint: disable=superfluous-parens,unneeded-not
    left = Snapshot()
    right = Snapshot()
    assert left == right
    assert not (left != right)
    left.note_scanned_tgid(1, 2)
    assert left != right
    assert not (left == right)
    right.note_scanned_tgid(1, 2)
    assert left == right
def test_snapshot_copy():
    """copy() detaches: mutating the original leaves the copy behind."""
    original = Snapshot()
    duplicate = original.copy()
    original.note_scanned_tgid(1, 2)
    assert original != duplicate
# Sample /proc/<tid>/stat data captured from a real bash process.  The
# comm field embeds a ")" ("bash)") to exercise the tricky
# closing-parenthesis handling in parse_tid_stat.
BASH_TID = 183870
BASH_PARENT = 250447
BASH_STAT = "183870 (bash)) S 250447 183870 183870 34820 233667 4194304 9842 30115218 0 79 20 19 347971 80637 20 0 1 0 113345218 18149376 1448 18446744073709551615 94172523884544 94172524945648 140721025606512 0 0 0 65536 3670020 1266777851 0 0 0 17 3 0 0 0 0 0 94172527043696 94172527090756 94172546084864 140721025607500 140721025607508 140721025607508 140721025609710 0"
def _add_test_stat(ss, comm="bash)"):
    """Note onto SS what parse_tid_stat should extract from BASH_STAT."""
    for note, value in ((ss.note_scanned_comm, comm),
                        (ss.note_scanned_parent, BASH_PARENT),
                        (ss.note_scanned_state, "S")):
        note(BASH_TID, value)
def test_parse_tid_stat():
    """parse_tid_stat extracts comm/parent/state from a raw stat line."""
    parsed = Snapshot()
    parsed.parse_tid_stat(BASH_TID, BASH_STAT)
    expected = Snapshot()
    _add_test_stat(expected)
    assert parsed == expected
@pytest.mark.parametrize("comm", ["bash ", " bash", "ba sh", "bash"])
def test_parse_tid_stat_comm_whitespace(comm):
    """Whitespace inside or around comm survives stat parsing intact."""
    stat_line = BASH_STAT.replace("(bash))", "(" + comm + ")")
    parsed = Snapshot()
    parsed.parse_tid_stat(BASH_TID, stat_line)
    expected = Snapshot()
    _add_test_stat(expected, comm=comm)
    assert parsed == expected
# Split into lines at the source level instead of using a multi-line
# string so that it's easy to comment out single lines for testing.
# A captured /proc/<tid>/status blob; _add_test_status() below notes the
# comm/tgid/parent/state/RSS figures matching these lines.
BASH_STATUS = "\n".join([
    "Name: bash\\\\",
    "Umask: 0077",
    "State: S (sleeping)",
    "Tgid: 183870",
    "Ngid: 0",
    "Pid: 183870",
    "PPid: 250447",
    "TracerPid: 0",
    "Uid: 419826 419826 419826 419826",
    "Gid: 89939 89939 89939 89939",
    "FDSize: 256",
    "Groups: 4 20 24 25 44 46 125 999 5000 6095 66688 68512 70967 70970 74990 75209 76076 77056 77281 79910 79982 80650 80665 81448 81910 82072 82189 82193 82712 83042 83243 84796 85841 86035 86931 87558 87815 87986 88277 88414 88461 88462 88463 88464 88466 89028 89046 89266 89939 89971 90312 90338 90384 90387 90415 90518 90535 90537 90558 90578 90673 90899 90968 91041 91674 91675 91750 91767 91820 91952 92188 92278 92901",
    "NStgid: 183870",
    "NSpid: 183870",
    "NSpgid: 183870",
    "NSsid: 183870",
    "VmPeak: 66032 kB",
    "VmSize: 17724 kB",
    "VmLck: 0 kB",
    "VmPin: 0 kB",
    "VmHWM: 6240 kB",
    "VmRSS: 5796 kB",
    "RssAnon: 2584 kB",
    "RssFile: 3212 kB",
    "RssShmem: 0 kB",
    "VmData: 2580 kB",
    "VmStk: 132 kB",
    "VmExe: 1040 kB",
    "VmLib: 1992 kB",
    "VmPTE: 84 kB",
    "VmSwap: 0 kB",
    "HugetlbPages: 0 kB",
    "CoreDumping: 0",
    "Threads: 1",
    "SigQ: 0/62907",
    "SigPnd: 0000000000000000",
    "ShdPnd: 0000000000000000",
    "SigBlk: 0000000000010000",
    "SigIgn: 0000000000380004",
    "SigCgt: 000000004b817efb",
    "CapInh: 0000000000000000",
    "CapPrm: 0000000000000000",
    "CapEff: 0000000000000000",
    "CapBnd: 0000003fffffffff",
    "CapAmb: 0000000000000000",
    "NoNewPrivs: 0",
    "Seccomp: 0",
    "Speculation_Store_Bypass: thread vulnerable",
    "Cpus_allowed: f",
    "Cpus_allowed_list: 0-3",
    "Mems_allowed: 00000000,00000001",
    "Mems_allowed_list: 0",
    "voluntary_ctxt_switches: 1154",
    "nonvoluntary_ctxt_switches: 163",
])
def _add_test_status(ss, comm="bash\\"):
    """Note onto SS what parse_tid_status should extract from BASH_STATUS."""
    kib = 1024
    notes = [
        (ss.note_scanned_comm, comm),
        (ss.note_scanned_tgid, BASH_TID),
        (ss.note_scanned_parent, BASH_PARENT),
        (ss.note_scanned_state, 'S'),
        (ss.note_scanned_rss_anon, 2584 * kib),
        (ss.note_scanned_rss_file, 3212 * kib),
    ]
    for note, value in notes:
        note(BASH_TID, value)
def test_parse_tid_status():
    """parse_tid_status extracts the fields _add_test_status notes."""
    parsed = Snapshot()
    parsed.parse_tid_status(BASH_TID, BASH_STATUS)
    expected = Snapshot()
    _add_test_status(expected)
    assert parsed == expected
@pytest.mark.parametrize("comm", ["bash ", " bash", "ba sh", "bash"])
def test_parse_tid_status_comm_whitespace(comm):
    """Whitespace inside or around comm survives status parsing intact."""
    status_blob = BASH_STATUS.replace("bash\\\\", comm)
    parsed = Snapshot()
    parsed.parse_tid_status(BASH_TID, status_blob)
    expected = Snapshot()
    _add_test_status(expected, comm=comm)
    assert parsed == expected
def _dctv_escape(s):
return '"' + s.translate(str.maketrans({
'"': '\\"',
"\\": "\\\\",
"\n": "\\n",
})) + '"'
def test_parse_dctv_snapshot():
    """parse_text_dctv_snapshot replays oom/stat/status records."""
    fields = dict(
        bash_tid=BASH_TID,
        escaped_bash_stat=_dctv_escape(BASH_STAT),
        escaped_bash_status=_dctv_escape(BASH_STATUS))
    record_templates = [
        "tgid_oom_score_adj {bash_tid} 100",
        "tid_stat {bash_tid} {escaped_bash_stat}",
        "tid_status {bash_tid} {escaped_bash_status}"
    ]
    dctv_snapshot = "\n".join(
        template.format(**fields) for template in record_templates)
    parsed = Snapshot()
    parsed.parse_text_dctv_snapshot(dctv_snapshot)
    expected = Snapshot()
    expected.note_scanned_oom_score_adj(BASH_TID, 100)
    _add_test_stat(expected)
    _add_test_status(expected)
    assert parsed == expected