blob: 342f2f0359680f685e93805898cb4557052ae1d9 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import pandas as pd
import time
import acts_contrib.test_utils.bt.bt_test_utils as btutils
from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts_contrib.test_utils.bt.ble_performance_test_utils import plot_graph
from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts_contrib.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.signals import TestPass
from acts import utils
from acts_contrib.test_utils.bt.bt_test_utils import write_read_verify_data
INIT_ATTEN = 0
WRITE_ITERATIONS = 500
class BtRfcommThroughputRangeTest(BluetoothBaseTest):
def __init__(self, configs):
super().__init__(configs)
req_params = ['attenuation_vector', 'system_path_loss']
#'attenuation_vector' is a dict containing: start, stop and step of
#attenuation changes
self.unpack_userparams(req_params)
def setup_class(self):
super().setup_class()
self.dut = self.android_devices[0]
self.remote_device = self.android_devices[1]
btutils.enable_bqr(self.android_devices)
if hasattr(self, 'attenuators'):
self.attenuator = self.attenuators[0]
self.attenuator.set_atten(INIT_ATTEN)
self.attenuation_range = range(self.attenuation_vector['start'],
self.attenuation_vector['stop'] + 1,
self.attenuation_vector['step'])
self.log_path = os.path.join(logging.log_path, 'results')
os.makedirs(self.log_path, exist_ok=True)
return setup_multiple_devices_for_bt_test(self.android_devices)
def teardown_test(self):
self.dut.droid.bluetoothSocketConnStop()
self.remote_device.droid.bluetoothSocketConnStop()
if hasattr(self, 'attenuator'):
self.attenuator.set_atten(INIT_ATTEN)
def test_rfcomm_throughput_range(self):
data_points = []
message = "x" * 990
self.file_output = os.path.join(
self.log_path, '{}.csv'.format(self.current_test_name))
if not orchestrate_rfcomm_connection(self.dut, self.remote_device):
return False
self.log.info("RFCOMM Connection established")
for atten in self.attenuation_range:
ramp_attenuation(self.attenuator, atten)
self.log.info('Set attenuation to %d dB', atten)
process_data_dict = btutils.get_bt_metric(self.dut)
rssi_primary = process_data_dict.get('rssi')
pwlv_primary = process_data_dict.get('pwlv')
rssi_primary = rssi_primary.get(self.dut.serial)
pwlv_primary = pwlv_primary.get(self.dut.serial)
self.log.info("DUT RSSI:{} and PwLv:{} with attenuation:{}".format(
rssi_primary, pwlv_primary, atten))
if type(rssi_primary) != str:
data_rate = self.write_read_verify_rfcommdata(
self.dut, self.remote_device, message)
data_point = {
'attenuation_db': atten,
'Dut_RSSI': rssi_primary,
'DUT_PwLv': pwlv_primary,
'Pathloss': atten + self.system_path_loss,
'RfcommThroughput': data_rate
}
data_points.append(data_point)
df = pd.DataFrame(data_points)
# bokeh data for generating BokehFigure
bokeh_data = {
'x_label': 'Pathloss (dBm)',
'primary_y_label': 'RSSI (dBm)',
'log_path': self.log_path,
'current_test_name': self.current_test_name
}
# plot_data for adding line to existing BokehFigure
plot_data = {
'line_one': {
'x_column': 'Pathloss',
'y_column': 'Dut_RSSI',
'legend': 'DUT RSSI (dBm)',
'marker': 'circle_x',
'y_axis': 'default'
},
'line_two': {
'x_column': 'Pathloss',
'y_column': 'RfcommThroughput',
'legend': 'RFCOMM Throughput (bits/sec)',
'marker': 'hex',
'y_axis': 'secondary'
}
}
else:
df.to_csv(self.file_output, index=False)
plot_graph(df,
plot_data,
bokeh_data,
secondary_y_label='RFCOMM Throughput (bits/sec)')
raise TestPass("Reached RFCOMM Max Range,RFCOMM disconnected.")
# Save Data points to csv
df.to_csv(self.file_output, index=False)
# Plot graph
plot_graph(df,
plot_data,
bokeh_data,
secondary_y_label='RFCOMM Throughput (bits/sec)')
self.dut.droid.bluetoothRfcommStop()
self.remote_device.droid.bluetoothRfcommStop()
return True
def write_read_verify_rfcommdata(self, dut, remote_device, msg):
"""Verify that the client wrote data to the remote Android device correctly.
Args:
dut: the Android device to perform the write.
remote_device: the Android device to read the data written.
msg: the message to write.
Returns:
True if the data written matches the data read, false if not.
"""
start_write_time = time.perf_counter()
for n in range(WRITE_ITERATIONS):
try:
dut.droid.bluetoothSocketConnWrite(msg)
except Exception as err:
dut.log.error("Failed to write data: {}".format(err))
return False
try:
read_msg = remote_device.droid.bluetoothSocketConnRead()
except Exception as err:
remote_device.log.error("Failed to read data: {}".format(err))
return False
if msg != read_msg:
self.log.error("Mismatch! Read: {}, Expected: {}".format(
read_msg, msg))
return False
end_read_time = time.perf_counter()
total_num_bytes = 990 * WRITE_ITERATIONS
test_time = (end_read_time - start_write_time)
if (test_time == 0):
dut.log.error("Buffer transmits cannot take zero time")
return 0
data_rate = (1.000 * total_num_bytes) / test_time
self.log.info(
"Calculated using total write and read times: total_num_bytes={}, "
"test_time={}, data rate={:08.0f} bytes/sec, {:08.0f} bits/sec".
format(total_num_bytes, test_time, data_rate, (data_rate * 8)))
data_rate = data_rate * 8
return data_rate