blob: 242ae59d11621f68819be32de27cfa4a6756ff81 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
from collections import namedtuple
from telemetry.testing import test_page_test_results
from telemetry.timeline import model as model_module
from telemetry.timeline import slice as slice_module
from telemetry.web_perf import timeline_interaction_record
from telemetry.web_perf.metrics.trace_event_stats import TraceEventStats
from telemetry.web_perf.metrics.trace_event_stats import TraceEventStatsInput
FakeEvent = namedtuple('Event', 'name, start, end, thread_duration, args')
Interaction = timeline_interaction_record.TimelineInteractionRecord
TEST_INTERACTION_LABEL = 'Action_TestInteraction'
RENDERER_PROCESS = 'Renderer'
OTHER_PROCESS = 'Other'
EVENT_CATEGORY1 = 'Category1'
EVENT_CATEGORY2 = 'Category2'
EVENT_NAME1 = 'Name1'
EVENT_NAME2 = 'Name2'
def TestInteraction(start, end):
return Interaction(TEST_INTERACTION_LABEL, start, end)
class TraceEventStatsUnittest(unittest.TestCase):
def setUp(self):
self.model = model_module.TimelineModel()
self.renderer_process = self.model.GetOrCreateProcess(1)
self.renderer_process.name = RENDERER_PROCESS
self.main_thread = self.renderer_process.GetOrCreateThread(tid=11)
self.other_process = self.model.GetOrCreateProcess(2)
self.other_process.name = OTHER_PROCESS
self.other_thread = self.other_process.GetOrCreateThread(tid=12)
def GetThreadForProcessName(self, process_name):
if process_name is RENDERER_PROCESS:
return self.main_thread
elif process_name is OTHER_PROCESS:
return self.other_thread
else:
raise
def AddEvent(self, process_name, event_category, event_name,
start, duration, thread_start, thread_duration):
thread = self.GetThreadForProcessName(process_name)
record = slice_module.Slice(thread,
event_category,
event_name,
start, duration, thread_start, thread_duration)
thread.PushSlice(record)
def RunAggregator(self, aggregator, interactions):
results = test_page_test_results.TestPageTestResults(self)
aggregator.AddResults(self.model, self.renderer_process,
interactions, results)
return results
def testBasicUsage(self):
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 10, 8, 10, 5)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 14, 2, 14, 2)
interactions = [TestInteraction(9, 14)]
aggregator = TraceEventStats()
aggregator.AddInput(TraceEventStatsInput(
EVENT_CATEGORY1,
EVENT_NAME1,
'metric-name',
'metric-description',
'units',
'Renderer'))
results = self.RunAggregator(aggregator, interactions)
results.AssertHasPageSpecificScalarValue('metric-name-count', 'count', 2)
results.AssertHasPageSpecificListOfScalarValues(
'metric-name', 'units', [5, 2])
def testFiltering(self):
# These should be recorded.
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 10, 8, 10, 5)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 14, 2, 14, 2)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 20, 6, 20, 1)
# These should be filtered.
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 15, 1, 15, 1)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY2, EVENT_NAME1, 11, 4, 11, 4)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME2, 11, 3, 11, 3)
self.AddEvent(OTHER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 11, 2, 11, 2)
interactions = [TestInteraction(9, 14), TestInteraction(20, 21)]
aggregator = TraceEventStats()
# Test that we default to 'Renderer'
aggregator.AddInput(TraceEventStatsInput(
EVENT_CATEGORY1,
EVENT_NAME1,
'metric-name',
'metric-description',
'units'))
results = self.RunAggregator(aggregator, interactions)
results.AssertHasPageSpecificScalarValue('metric-name-count', 'count', 3)
results.AssertHasPageSpecificListOfScalarValues(
'metric-name', 'units', [5, 2, 1])
def testNoInputs(self):
# These should be recorded.
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 10, 8, 10, 5)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 14, 2, 14, 2)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 20, 6, 20, 1)
# These should be filtered.
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 15, 1, 15, 1)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY2, EVENT_NAME1, 11, 4, 11, 4)
self.AddEvent(RENDERER_PROCESS, EVENT_CATEGORY1, EVENT_NAME2, 11, 3, 11, 3)
self.AddEvent(OTHER_PROCESS, EVENT_CATEGORY1, EVENT_NAME1, 11, 2, 11, 2)
interactions = [TestInteraction(9, 14), TestInteraction(20, 21)]
aggregator = TraceEventStats()
results = self.RunAggregator(aggregator, interactions)
self.assertEquals([], results.all_page_specific_values)
def testNoEvents(self):
interactions = [TestInteraction(9, 14)]
aggregator = TraceEventStats()
aggregator.AddInput(TraceEventStatsInput(
EVENT_CATEGORY1,
EVENT_NAME1,
'metric-name',
'metric-description',
'units'))
results = self.RunAggregator(aggregator, interactions)
self.assertEquals([], results.all_page_specific_values)