| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import errno |
| import socket |
| import unittest |
| |
| from telemetry import decorators |
| from telemetry.internal.backends.chrome_inspector import inspector_websocket |
| from telemetry.internal.backends.chrome_inspector import websocket |
| from telemetry.testing import simple_mock |
| |
| |
| class FakeSocket(object): |
| """A fake websocket that allows test to send random data.""" |
| def __init__(self, mock_timer): |
| self._mock_timer = mock_timer |
| self._responses = [] |
| self._timeout = None |
| |
| def AddResponse(self, response, time): |
| if self._responses: |
| assert self._responses[-1][1] < time, ( |
| 'Current response is scheduled earlier than previous response.') |
| self._responses.append((response, time)) |
| |
| def send(self, data): |
| pass |
| |
| def recv(self): |
| if not self._responses: |
| raise Exception('No more recorded responses.') |
| |
| response, time = self._responses.pop(0) |
| current_time = self._mock_timer.time() |
| if self._timeout is not None and time - current_time > self._timeout: |
| self._mock_timer.SetTime(current_time + self._timeout + 1) |
| raise websocket.WebSocketTimeoutException() |
| |
| self._mock_timer.SetTime(time) |
| if isinstance(response, Exception): |
| raise response |
| return response |
| |
| def settimeout(self, timeout): |
| self._timeout = timeout |
| |
| |
| def _DoNothingHandler(elapsed_time): |
| del elapsed_time # unused |
| |
| |
| class InspectorWebsocketUnittest(unittest.TestCase): |
| |
| def setUp(self): |
| self._mock_timer = simple_mock.MockTimer() |
| |
| def tearDown(self): |
| self._mock_timer.Restore() |
| |
| @decorators.Disabled('chromeos', 'mac') # crbug.com/483212, crbug.com/498950 |
| def testDispatchNotification(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| |
| results = [] |
| def OnTestEvent(result): |
| results.append(result) |
| |
| inspector.RegisterDomain('Test', OnTestEvent) |
| fake_socket.AddResponse('{"method": "Test.foo"}', 5) |
| inspector.DispatchNotifications() |
| self.assertEqual(1, len(results)) |
| self.assertEqual('Test.foo', results[0]['method']) |
| |
| @decorators.Disabled('chromeos') # crbug.com/483212 |
| def testDispatchNotificationTimedOut(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| |
| results = [] |
| def OnTestEvent(result): |
| results.append(result) |
| |
| inspector.RegisterDomain('Test', OnTestEvent) |
| fake_socket.AddResponse('{"method": "Test.foo"}', 11) |
| with self.assertRaises( |
| websocket.WebSocketTimeoutException): |
| inspector.DispatchNotifications(timeout=10) |
| self.assertEqual(0, len(results)) |
| |
| @decorators.Disabled('chromeos') # crbug.com/483212 |
| def testUnregisterDomain(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| |
| results = [] |
| def OnTestEvent(result): |
| results.append(result) |
| |
| inspector.RegisterDomain('Test', OnTestEvent) |
| inspector.RegisterDomain('Test2', OnTestEvent) |
| inspector.UnregisterDomain('Test') |
| |
| fake_socket.AddResponse('{"method": "Test.foo"}', 5) |
| fake_socket.AddResponse('{"method": "Test2.foo"}', 10) |
| |
| inspector.DispatchNotifications() |
| self.assertEqual(0, len(results)) |
| |
| inspector.DispatchNotifications() |
| self.assertEqual(1, len(results)) |
| self.assertEqual('Test2.foo', results[0]['method']) |
| |
| @decorators.Disabled('chromeos') # crbug.com/483212 |
| def testUnregisterDomainWithUnregisteredDomain(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| with self.assertRaises(AssertionError): |
| inspector.UnregisterDomain('Test') |
| |
| def testAsyncRequest(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| response_count = [0] |
| |
| def callback0(response): |
| response_count[0] += 1 |
| self.assertEqual(2, response_count[0]) |
| self.assertEqual('response1', response['result']['data']) |
| |
| def callback1(response): |
| response_count[0] += 1 |
| self.assertEqual(1, response_count[0]) |
| self.assertEqual('response2', response['result']['data']) |
| |
| request1 = {'method': 'Test.foo'} |
| inspector.AsyncRequest(request1, callback0) |
| request2 = {'method': 'Test.foo'} |
| inspector.AsyncRequest(request2, callback1) |
| fake_socket.AddResponse('{"id": 5555555, "result": {}}', 1) |
| inspector.DispatchNotifications() |
| self.assertEqual(0, response_count[0]) |
| fake_socket.AddResponse( |
| '{"id": %d, "result": {"data": "response2"}}' % request2['id'], 1) |
| fake_socket.AddResponse( |
| '{"id": %d, "result": {"data": "response1"}}' % request1['id'], 2) |
| inspector.DispatchNotifications() |
| inspector.DispatchNotifications() |
| self.assertEqual(2, response_count[0]) |
| fake_socket.AddResponse('{"id": 6666666, "result": {}}', 1) |
| inspector.DispatchNotifications() |
| self.assertEqual(2, response_count[0]) |
| |
| def testEAGAIN(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| |
| error = socket.error(errno.EAGAIN, "error string") |
| fake_socket.AddResponse(error, 4) |
| fake_socket.AddResponse('{"asdf": "qwer"}', 5) |
| |
| result = inspector._Receive() |
| self.assertEqual(result, {"asdf" : "qwer"}) |
| |
| def testSocketErrorOtherThanEAGAIN(self): |
| inspector = inspector_websocket.InspectorWebsocket() |
| fake_socket = FakeSocket(self._mock_timer) |
| # pylint: disable=protected-access |
| inspector._socket = fake_socket |
| |
| error = socket.error(errno.EPIPE, "error string") |
| fake_socket.AddResponse(error, 4) |
| |
| self.assertRaises(socket.error, inspector._Receive) |