Change RFCOMM tests to use new SL4A Apis
Remove threading in python since SL4A now
handles the RFCOMM connect and accept
threads. Change API calls accordingly.
Also cleaned up the code.
Change-Id: I4024c3685a35f598c712afa596fe51114222c824
(cherry picked from commit 24506940d005cc81aa371b060e03eb6ff5806fdb)
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index 8400500..5470094 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -612,33 +612,37 @@
call(["adb -s " + ad.serial + " shell kill " + pid], shell=True)
-def rfcomm_connect(ad, device_address):
- rf_client_ad = ad
- log.debug("Performing RFCOMM connection to {}".format(device_address))
- try:
- ad.droid.bluetoothRfcommConnect(device_address)
- except Exception as err:
- log.error("Failed to connect: {}".format(err))
- ad.droid.bluetoothRfcommCloseSocket()
- return
- finally:
- return
- return
+def orchestrate_rfcomm_connection(client_ad,
+ server_ad,
+ timeout=default_timeout):
+ """Sets up the RFCOMM connection between two Android devices.
-
-def rfcomm_accept(ad):
- rf_server_ad = ad
- log.debug("Performing RFCOMM accept")
- try:
- ad.droid.bluetoothRfcommAccept(RfcommUuid.DEFAULT_UUID.value,
- default_timeout)
- except Exception as err:
- log.error("Failed to accept: {}".format(err))
- ad.droid.bluetoothRfcommCloseSocket()
- return
- finally:
- return
- return
+ Args:
+ client_ad: the Android device performing the connection
+ server_ad: the Android device accepting the connection
+ Returns:
+ True if connection was successful, false if unsuccessful.
+ """
+ server_ad.droid.bluetoothRfcommBeginAcceptThread(
+ RfcommUuid.DEFAULT_UUID.value, timeout)
+ client_ad.droid.bluetoothRfcommBeginConnectThread(
+ server_ad.droid.bluetoothGetLocalAddress())
+ end_time = time.time() + timeout
+ result = False
+ test_result = True
+ while time.time() < end_time:
+ if len(client_ad.droid.bluetoothRfcommActiveConnections()) > 0:
+ log.info("RFCOMM Client Connection Active")
+ break
+ end_time = time.time() + timeout
+ while time.time() < end_time:
+ if len(server_ad.droid.bluetoothRfcommActiveConnections()) > 0:
+ log.info("RFCOMM Server Connection Active")
+ break
+ if not test_result:
+ log.error("Failed to establish an RFCOMM connection")
+ return False
+ return True
def write_read_verify_data(client_ad, server_ad, msg, binary=False):
@@ -666,3 +670,22 @@
log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
return False
return True
+
+
+def clear_bonded_devices(ad):
+ """Clear bonded devices from the input Android device
+
+ Args:
+ ad: the Android device performing the connection
+ Returns:
+ True if clearing bonded devices was successful, false if unsuccessful.
+ """
+ bonded_device_list = ad.droid.bluetoothGetBondedDevices()
+ for device in bonded_device_list:
+ device_address = device['address']
+ if not ad.droid.bluetoothUnbond(device_address):
+ log.error("Failed to unbond {}".format(device_address))
+ return False
+ log.info("Successfully unbonded {}".format(device_address))
+ return True
+
diff --git a/acts/tests/google/bt/RfcommTest.py b/acts/tests/google/bt/RfcommTest.py
index a445d42..5577f03 100644
--- a/acts/tests/google/bt/RfcommTest.py
+++ b/acts/tests/google/bt/RfcommTest.py
@@ -24,21 +24,22 @@
from queue import Empty
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+from acts.test_utils.bt.bt_test_utils import clear_bonded_devices
from acts.test_utils.bt.bt_test_utils import log_energy_info
from acts.test_utils.bt.bt_test_utils import kill_bluetooth_process
+from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
-from acts.test_utils.bt.bt_test_utils import rfcomm_accept
-from acts.test_utils.bt.bt_test_utils import rfcomm_connect
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
from acts.test_utils.bt.bt_test_utils import write_read_verify_data
+from acts.test_utils.bt.BtEnum import RfcommUuid
+
class RfcommTest(BluetoothBaseTest):
default_timeout = 10
rf_client_th = 0
scan_discovery_time = 5
- thread_list = []
message = (
"Space: the final frontier. These are the voyages of "
"the starship Enterprise. Its continuing mission: to explore "
@@ -50,23 +51,21 @@
self.client_ad = self.android_devices[0]
self.server_ad = self.android_devices[1]
- def _clear_bonded_devices(self):
- for a in self.android_devices:
- bonded_device_list = a.droid.bluetoothGetBondedDevices()
- for device in bonded_device_list:
- a.droid.bluetoothUnbond(device['address'])
-
def setup_class(self):
return setup_multiple_devices_for_bt_test(self.android_devices)
def setup_test(self):
- self._clear_bonded_devices()
+ for a in self.android_devices:
+ if not clear_bonded_devices(a):
+ return False
self.log.debug(log_energy_info(self.android_devices, "Start"))
for a in self.android_devices:
a.ed.clear_all_events()
return True
def teardown_test(self):
+ self.client_ad.droid.bluetoothRfcommCloseClientSocket()
+ self.server_ad.droid.bluetoothRfcommCloseServerSocket()
self.log.debug(log_energy_info(self.android_devices, "End"))
return True
@@ -80,31 +79,6 @@
self.server_ad.droid.bluetoothRfcommStop()
self.client_ad.droid.bluetoothRfcommCloseSocket()
self.server_ad.droid.bluetoothRfcommCloseSocket()
- for thread in self.thread_list:
- thread.join()
- self.thread_list.clear()
-
- def orchestrate_rfcomm_connect(self, server_mac):
- accept_thread = threading.Thread(target=rfcomm_accept,
- args=(self.server_ad, ))
- self.thread_list.append(accept_thread)
- accept_thread.start()
- connect_thread = threading.Thread(target=rfcomm_connect,
- args=(self.client_ad, server_mac))
- self.rf_client_th = connect_thread
- self.thread_list.append(connect_thread)
- connect_thread.start()
- for thread in self.thread_list:
- thread.join()
- end_time = time.time() + self.default_timeout
- result = False
- while time.time() < end_time:
- if len(self.client_ad.droid.bluetoothRfcommActiveConnections(
- )) > 0:
- self.log.info("RFCOMM Connection Active")
- return True
- self.log.error("Failed to establish an RFCOMM connection")
- return False
@BluetoothBaseTest.bt_test_wrap
def test_rfcomm_connection(self):
@@ -128,15 +102,13 @@
TAGS: Classic, RFCOMM
Priority: 1
"""
- server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- if not self.orchestrate_rfcomm_connect(server_mac):
+ if not orchestrate_rfcomm_connection(self.client_ad, self.server_ad):
return False
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
return True
-
@BluetoothBaseTest.bt_test_wrap
def test_rfcomm_connection_write_ascii(self):
"""Test bluetooth RFCOMM writing and reading ascii data
@@ -162,8 +134,7 @@
TAGS: Classic, RFCOMM
Priority: 1
"""
- server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- if not self.orchestrate_rfcomm_connect(server_mac):
+ if not orchestrate_rfcomm_connection(self.client_ad, self.server_ad):
return False
if not write_read_verify_data(self.client_ad, self.server_ad,
self.message, False):
@@ -204,8 +175,7 @@
TAGS: Classic, RFCOMM
Priority: 1
"""
- server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- if not self.orchestrate_rfcomm_connect(server_mac):
+ if not orchestrate_rfcomm_connection(self.client_ad, self.server_ad):
return False
binary_message = "11010101"
if not write_read_verify_data(self.client_ad, self.server_ad,
diff --git a/acts/tests/google/bt/system_tests/BtStressTest.py b/acts/tests/google/bt/system_tests/BtStressTest.py
index d1d452b..ff578cc 100644
--- a/acts/tests/google/bt/system_tests/BtStressTest.py
+++ b/acts/tests/google/bt/system_tests/BtStressTest.py
@@ -19,6 +19,7 @@
import time
from acts.base_test import BaseTestClass
+from acts.test_utils.bt.bt_test_utils import clear_bonded_devices
from acts.test_utils.bt.bt_test_utils import log_energy_info
from acts.test_utils.bt.bt_test_utils import pair_pri_to_sec
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
@@ -107,9 +108,8 @@
self.log.error("Failed to bond devices.")
return False
for ad in self.android_devices:
- bonded_devices = ad.droid.bluetoothGetBondedDevices()
- for b in bonded_devices:
- ad.droid.bluetoothUnbond(b['address'])
+ if not clear_bonded_devices(ad):
+ return False
#Necessary sleep time for entries to update unbonded state
time.sleep(1)
bonded_devices = ad.droid.bluetoothGetBondedDevices()
diff --git a/acts/tests/google/bt/system_tests/RfcommStressTest.py b/acts/tests/google/bt/system_tests/RfcommStressTest.py
index 2ce6a59..a7b6ea3 100644
--- a/acts/tests/google/bt/system_tests/RfcommStressTest.py
+++ b/acts/tests/google/bt/system_tests/RfcommStressTest.py
@@ -26,15 +26,14 @@
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
-from acts.test_utils.bt.bt_test_utils import rfcomm_accept
-from acts.test_utils.bt.bt_test_utils import rfcomm_connect
+from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
+from acts.test_utils.bt.bt_test_utils import write_read_verify_data
class RfcommStressTest(BluetoothBaseTest):
default_timeout = 10
scan_discovery_time = 5
- thread_list = []
message = (
"Space: the final frontier. These are the voyages of "
"the starship Enterprise. Its continuing mission: to explore "
@@ -54,22 +53,6 @@
take_btsnoop_logs(self.android_devices, self, test_name)
reset_bluetooth(self.android_devices)
- def teardown_test(self):
- with suppress(Exception):
- for thread in self.thread_list:
- thread.join()
-
- def orchestrate_rfcomm_connect(self, server_mac):
- accept_thread = threading.Thread(target=rfcomm_accept,
- args=(self.server_ad, ))
- self.thread_list.append(accept_thread)
- accept_thread.start()
- connect_thread = threading.Thread(
- target=rfcomm_connect,
- args=(self.client_ad, server_mac))
- self.thread_list.append(connect_thread)
- connect_thread.start()
-
def test_rfcomm_connection_stress(self):
"""Stress test an RFCOMM connection
@@ -94,21 +77,50 @@
TAGS: Classic, Stress, RFCOMM
Priority: 1
"""
- server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- for n in range(1000):
- self.orchestrate_rfcomm_connect(server_mac)
- self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(self.message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert self.message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ iterations = 10000
+ for n in range(iterations):
+ if not orchestrate_rfcomm_connection(self.client_ad,
+ self.server_ad):
+ return False
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
- for t in self.thread_list:
- t.join()
- self.thread_list.clear()
+ self.log.info("Iteration {} completed".format(n))
+ return True
+
+ def test_rfcomm_connection_write_msg_stress(self):
+ """Stress test an RFCOMM connection
+
+ Test the integrity of RFCOMM. Verify that file descriptors are cleared
+ out properly.
+
+ Steps:
+ 1. Establish a bonding between two Android devices.
+ 2. Write data to RFCOMM from the client droid.
+ 3. Read data from RFCOMM from the server droid.
+ 4. Stop the RFCOMM connection.
+ 5. Repeat steps 2-4 1000 times.
+
+ Expected Result:
+ Each iteration should read and write to the RFCOMM connection
+ successfully.
+
+ Returns:
+ Pass if True
+ Fail if False0
+
+ TAGS: Classic, Stress, RFCOMM
+ Priority: 1
+ """
+ iterations = 1000
+ for n in range(iterations):
+ if not orchestrate_rfcomm_connection(self.client_ad,
+ self.server_ad):
+ return False
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ self.message, False):
+ return False
+ self.client_ad.droid.bluetoothRfcommStop()
+ self.server_ad.droid.bluetoothRfcommStop()
self.log.info("Iteration {} completed".format(n))
return True
@@ -136,21 +148,15 @@
TAGS: Classic, Stress, RFCOMM
Priority: 1
"""
- server_mac = self.server_ad.droid.bluetoothGetLocalAddress()
- reset_bluetooth([self.server_ad])
- self.orchestrate_rfcomm_connect(server_mac)
- for n in range(10000):
+ iterations = 1000
+ if not orchestrate_rfcomm_connection(self.client_ad, self.server_ad):
+ return False
+ for n in range(iterations):
self.log.info("Write message.")
- self.client_ad.droid.bluetoothRfcommWrite(self.message)
- self.log.info("Read message.")
- read_msg = self.server_ad.droid.bluetoothRfcommRead()
- self.log.info("Verify message.")
- assert self.message == read_msg, "Mismatch! Read {}".format(
- read_msg)
+ if not write_read_verify_data(self.client_ad, self.server_ad,
+ self.message, False):
+ return False
self.log.info("Iteration {} completed".format(n))
self.client_ad.droid.bluetoothRfcommStop()
self.server_ad.droid.bluetoothRfcommStop()
- for t in self.thread_list:
- t.join()
- self.thread_list.clear()
return True