blob: 0facbdc8e79a8d3da9e98fea9805e8becd3426ae [file] [log] [blame]
# Lint as: python3
"""Tests for blueberry.tests.bluetooth.bluetooth_latency."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import math
import random
import string
import time
from mobly import asserts
from mobly import test_runner
from mobly.signals import TestAbortClass
# Internal import
from blueberry.utils import blueberry_base_test
from blueberry.utils import bt_test_utils
from blueberry.utils import metrics_utils
# Internal import
class BluetoothLatencyTest(blueberry_base_test.BlueberryBaseTest):
@retry.logged_retry_on_exception(
retry_intervals=retry.FuzzedExponentialIntervals(
initial_delay_sec=2, factor=5, num_retries=5, max_delay_sec=300))
def _measure_latency(self):
"""Measures the latency of data transfer over RFCOMM.
Sends data from the client device that is read by the server device.
Calculates the latency of the transfer.
Returns:
The latency of the transfer milliseconds.
"""
# Generates a random message to transfer
message = (''.join(
random.choice(string.ascii_letters + string.digits) for _ in range(6)))
start_time = time.time()
write_read_successful = bt_test_utils.write_read_verify_data_sl4a(
self.phone, self.derived_bt_device, message, False)
end_time = time.time()
asserts.assert_true(write_read_successful, 'Failed to send/receive message')
return (end_time - start_time) * 1000
def setup_class(self):
"""Standard Mobly setup class."""
super(BluetoothLatencyTest, self).setup_class()
if len(self.android_devices) < 2:
raise TestAbortClass(
'Not enough android phones detected (need at least two)')
self.phone = self.android_devices[0]
self.phone.init_setup()
self.phone.sl4a_setup()
# We treat the secondary phone as a derived_bt_device in order for the
# generic script to work with this android phone properly. Data will be sent
# from first phone to the second phone.
self.derived_bt_device = self.android_devices[1]
self.derived_bt_device.init_setup()
self.derived_bt_device.sl4a_setup()
self.set_btsnooplogmode_full(self.phone)
self.set_btsnooplogmode_full(self.derived_bt_device)
self.metrics = (
metrics_utils.BluetoothMetricLogger(
metrics_pb2.BluetoothDataTestResult()))
self.metrics.add_primary_device_metrics(self.phone)
self.metrics.add_connected_device_metrics(self.derived_bt_device)
self.data_transfer_type = metrics_pb2.BluetoothDataTestResult.RFCOMM
self.iterations = int(self.user_params.get('iterations', 300))
logging.info('Running Bluetooth latency test %s times.', self.iterations)
logging.info('Successfully found required devices.')
def setup_test(self):
"""Setup for bluetooth latency test."""
logging.info('Setup Test for test_bluetooth_latency')
super(BluetoothLatencyTest, self).setup_test()
asserts.assert_true(self.phone.connect_with_rfcomm(self.derived_bt_device),
'Failed to establish RFCOMM connection')
def test_bluetooth_latency(self):
"""Tests the latency for a data transfer over RFCOMM."""
metrics = {}
latency_list = []
for _ in range(self.iterations):
latency_list.append(self._measure_latency())
metrics['data_transfer_protocol'] = self.data_transfer_type
metrics['data_latency_min_millis'] = int(min(latency_list))
metrics['data_latency_max_millis'] = int(max(latency_list))
metrics['data_latency_avg_millis'] = int(
math.fsum(latency_list) / float(len(latency_list)))
logging.info('Latency: %s', metrics)
asserts.assert_true(metrics['data_latency_min_millis'] > 0,
'Minimum latency must be greater than 0!')
self.metrics.add_test_metrics(metrics)
for metric in metrics:
self.record_data({
'Test Name': 'test_bluetooth_latency',
'sponge_properties': {
metric: metrics[metric],
}
})
def teardown_class(self):
logging.info('Factory resetting Bluetooth on devices.')
self.phone.sl4a.bluetoothSocketConnStop()
self.derived_bt_device.sl4a.bluetoothSocketConnStop()
self.phone.factory_reset_bluetooth()
self.derived_bt_device.factory_reset_bluetooth()
super(BluetoothLatencyTest, self).teardown_class()
self.record_data({
'Test Name': 'test_bluetooth_latency',
'sponge_properties': {
'proto_ascii':
self.metrics.proto_message_to_ascii(),
'primary_device_build':
self.phone.get_device_info()['android_release_id']
}
})
if __name__ == '__main__':
test_runner.main()