| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| |
| import random |
| import statistics |
| import string |
| import time |
| from acts import asserts |
| from acts.base_test import BaseTestClass |
| from acts.signals import TestPass |
| from acts.test_decorators import test_tracker_info |
| from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection |
| from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test |
| from acts_contrib.test_utils.bt.bt_test_utils import verify_server_and_client_connected |
| from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data |
| from acts_contrib.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger |
| from acts_contrib.test_utils.bt.loggers.protos import bluetooth_metric_pb2 as proto_module |
| from acts.utils import set_location_service |
| |
| |
class BluetoothLatencyTest(BaseTestClass):
    """Measures RFCOMM data-transfer latency between two Android phones.

    Attributes:
        client_device: The Android device object that sends the data (the
            first entry of ``android_devices``).
        server_device: The Android device object that receives the data (the
            second entry of ``android_devices``).
        bt_logger: The metric logger obtained from
            ``BluetoothMetricLogger.for_test_case()``.
        data_transfer_type: Data transfer protocol reported with the metrics
            (the RFCOMM value of ``BluetoothDataTestResult``).
    """

    def setup_class(self):
        """Validates the testbed and stores the devices and logger to use.

        Raises:
            ValueError: If fewer than two Android devices are available.
        """
        super().setup_class()

        # Sanity check of the devices under test
        # TODO(b/119051823): Investigate using a config validator to replace this.
        devices = self.android_devices
        if len(devices) < 2:
            raise ValueError(
                'Not enough android phones detected (need at least two)')

        # The first device acts as the sender, the second as the receiver.
        self.client_device, self.server_device = devices[0], devices[1]
        self.bt_logger = BluetoothMetricLogger.for_test_case()
        self.data_transfer_type = proto_module.BluetoothDataTestResult.RFCOMM
        self.log.info('Successfully found required devices.')

    def setup_test(self):
        """Prepares all devices for a Bluetooth test and connects RFCOMM."""
        setup_multiple_devices_for_bt_test(self.android_devices)
        self._connect_rfcomm()

    def teardown_test(self):
        """Stops the socket connection on both devices if still connected."""
        still_connected = verify_server_and_client_connected(
            self.client_device, self.server_device, log=False)
        if not still_connected:
            return

        # Client first, then server.
        for device in (self.client_device, self.server_device):
            device.droid.bluetoothSocketConnStop()

    def _connect_rfcomm(self):
        """Pairs the two phones and opens an RFCOMM connection between them.

        Location service is switched on for both devices first. The client
        then discovers and bonds with the server using the server's local
        Bluetooth address, and finally the RFCOMM connection is orchestrated.
        Either step failing fails the assertion with its own message.
        """
        for device in (self.client_device, self.server_device):
            set_location_service(device, True)

        server_address = self.server_device.droid.bluetoothGetLocalAddress()
        self.log.info('Pairing and connecting devices')
        bonded = self.client_device.droid.bluetoothDiscoverAndBond(
            server_address)
        asserts.assert_true(bonded, 'Failed to pair and connect devices')

        # Create RFCOMM connection
        rfcomm_connected = orchestrate_rfcomm_connection(
            self.client_device, self.server_device)
        asserts.assert_true(rfcomm_connected,
                            'Failed to establish RFCOMM connection')

    def _measure_latency(self):
        """Times a single write/read/verify round over RFCOMM.

        A random six-character alphanumeric message is written by the client
        device and read back by the server device; the wall-clock duration of
        that call is measured with ``time.perf_counter``.

        Returns:
            The latency of the transfer in milliseconds.
        """
        # Build the random payload before the clock starts.
        alphabet = string.ascii_letters + string.digits
        message = ''.join(random.choice(alphabet) for _ in range(6))

        started_at = time.perf_counter()
        transfer_ok = write_read_verify_data(self.client_device,
                                             self.server_device,
                                             message,
                                             False)
        finished_at = time.perf_counter()

        asserts.assert_true(transfer_ok, 'Failed to send/receive message')

        # perf_counter reports seconds; convert to milliseconds.
        elapsed_seconds = finished_at - started_at
        return elapsed_seconds * 1000

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='7748295d-204e-4ad0-adf5-7591380b940a')
    def test_bluetooth_latency(self):
        """Runs 300 RFCOMM transfers and reports min/max/avg latency.

        The metrics are truncated to whole milliseconds, logged, and handed
        to the metric logger. The test asserts that the (truncated) minimum
        latency is greater than zero and then signals ``TestPass`` with the
        logger's results attached as extras.
        """
        latencies = [self._measure_latency() for _ in range(300)]

        metrics = {
            'data_transfer_protocol': self.data_transfer_type,
            'data_latency_min_millis': int(min(latencies)),
            'data_latency_max_millis': int(max(latencies)),
            'data_latency_avg_millis': int(statistics.mean(latencies)),
        }
        summary = 'Latency: {}'.format(metrics)
        self.log.info(summary)

        proto = self.bt_logger.get_results(metrics,
                                           self.__class__.__name__,
                                           self.server_device,
                                           self.client_device)

        min_latency_is_positive = metrics['data_latency_min_millis'] > 0
        asserts.assert_true(min_latency_is_positive,
                            'Minimum latency must be greater than 0!',
                            extras=proto)

        raise TestPass('Latency test completed successfully', extras=proto)