blob: 3473cefc6a46fa942284c3c587bb4f84e80ca469 [file] [log] [blame] [edit]
#!/usr/bin/env python3
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test script that establishes a BLE connection, runs data traffic, and calculates the RSSI value of the remote BLE device.
"""
import os
import logging
import pandas as pd
import numpy as np
import time
import acts_contrib.test_utils.bt.bt_test_utils as btutils
import acts_contrib.test_utils.wifi.wifi_performance_test_utils.bokeh_figure as bokeh_figure
from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_coc_connection
from acts_contrib.test_utils.bt.ble_performance_test_utils import ble_gatt_disconnection
from acts_contrib.test_utils.bt.ble_performance_test_utils import start_advertising_and_scanning
from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts_contrib.test_utils.bt.bt_test_utils import cleanup_scanners_and_advertisers
from acts_contrib.test_utils.bt.ble_performance_test_utils import establish_ble_connection
from acts_contrib.test_utils.bt.bt_constants import l2cap_max_inactivity_delay_after_disconnect
from acts_contrib.test_utils.bt.ble_performance_test_utils import run_ble_throughput
from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_rssi
from acts_contrib.test_utils.bt.ble_performance_test_utils import read_ble_scan_rssi
from acts_contrib.test_utils.bt.bt_test_utils import reset_bluetooth
from acts_contrib.test_utils.power.PowerBTBaseTest import ramp_attenuation
from acts_contrib.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.signals import TestPass
from acts import utils
# Attenuation value applied to the attenuator during class setup and restored
# after every test (presumably in dB -- confirm against the attenuator API).
INIT_ATTEN = 0
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably an RSSI magnitude limit -- confirm before relying on it.
MAX_RSSI = 92
class BleRangeTest(BluetoothBaseTest):
    """BLE range tests driven by a programmable attenuator.

    Every test steps the attenuator through ``self.attenuation_range`` and
    records the RSSI (plus Tx power level and throughput where applicable)
    seen at each step. Results are written as CSV/HTML files under
    ``self.log_path``.
    """

    # Callbacks of the advertisers/scanners started by a test. They are handed
    # to cleanup_scanners_and_advertisers() in teardown_test().
    active_adv_callback_list = []
    active_scan_callback_list = []

    def __init__(self, configs):
        """Unpack the user parameters this test class requires."""
        super().__init__(configs)
        # 'attenuation_vector' is a dict containing: start, stop and step of
        # attenuation changes
        req_params = ['attenuation_vector', 'system_path_loss']
        self.unpack_userparams(req_params)

    def setup_class(self):
        """Prepare devices, attenuator, results directory and the RSSI figure.

        Returns:
            Whatever setup_multiple_devices_for_bt_test() returns for all
            android devices.
        """
        super().setup_class()
        self.client_ad = self.android_devices[0]
        # The client which is scanning will need location to be enabled in
        # order to start scan and get scan results.
        utils.set_location_service(self.client_ad, True)
        self.server_ad = self.android_devices[1]
        if hasattr(self, 'attenuators'):
            self.attenuator = self.attenuators[0]
            self.attenuator.set_atten(INIT_ATTEN)
        # 'stop' is inclusive, hence the + 1.
        self.attenuation_range = range(self.attenuation_vector['start'],
                                       self.attenuation_vector['stop'] + 1,
                                       self.attenuation_vector['step'])
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        # BokehFigure object
        # NOTE(review): this figure is created once per class, so its title is
        # whatever current_test_name holds at class setup, and every test that
        # calls plot_ble_graph() adds its lines to this same object -- confirm
        # that this is intended.
        self.plot = bokeh_figure.BokehFigure(
            title='{}'.format(self.current_test_name),
            x_label='Pathloss (dB)',
            primary_y_label='BLE RSSI (dBm)',
            secondary_y_label='DUT Tx Power (dBm)',
            axis_label_size='16pt')
        # Note that some tests required a third device.
        if len(self.android_devices) > 2:
            self.server2_ad = self.android_devices[2]
        btutils.enable_bqr(self.android_devices)
        return setup_multiple_devices_for_bt_test(self.android_devices)

    def teardown_test(self):
        """Stop socket connections, reset attenuation and clean up callbacks."""
        self.client_ad.droid.bluetoothSocketConnStop()
        self.server_ad.droid.bluetoothSocketConnStop()
        if hasattr(self, 'attenuator'):
            self.attenuator.set_atten(INIT_ATTEN)
        # Give sufficient time for the physical LE link to be disconnected.
        time.sleep(l2cap_max_inactivity_delay_after_disconnect)
        cleanup_scanners_and_advertisers(self.client_ad,
                                         self.active_scan_callback_list,
                                         self.server_ad,
                                         self.active_adv_callback_list)
        # Forget the callbacks that were just cleaned up so the next test's
        # teardown does not try to stop them a second time.
        self.active_scan_callback_list.clear()
        self.active_adv_callback_list.clear()

    def test_ble_gatt_connection_range(self):
        """Test GATT connection over LE and read RSSI.

        Test will establish a GATT connection between a GATT server and GATT
        client, then read the RSSI for each attenuation until the BLE link
        gets disconnected.

        Expected Result:
        Verify that a connection was established and then disconnected
        successfully. Verify that the RSSI was read correctly.

        Returns:
            True when the whole attenuation range was covered.

        Raises:
            TestPass: when the RSSI metric comes back as a string, which this
                test treats as the link having reached its maximum range.
        """
        attenuation = []
        ble_rssi = []
        dut_pwlv = []
        path_loss = []
        # Computed up front so results can be saved even when the link drops
        # at the very first attenuation step.
        filepath = os.path.join(
            self.log_path, '{}.csv'.format(self.current_test_name))
        bluetooth_gatt, gatt_callback, adv_callback, gatt_server = (
            establish_ble_connection(self.client_ad, self.server_ad))
        link_lost = False
        for atten in self.attenuation_range:
            ramp_attenuation(self.attenuator, atten)
            self.log.info('Set attenuation to %d dB', atten)
            rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv()
            self.log.info('Dut BLE RSSI:%s and Pwlv:%s with attenuation:%s',
                          rssi_primary, pwlv_primary, atten)
            # The return value is not used; the call is kept from the original
            # flow. NOTE(review): it is passed the gatt_server handle rather
            # than bluetooth_gatt -- confirm this is intended.
            self.client_ad.droid.gattClientReadRSSI(gatt_server)
            if isinstance(rssi_primary, str):
                link_lost = True
                break
            attenuation.append(atten)
            ble_rssi.append(rssi_primary)
            dut_pwlv.append(pwlv_primary)
            path_loss.append(atten + self.system_path_loss)
        df = pd.DataFrame({
            'Attenuation': attenuation,
            'BLE_RSSI': ble_rssi,
            'Dut_PwLv': dut_pwlv,
            'Pathloss': path_loss
        })
        if not link_lost:
            ble_gatt_disconnection(self.client_ad, bluetooth_gatt,
                                   gatt_callback)
        self.plot_ble_graph(df)
        df.to_csv(filepath, encoding='utf-8')
        if link_lost:
            raise TestPass('Reached BLE Max Range, BLE Gatt disconnected')
        self.server_ad.droid.bleStopBleAdvertising(adv_callback)
        return True

    def test_ble_coc_throughput_range(self):
        """Test LE CoC data transfer and read RSSI with each attenuation.

        Test will establish an L2CAP CoC connection between client and server,
        then start BLE data transfer and read the RSSI for each attenuation
        until the BLE link gets disconnected.

        Expected Result:
        BLE data transfer successful and RSSI value of the server read.

        Returns:
            True when the whole attenuation range was covered.

        Raises:
            TestPass: when the RSSI metric comes back as a string, which this
                test treats as the link having reached its maximum range.
        """
        attenuation = []
        ble_rssi = []
        throughput = []
        dut_pwlv = []
        path_loss = []
        self.plot_throughput = bokeh_figure.BokehFigure(
            title='{}'.format(self.current_test_name),
            x_label='Pathloss (dB)',
            primary_y_label='BLE Throughput (bits per sec)',
            axis_label_size='16pt')
        # Computed up front so results can be saved even when the link drops
        # at the very first attenuation step.
        filepath = os.path.join(
            self.log_path, '{}.csv'.format(self.current_test_name))
        results_file_path = os.path.join(
            self.log_path,
            '{}_throughput.html'.format(self.current_test_name))
        status, gatt_callback, gatt_server, bluetooth_gatt, client_conn_id = (
            ble_coc_connection(self.server_ad, self.client_ad))
        link_lost = False
        for atten in self.attenuation_range:
            ramp_attenuation(self.attenuator, atten)
            self.log.info('Set attenuation to %d dB', atten)
            datarate = run_ble_throughput(self.client_ad, client_conn_id,
                                          self.server_ad)
            rssi_primary, pwlv_primary = self.get_ble_rssi_and_pwlv()
            self.log.info(
                'BLE RSSI is:%s dBm and Tx Power:%s with attenuation %s dB '
                'with throughput:%sbits per sec', rssi_primary, pwlv_primary,
                atten, datarate)
            if isinstance(rssi_primary, str):
                link_lost = True
                break
            attenuation.append(atten)
            ble_rssi.append(rssi_primary)
            dut_pwlv.append(pwlv_primary)
            throughput.append(datarate)
            path_loss.append(atten + self.system_path_loss)
        df = pd.DataFrame({
            'Attenuation': attenuation,
            'BLE_RSSI': ble_rssi,
            'Dut_PwLv': dut_pwlv,
            'Throughput': throughput,
            'Pathloss': path_loss
        })
        self.plot_ble_graph(df)
        if df.empty:
            self.log.warning('No throughput samples collected, skipping plot')
        else:
            # Add the throughput line once, with the complete data set, rather
            # than one overlapping line per attenuation step.
            self.plot_throughput.add_line(df['Pathloss'],
                                          df['Throughput'],
                                          legend='BLE Throughput',
                                          marker='square_x')
            self.plot_throughput.generate_figure()
            bokeh_figure.BokehFigure.save_figures([self.plot_throughput],
                                                  results_file_path)
        df.to_csv(filepath, encoding='utf-8')
        if link_lost:
            raise TestPass('Reached BLE Max Range, BLE Gatt disconnected')
        ble_gatt_disconnection(self.server_ad, bluetooth_gatt, gatt_callback)
        return True

    def test_ble_scan_remote_rssi(self):
        """Scan the advertising server at each attenuation and log its RSSI.

        For every attenuation step the server starts advertising and the
        client starts scanning; the raw RSSI samples with their timestamps are
        written to a per-attenuation CSV file. A summary (average, min, max
        and standard deviation per step) is written once the range has been
        covered.

        Returns:
            True when the whole attenuation range was covered.
        """
        data_points = []
        for atten in self.attenuation_range:
            csv_path = os.path.join(
                self.log_path,
                '{}_attenuation_{}.csv'.format(self.current_test_name, atten))
            ramp_attenuation(self.attenuator, atten)
            self.log.info('Set attenuation to %d dB', atten)
            adv_callback, scan_callback = start_advertising_and_scanning(
                self.client_ad, self.server_ad, Legacymode=False)
            self.active_adv_callback_list.append(adv_callback)
            self.active_scan_callback_list.append(scan_callback)
            average_rssi, raw_rssi, timestamp = read_ble_scan_rssi(
                self.client_ad, scan_callback)
            self.log.info('Scanned rssi list of the remote device is :%s',
                          raw_rssi)
            self.log.info('BLE RSSI of the remote device is:%s dBm',
                          average_rssi)
            data_points.append({
                'Attenuation': atten,
                'BLE_RSSI': average_rssi,
                'Pathloss': atten + self.system_path_loss,
                'Min_RSSI': min(raw_rssi),
                'Max_RSSI': max(raw_rssi),
                'Standard_deviation': np.std(raw_rssi)
            })
            df = pd.DataFrame({'timestamp': timestamp, 'raw rssi': raw_rssi})
            df.to_csv(csv_path, encoding='utf-8', index=False)
            try:
                self.server_ad.droid.bleAdvSetStopAdvertisingSet(adv_callback)
            except Exception as err:
                # Best effort: fall back to a Bluetooth reset on the server
                # when the advertising set cannot be stopped.
                self.log.warning('Failed to stop advertisement: %s', err)
                reset_bluetooth([self.server_ad])
            self.client_ad.droid.bleStopBleScan(scan_callback)
        filepath = os.path.join(
            self.log_path, '{}_summary.csv'.format(self.current_test_name))
        ble_df = pd.DataFrame(data_points)
        ble_df.to_csv(filepath, encoding='utf-8')
        return True

    def plot_ble_graph(self, df):
        """Plot BLE RSSI and DUT Tx power against pathloss and save as HTML.

        Args:
            df: Summary of results; must contain the 'Pathloss', 'BLE_RSSI'
                and 'Dut_PwLv' columns. Nothing is plotted when it is empty.
        """
        if df.empty:
            self.log.warning('No RSSI samples collected, skipping plot')
            return
        self.plot.add_line(df['Pathloss'],
                           df['BLE_RSSI'],
                           legend='DUT BLE RSSI (dBm)',
                           marker='circle_x')
        self.plot.add_line(df['Pathloss'],
                           df['Dut_PwLv'],
                           legend='DUT TX Power (dBm)',
                           marker='hex',
                           y_axis='secondary')
        results_file_path = os.path.join(
            self.log_path, '{}.html'.format(self.current_test_name))
        self.plot.generate_figure()
        bokeh_figure.BokehFigure.save_figures([self.plot], results_file_path)

    def get_ble_rssi_and_pwlv(self):
        """Read the client's RSSI and Tx power level metrics.

        Returns:
            Tuple (rssi, pwlv): the 'rssi' and 'pwlv' entries that
            btutils.get_bt_metric() reports for the client device's serial.
            The tests treat a string RSSI as the link being lost.
        """
        process_data_dict = btutils.get_bt_metric(self.client_ad)
        rssi_by_serial = process_data_dict.get('rssi')
        pwlv_by_serial = process_data_dict.get('pwlv')
        serial = self.client_ad.serial
        return rssi_by_serial.get(serial), pwlv_by_serial.get(serial)