blob: 537cdc99790f12f6f93bb757e5bc6c7438e84396 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Provides fakes for several of Telemetry's internal objects.
These allow code like story_runner and Benchmark to be run and tested
without compiling or starting a browser. Class names prepended with an
underscore are intended to be implementation details, and should not
be subclassed; however, some, like _FakeBrowser, have public APIs that
may need to be called in tests.
"""
from telemetry.internal.backends.chrome_inspector import websocket
from telemetry.internal.browser import browser_options
from telemetry.internal.platform import system_info
from telemetry.page import shared_page_state
from telemetry.util import image_util
from telemetry.testing.internal import fake_gpu_info
# Classes and functions which are intended to be part of the public
# fakes API.
class FakePlatform(object):
  """Base fake for a Telemetry platform object.

  The network and tracing controllers are created lazily the first time
  they are requested. The identity queries (is_host_platform, device type,
  architecture, OS name and OS version) raise NotImplementedError here.
  """

  def __init__(self):
    # Both controllers are built on demand by the properties below.
    self._tracing_controller = None
    self._network_controller = None

  @property
  def is_host_platform(self):
    raise NotImplementedError

  @property
  def network_controller(self):
    """The fake network controller, created on first access."""
    controller = self._network_controller
    if controller is None:
      controller = _FakeNetworkController()
      self._network_controller = controller
    return controller

  @property
  def tracing_controller(self):
    """The fake tracing controller, created on first access."""
    controller = self._tracing_controller
    if controller is None:
      controller = _FakeTracingController()
      self._tracing_controller = controller
    return controller

  # Thermal throttling is never monitored nor reported by this fake.
  def CanMonitorThermalThrottling(self):
    return False

  def IsThermallyThrottled(self):
    return False

  def HasBeenThermallyThrottled(self):
    return False

  # Identity queries: not implemented on the base fake.
  def GetDeviceTypeName(self):
    raise NotImplementedError

  def GetArchName(self):
    raise NotImplementedError

  def GetOSName(self):
    raise NotImplementedError

  def GetOSVersionName(self):
    raise NotImplementedError

  def StopAllLocalServers(self):
    """Does nothing."""
    return None
class FakeLinuxPlatform(FakePlatform):
  """Fake host platform reporting itself as an x86_64 'trusty' Linux desktop.

  Screenshots are only available once a test assigns screenshot_png_data;
  that value is handed to image_util.FromBase64Png when a screenshot is
  taken.
  """

  def __init__(self):
    super(FakeLinuxPlatform, self).__init__()
    self.http_server = FakeHTTPServer()
    # Each SetHTTPServerDirectories call appends its argument here.
    self.http_server_directories = []
    self.screenshot_png_data = None

  @property
  def is_host_platform(self):
    return True

  def GetDeviceTypeName(self):
    return 'Desktop'

  def GetArchName(self):
    return 'x86_64'

  def GetOSName(self):
    return 'linux'

  def GetOSVersionName(self):
    return 'trusty'

  def CanTakeScreenshot(self):
    """True when screenshot_png_data has been set to a truthy value."""
    return bool(self.screenshot_png_data)

  def TakeScreenshot(self, file_path):
    """Writes the configured screenshot to |file_path| and returns True.

    Raises:
      NotImplementedError: if no screenshot data has been configured.
    """
    if self.CanTakeScreenshot():
      image = image_util.FromBase64Png(self.screenshot_png_data)
      image_util.WritePngFile(image, file_path)
      return True
    raise NotImplementedError

  def SetHTTPServerDirectories(self, paths):
    """Records |paths| as one entry in http_server_directories."""
    self.http_server_directories.append(paths)
class FakeHTTPServer(object):
  """Fake HTTP server that maps every URL to the same file URL."""

  def UrlOf(self, url):
    """Returns 'file:///foo' regardless of |url|."""
    del url  # unused
    return 'file:///foo'
class FakePossibleBrowser(object):
  """Fake PossibleBrowser that always hands out the same _FakeBrowser.

  Args:
    execute_on_startup: optional zero-argument callable invoked on every
        call to Create, before the browser is returned.
  """

  def __init__(self, execute_on_startup=None):
    self.execute_on_startup = execute_on_startup
    self.browser_type = 'linux'
    self.is_remote = False
    self.supports_tab_control = False
    self._returned_browser = _FakeBrowser(FakeLinuxPlatform())

  @property
  def returned_browser(self):
    """The browser object that will be returned through later API calls."""
    return self._returned_browser

  def Create(self, finder_options):
    """Runs the startup hook, if one was given, and returns the fake browser."""
    del finder_options  # unused
    startup_hook = self.execute_on_startup
    if startup_hook is not None:
      startup_hook()
    return self.returned_browser

  @property
  def platform(self):
    """The platform object from the returned browser.

    To change this or set it up, change the returned browser's
    platform.
    """
    browser = self.returned_browser
    return browser.platform

  def IsRemote(self):
    return self.is_remote

  def SetCredentialsPath(self, _):
    """Does nothing; the credentials path is ignored."""
    return None
class FakeSharedPageState(shared_page_state.SharedPageState):
  """SharedPageState whose possible browser is a FakePossibleBrowser."""

  def __init__(self, test, finder_options, story_set):
    super(FakeSharedPageState, self).__init__(test, finder_options, story_set)

  def _GetPossibleBrowser(self, test, finder_options):
    # Neither argument is consulted; a fresh fake is built and then passed
    # through the ConfigurePossibleBrowser hook.
    fake_browser = FakePossibleBrowser()
    self.ConfigurePossibleBrowser(fake_browser)
    return fake_browser

  def ConfigurePossibleBrowser(self, possible_browser):
    """Override this to configure the PossibleBrowser.

    Can make changes to the browser's configuration here via e.g.:
       possible_browser.returned_browser.returned_system_info = ...
    """

  def DidRunStory(self, results):
    # TODO(kbr): add a test which throws an exception from DidRunStory
    # to verify the fix from https://crrev.com/86984d5fc56ce00e7b37ebe .
    super(FakeSharedPageState, self).DidRunStory(results)
class FakeSystemInfo(system_info.SystemInfo):
  """SystemInfo that falls back to canned GPU data.

  Args:
    model_name: model name forwarded to SystemInfo.
    gpu_dict: GPU description forwarded to SystemInfo. When None,
        fake_gpu_info.FAKE_GPU_INFO is used instead.
  """

  def __init__(self, model_name='', gpu_dict=None):
    # Identity comparison: only a missing argument triggers the fallback,
    # and no custom __eq__ on the argument is ever invoked.
    if gpu_dict is None:
      gpu_dict = fake_gpu_info.FAKE_GPU_INFO
    super(FakeSystemInfo, self).__init__(model_name, gpu_dict)
class _FakeBrowserFinderOptions(browser_options.BrowserFinderOptions):
  """BrowserFinderOptions that also carries a FakePossibleBrowser.

  Args:
    execute_on_startup: forwarded to the FakePossibleBrowser stored in
        fake_possible_browser.
    *args, **kwargs: forwarded unchanged to BrowserFinderOptions.
  """

  def __init__(self, execute_on_startup=None, *args, **kwargs):
    # The base initializer is invoked explicitly on the base class.
    browser_options.BrowserFinderOptions.__init__(self, *args, **kwargs)
    self.fake_possible_browser = FakePossibleBrowser(
        execute_on_startup=execute_on_startup)
def CreateBrowserFinderOptions(browser_type=None, execute_on_startup=None):
  """Creates fake browser finder options for discovering a browser.

  Args:
    browser_type: forwarded to the options object as |browser_type|.
    execute_on_startup: forwarded to the options object, which passes it on
        to its FakePossibleBrowser.

  Returns:
    A new _FakeBrowserFinderOptions instance.
  """
  options = _FakeBrowserFinderOptions(
      browser_type=browser_type, execute_on_startup=execute_on_startup)
  return options
# Internal classes. Note that end users may still need to both call
# and mock out methods of these classes, but they should not be
# subclassed.
class _FakeBrowser(object):
  """Fake browser exposing tabs, system info, credentials and a platform.

  Args:
    platform: the platform object reported by the |platform| property.
  """

  def __init__(self, platform):
    tab_list = _FakeTabList(self)
    # Fake the creation of the first tab.
    tab_list.New()
    self._tabs = tab_list
    self._returned_system_info = FakeSystemInfo()
    self._platform = platform
    self._browser_type = 'release'
    self._is_crashed = False

  @property
  def platform(self):
    return self._platform

  @platform.setter
  def platform(self, incoming):
    """Allows overriding of the fake browser's platform object."""
    assert isinstance(incoming, FakePlatform)
    self._platform = incoming

  @property
  def returned_system_info(self):
    """The object which will be returned from calls to GetSystemInfo."""
    return self._returned_system_info

  @returned_system_info.setter
  def returned_system_info(self, incoming):
    """Allows overriding of the returned SystemInfo object.

    Incoming argument must be an instance of FakeSystemInfo.
    """
    assert isinstance(incoming, FakeSystemInfo)
    self._returned_system_info = incoming

  @property
  def browser_type(self):
    """The browser_type this browser claims to be ('debug', 'release', etc.)"""
    return self._browser_type

  @browser_type.setter
  def browser_type(self, incoming):
    """Allows setting of the browser_type."""
    self._browser_type = incoming

  @property
  def credentials(self):
    """A new _FakeCredentials object on every access."""
    return _FakeCredentials()

  def Close(self):
    """Clears the crashed flag."""
    self._is_crashed = False

  @property
  def supports_system_info(self):
    return True

  def GetSystemInfo(self):
    """Returns the object held by |returned_system_info|."""
    return self.returned_system_info

  @property
  def supports_tab_control(self):
    return True

  @property
  def tabs(self):
    return self._tabs

  def DumpStateUponFailure(self):
    """Does nothing."""
    return None
class _FakeCredentials(object):
  """Fake credentials object that never warns."""

  def WarnIfMissingCredentials(self, _):
    """Does nothing; the argument is ignored."""
    return None
class _FakeTracingController(object):
  """Fake tracing controller that only tracks whether tracing is on."""

  def __init__(self):
    self._is_tracing = False

  def StartTracing(self, tracing_config, timeout=10):
    """Marks tracing as running; both arguments are ignored."""
    del tracing_config, timeout  # unused
    self._is_tracing = True

  def StopTracing(self):
    """Marks tracing as stopped."""
    self._is_tracing = False

  @property
  def is_tracing_running(self):
    return self._is_tracing

  def ClearStateIfNeeded(self):
    """Does nothing."""
    return None
class _FakeNetworkController(object):
  """Fake network controller recording open and replay state.

  Attributes are public so tests can inspect them: wpr_mode,
  extra_wpr_args, is_replay_active and is_open.
  """

  def __init__(self):
    self.wpr_mode = self.extra_wpr_args = None
    self.is_replay_active = self.is_open = False

  def InitializeIfNeeded(self):
    """Does nothing."""
    return None

  def Open(self, wpr_mode, extra_wpr_args):
    """Stores both arguments and marks the controller as open."""
    self.wpr_mode, self.extra_wpr_args = wpr_mode, extra_wpr_args
    self.is_open = True

  def Close(self):
    """Resets every attribute to its initial value."""
    self.wpr_mode = self.extra_wpr_args = None
    self.is_replay_active = self.is_open = False

  def StartReplay(self, archive_path, make_javascript_deterministic=False):
    """Marks replay active unless |archive_path| is None.

    Asserts that the controller has been opened.
    """
    del make_javascript_deterministic  # Unused.
    assert self.is_open
    self.is_replay_active = (archive_path is not None)

  def StopReplay(self):
    """Marks replay as inactive."""
    self.is_replay_active = False
class _FakeTab(object):
  """Fake tab belonging to a fake browser.

  Args:
    browser: the owning browser object.
    tab_id: identifier for the tab; stored as a string.

  Tests may assign |test_png| to make Screenshot available; the value is
  handed to image_util.FromBase64Png.
  """

  def __init__(self, browser, tab_id):
    self.test_png = None
    self._collect_garbage_count = 0
    self._tab_id = str(tab_id)
    self._browser = browser

  @property
  def collect_garbage_count(self):
    """Number of times CollectGarbage has been called."""
    return self._collect_garbage_count

  @property
  def id(self):
    return self._tab_id

  @property
  def browser(self):
    return self._browser

  def WaitForDocumentReadyStateToBeComplete(self, timeout=0):
    """Does nothing."""
    return None

  def Navigate(self, url, script_to_evaluate_on_commit=None,
               timeout=0):
    """Simulates a navigation.

    Navigating to 'chrome://crash' marks the owning browser as crashed and
    raises Exception; any other URL does nothing.
    """
    del script_to_evaluate_on_commit, timeout  # unused
    if url != 'chrome://crash':
      return
    self.browser._is_crashed = True
    raise Exception

  def WaitForDocumentReadyStateToBeInteractiveOrBetter(self, timeout=0):
    """Does nothing."""
    return None

  def IsAlive(self):
    return True

  def CloseConnections(self):
    """Does nothing."""
    return None

  def CollectGarbage(self):
    """Counts the call; see |collect_garbage_count|."""
    self._collect_garbage_count = self._collect_garbage_count + 1

  def Close(self):
    """Does nothing."""
    return None

  @property
  def screenshot_supported(self):
    """True once |test_png| has been set to something other than None."""
    return not (self.test_png is None)

  def Screenshot(self):
    """Returns the image decoded from |test_png|.

    Asserts that a screenshot is supported.
    """
    assert self.screenshot_supported, 'Screenshot is not supported'
    return image_util.FromBase64Png(self.test_png)
class _FakeTabList(object):
  """Fake list of the tabs owned by one fake browser."""

  # Last tab id handed out. New() increments it on the instance's class, so
  # ids are not reused across tab lists of the same class.
  _current_tab_id = 0

  def __init__(self, browser):
    self._browser = browser
    self._tabs = []

  def New(self, timeout=300):
    """Creates a tab with the next id, appends it and returns it."""
    del timeout  # unused
    cls = type(self)
    cls._current_tab_id += 1
    tab = _FakeTab(self._browser, cls._current_tab_id)
    self._tabs.append(tab)
    return tab

  def __iter__(self):
    return iter(self._tabs)

  def __len__(self):
    return len(self._tabs)

  def __getitem__(self, index):
    """Returns the tab at |index|; raises Exception if its browser crashed."""
    tab = self._tabs[index]
    if tab.browser._is_crashed:
      raise Exception
    return tab

  def GetTabById(self, identifier):
    """The identifier of a tab can be accessed with tab.id."""
    matches = (tab for tab in self._tabs if tab.id == identifier)
    return next(matches, None)
class FakeInspectorWebsocket(object):
  """A fake InspectorWebsocket.

  A fake that allows tests to send pregenerated data. Normal
  InspectorWebsockets allow for any number of domain handlers. This fake only
  allows up to 1 domain handler, and assumes that the domain of the response
  always matches that of the handler.

  Args:
    mock_timer: object providing time() and SetTime(); DispatchNotifications
        reads and advances it.
  """
  # Kinds of queued notification: events go to the registered domain
  # handler, callbacks go to the oldest pending AsyncRequest callback.
  _NOTIFICATION_EVENT = 1
  _NOTIFICATION_CALLBACK = 2

  def __init__(self, mock_timer):
    self._mock_timer = mock_timer
    # Queue of (response, time, kind) tuples in strictly increasing time.
    self._notifications = []
    # Maps method name -> handler used by SyncRequest.
    self._response_handlers = {}
    # Maps method name -> list of callbacks, oldest first.
    self._pending_callbacks = {}
    self._handler = None

  def RegisterDomain(self, _, handler):
    """Sets |handler| as the single domain handler; the domain is ignored."""
    self._handler = handler

  def _AddNotification(self, response, time, kind):
    """Queues |response| for |time|, asserting it is later than the last one."""
    if self._notifications:
      assert self._notifications[-1][1] < time, (
          'Current response is scheduled earlier than previous response.')
    self._notifications.append((response, time, kind))

  def AddEvent(self, method, params, time):
    """Schedules an event notification to be dispatched at |time|."""
    response = {'method': method, 'params': params}
    self._AddNotification(response, time, self._NOTIFICATION_EVENT)

  def AddAsyncResponse(self, method, result, time):
    """Schedules a response to a pending AsyncRequest at |time|."""
    response = {'method': method, 'result': result}
    self._AddNotification(response, time, self._NOTIFICATION_CALLBACK)

  def AddResponseHandler(self, method, handler):
    """Registers |handler| to answer SyncRequest calls for |method|."""
    self._response_handlers[method] = handler

  def SyncRequest(self, request, *args, **kwargs):
    """Returns the registered handler's result for |request|.

    Returns None when the registered handler is falsy.

    Raises:
      KeyError: if no handler was registered for request['method'].
    """
    del args, kwargs  # unused
    handler = self._response_handlers[request['method']]
    return handler(request) if handler else None

  def AsyncRequest(self, request, callback):
    """Queues |callback| until a matching async response is dispatched."""
    self._pending_callbacks.setdefault(request['method'], []).append(callback)

  def SendAndIgnoreResponse(self, request):
    """Does nothing."""
    pass

  def Connect(self, _):
    """Does nothing."""
    pass

  def DispatchNotifications(self, timeout):
    """Dispatches the next queued notification if it is due within |timeout|.

    On success the mock timer is set to one past the notification's time.

    Raises:
      websocket.WebSocketTimeoutException: if nothing is queued or the next
          notification is more than |timeout| away; the mock timer is first
          advanced past the timeout.
    """
    current_time = self._mock_timer.time()
    if not self._notifications:
      self._mock_timer.SetTime(current_time + timeout + 1)
      raise websocket.WebSocketTimeoutException()

    response, scheduled_time, kind = self._notifications[0]
    if scheduled_time - current_time > timeout:
      self._mock_timer.SetTime(current_time + timeout + 1)
      raise websocket.WebSocketTimeoutException()

    self._notifications.pop(0)
    self._mock_timer.SetTime(scheduled_time + 1)
    if kind == self._NOTIFICATION_EVENT:
      self._handler(response)
    elif kind == self._NOTIFICATION_CALLBACK:
      # Callbacks for one method are served in the order they were queued.
      callback = self._pending_callbacks.get(response['method']).pop(0)
      callback(response)
    else:
      raise Exception('Unexpected response type')