blob: 394e962abc3a3df0d01d1da99c61ea422bdae7b2 [file] [log] [blame]
#!/usr/bin/env python3
#
# Copyright 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import statistics
from queue import Empty
from concurrent.futures import ThreadPoolExecutor
from acts_contrib.test_utils.bt.bt_gatt_utils import close_gatt_client
from acts_contrib.test_utils.bt.bt_coc_test_utils import do_multi_connection_throughput
from acts_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err
from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings
from acts_contrib.test_utils.bt.bt_constants import l2cap_coc_header_size
from acts_contrib.test_utils.bt.bt_gatt_utils import GattTestUtilsError
from acts_contrib.test_utils.bt.bt_coc_test_utils import orchestrate_coc_connection
from acts_contrib.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
default_event_timeout = 10
rssi_read_duration = 25
def establish_ble_connection(client_ad, server_ad):
"""Function to establish BLE connection between two BLE devices.
Args:
client_ad: the Android device performing the connection.
server_ad: the Android device accepting the connection.
Returns:
bluetooth_gatt: GATT object
gatt_callback: Gatt callback object
adv_callback: advertisement callback object
gatt_server: the gatt server
"""
gatt_server_cb = server_ad.droid.gattServerCreateGattServerCallback()
gatt_server = server_ad.droid.gattServerOpenGattServer(gatt_server_cb)
try:
bluetooth_gatt, gatt_callback, adv_callback = (
orchestrate_gatt_connection(client_ad, server_ad))
except GattTestUtilsError as err:
logging.error(err)
return False
return bluetooth_gatt, gatt_callback, adv_callback, gatt_server
def read_ble_rssi(client_ad, gatt_server, gatt_callback):
"""Function to Read BLE RSSI of the remote BLE device.
Args:
client_ad: the Android device performing the connection.
gatt_server: the gatt server
gatt_callback:the gatt connection call back object
Returns:
ble_rssi: RSSI value of the remote BLE device
"""
AVG_RSSI = []
end_time = time.time() + rssi_read_duration
logging.info("Reading BLE RSSI for {} sec".format(rssi_read_duration))
while time.time() < end_time:
expected_event = gatt_cb_strings['rd_remote_rssi'].format(
gatt_callback)
read_rssi = client_ad.droid.gattClientReadRSSI(gatt_server)
if read_rssi:
try:
event = client_ad.ed.pop_event(expected_event,
default_event_timeout)
except Empty:
logging.error(
gatt_cb_err['rd_remote_rssi_err'].format(expected_event))
return False
rssi_value = event['data']['Rssi']
AVG_RSSI.append(rssi_value)
logging.debug("First & Last reading of RSSI :{:03d} & {:03d}".format(
AVG_RSSI[0], AVG_RSSI[-1]))
ble_rssi = statistics.mean(AVG_RSSI)
ble_rssi = round(ble_rssi, 2)
return ble_rssi
def ble_coc_connection(client_ad, server_ad):
"""Sets up the CoC connection between two Android devices.
Args:
client_ad: the Android device performing the connection.
server_ad: the Android device accepting the connection.
Returns:
True if connection was successful or false if unsuccessful,
gatt_callback: GATT callback object
client connection ID: Client connection ID
and server connection ID : server connection ID
"""
#secured_conn: True if using secured connection
#le_connection_interval: LE Connection interval. 0 means use default.
#buffer_size : is the number of bytes per L2CAP data buffer
#le_tx_data_length: LE Data Length used by BT Controller to transmit.
is_secured = False
le_connection_interval = 30
buffer_size = 240
le_tx_data_length = buffer_size + l2cap_coc_header_size
gatt_server_cb = server_ad.droid.gattServerCreateGattServerCallback()
gatt_server = server_ad.droid.gattServerOpenGattServer(gatt_server_cb)
logging.info(
"orchestrate_ble_coc_connection. is_secured={}, Connection Interval={}msec, "
"buffer_size={}bytes".format(is_secured, le_connection_interval,
buffer_size))
try:
status, client_conn_id, server_conn_id, bluetooth_gatt, gatt_callback = orchestrate_coc_connection(
client_ad,
server_ad,
True,
is_secured,
le_connection_interval,
le_tx_data_length,
gatt_disconnection=False)
except Exception as err:
logging.info("Failed to esatablish COC connection".format(err))
return 0
return True, gatt_callback, gatt_server, bluetooth_gatt, client_conn_id
def run_ble_throughput(server_ad, client_conn_id, client_ad,
num_iterations=30):
"""Function to measure Throughput from one client to one-or-many servers
Args:
server_ad: the Android device accepting the connection.
client_conn_id: the client connection ID.
client_ad: the Android device performing the connection.
num_iterations: The num_iterations is that number of repetitions of each
set of buffers r/w.
Returns:
data_rate: Throughput in terms of bytes per second, 0 if test failed.
"""
# number_buffers is the total number of data buffers to transmit per
# set of buffers r/w.
# buffer_size is the number of bytes per L2CAP data buffer.
number_buffers = 100
buffer_size = 240
list_server_ad = [server_ad]
list_client_conn_id = [client_conn_id]
data_rate = do_multi_connection_throughput(client_ad, list_server_ad,
list_client_conn_id,
num_iterations, number_buffers,
buffer_size)
if data_rate <= 0:
return False
data_rate = data_rate * 8
logging.info(
"run_ble_coc_connection_throughput: throughput=%d bites per sec",
data_rate)
return data_rate
def run_ble_throughput_and_read_rssi(client_ad, server_ad, client_conn_id,
gatt_server, gatt_callback):
"""Function to measure ble rssi while sendinng data from client to server
Args:
client_ad: the Android device performing the connection.
server_ad: the Android device accepting the connection.
client_conn_id: the client connection ID.
gatt_server: the gatt server
gatt_callback: Gatt callback object
Returns:
ble_rssi: RSSI value of the remote BLE device.
"""
executor = ThreadPoolExecutor(2)
ble_throughput = executor.submit(run_ble_throughput, client_ad,
client_conn_id, server_ad)
ble_rssi = executor.submit(read_ble_rssi, server_ad, gatt_server,
gatt_callback)
logging.info("BLE RSSI is:{} dBm with data rate={} bites per sec ".format(
ble_rssi.result(), ble_throughput.result()))
return ble_rssi.result()
def ble_gatt_disconnection(client_ad, bluetooth_gatt, gatt_callback):
"""Function to disconnect GATT connection between client and server.
Args:
client_ad: the Android device performing the connection.
bluetooth_gatt: GATT object
gatt_callback:the gatt connection call back object
Returns:
ble_rssi: RSSI value of the remote BLE device
"""
logging.info("Disconnecting from peripheral device.")
try:
disconnect_gatt_connection(client_ad, bluetooth_gatt, gatt_callback)
close_gatt_client(client_ad, bluetooth_gatt)
except GattTestUtilsError as err:
logging.error(err)
return False
return True