blob: 6ccc949b109197dc94a91bfee52e390ecf684c0e [file] [log] [blame]
#/usr/bin/env python3
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import random
import statistics
import string
import time
from acts import asserts
from acts.base_test import BaseTestClass
from acts.signals import TestPass
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts_contrib.test_utils.bt.bt_test_utils import verify_server_and_client_connected
from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data
from acts_contrib.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger
from acts_contrib.test_utils.bt.loggers.protos import bluetooth_metric_pb2 as proto_module
from acts.utils import set_location_service
class BluetoothLatencyTest(BaseTestClass):
"""Connects two Android phones and tests the RFCOMM latency.
Attributes:
client_device: An Android device object that will be sending data
server_device: An Android device object that will be receiving data
bt_logger: The proxy logger instance for each test case
data_transfer_type: Data transfer protocol used for the test
"""
def setup_class(self):
super().setup_class()
# Sanity check of the devices under test
# TODO(b/119051823): Investigate using a config validator to replace this.
if len(self.android_devices) < 2:
raise ValueError(
'Not enough android phones detected (need at least two)')
# Data will be sent from the client_device to the server_device
self.client_device = self.android_devices[0]
self.server_device = self.android_devices[1]
self.bt_logger = BluetoothMetricLogger.for_test_case()
self.data_transfer_type = proto_module.BluetoothDataTestResult.RFCOMM
self.log.info('Successfully found required devices.')
def setup_test(self):
setup_multiple_devices_for_bt_test(self.android_devices)
self._connect_rfcomm()
def teardown_test(self):
if verify_server_and_client_connected(
self.client_device, self.server_device, log=False):
self.client_device.droid.bluetoothSocketConnStop()
self.server_device.droid.bluetoothSocketConnStop()
def _connect_rfcomm(self):
"""Establishes an RFCOMM connection between two phones.
Connects the client device to the server device given the hardware
address of the server device.
"""
set_location_service(self.client_device, True)
set_location_service(self.server_device, True)
server_address = self.server_device.droid.bluetoothGetLocalAddress()
self.log.info('Pairing and connecting devices')
asserts.assert_true(self.client_device.droid
.bluetoothDiscoverAndBond(server_address),
'Failed to pair and connect devices')
# Create RFCOMM connection
asserts.assert_true(orchestrate_rfcomm_connection
(self.client_device, self.server_device),
'Failed to establish RFCOMM connection')
def _measure_latency(self):
"""Measures the latency of data transfer over RFCOMM.
Sends data from the client device that is read by the server device.
Calculates the latency of the transfer.
Returns:
The latency of the transfer milliseconds.
"""
# Generates a random message to transfer
message = (''.join(random.choice(string.ascii_letters + string.digits)
for _ in range(6)))
start_time = time.perf_counter()
write_read_successful = write_read_verify_data(self.client_device,
self.server_device,
message,
False)
end_time = time.perf_counter()
asserts.assert_true(write_read_successful,
'Failed to send/receive message')
return (end_time - start_time) * 1000
@BluetoothBaseTest.bt_test_wrap
@test_tracker_info(uuid='7748295d-204e-4ad0-adf5-7591380b940a')
def test_bluetooth_latency(self):
"""Tests the latency for a data transfer over RFCOMM"""
metrics = {}
latency_list = []
for _ in range(300):
latency_list.append(self._measure_latency())
metrics['data_transfer_protocol'] = self.data_transfer_type
metrics['data_latency_min_millis'] = int(min(latency_list))
metrics['data_latency_max_millis'] = int(max(latency_list))
metrics['data_latency_avg_millis'] = int(statistics.mean(latency_list))
self.log.info('Latency: {}'.format(metrics))
proto = self.bt_logger.get_results(metrics,
self.__class__.__name__,
self.server_device,
self.client_device)
asserts.assert_true(metrics['data_latency_min_millis'] > 0,
'Minimum latency must be greater than 0!',
extras=proto)
raise TestPass('Latency test completed successfully', extras=proto)