Merge "ACTS: Support SL4A tests for Connection-oriented Channels"
diff --git a/acts/framework/acts/test_utils/bt/bt_constants.py b/acts/framework/acts/test_utils/bt/bt_constants.py
index 8e165ae..793d052 100644
--- a/acts/framework/acts/test_utils/bt/bt_constants.py
+++ b/acts/framework/acts/test_utils/bt/bt_constants.py
@@ -18,6 +18,7 @@
bt_default_timeout = 15
default_rfcomm_timeout_ms = 10000
+default_bluetooth_socket_timeout_ms = 10000
pan_connect_timeout = 5
bt_discovery_timeout = 3
small_timeout = 0.0001
@@ -72,6 +73,9 @@
rfcomm_secure_uuid = "fa87c0d0-afac-11de-8a39-0800200c9a66"
rfcomm_insecure_uuid = "8ce255c0-200a-11e0-ac64-0800200c9a66"
+# bluetooth socket connection test uuid
+bluetooth_socket_conn_test_uuid = "12345678-1234-5678-9abc-123456789abc"
+
# Bluetooth Adapter Scan Mode Types
bt_scan_mode_types = {
"state_off": -1,
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index faf9b3d..dfe3850 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -57,6 +57,7 @@
from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device
from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
+from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
from acts.test_utils.bt.bt_constants import mtu_changed
from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
from acts.test_utils.bt.bt_constants import pan_connect_timeout
@@ -1027,32 +1028,50 @@
Returns:
True if connection was successful, false if unsuccessful.
"""
+ result = orchestrate_bluetooth_socket_connection(
+ client_ad, server_ad, accept_timeout_ms,
+ (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid))
+
+ return result
+
+
+def orchestrate_bluetooth_socket_connection(
+ client_ad,
+ server_ad,
+ accept_timeout_ms=default_bluetooth_socket_timeout_ms,
+ uuid=None):
+ """Sets up the Bluetooth Socket connection between two Android devices.
+
+ Args:
+ client_ad: the Android device performing the connection.
+ server_ad: the Android device accepting the connection.
+ accept_timeout_ms: timeout passed to the server's accept thread.
+ uuid: UUID for both ends; bluetooth_socket_conn_test_uuid if None.
+ Returns:
+ True if connection was successful, false if unsuccessful.
+ """
server_ad.droid.bluetoothStartPairingHelper()
client_ad.droid.bluetoothStartPairingHelper()
- if not uuid:
- server_ad.droid.bluetoothRfcommBeginAcceptThread(
- bt_rfcomm_uuids['default_uuid'], accept_timeout_ms)
- client_ad.droid.bluetoothRfcommBeginConnectThread(
- server_ad.droid.bluetoothGetLocalAddress(),
- bt_rfcomm_uuids['default_uuid'])
- else:
- server_ad.droid.bluetoothRfcommBeginAcceptThread(
- uuid, accept_timeout_ms)
- client_ad.droid.bluetoothRfcommBeginConnectThread(
- server_ad.droid.bluetoothGetLocalAddress(), uuid)
+
+ server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
+ (bluetooth_socket_conn_test_uuid
+ if uuid is None else uuid), accept_timeout_ms)
+ client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
+ server_ad.droid.bluetoothGetLocalAddress(),
+ (bluetooth_socket_conn_test_uuid if uuid is None else uuid))
+
end_time = time.time() + bt_default_timeout
result = False
test_result = True
while time.time() < end_time:
- if len(client_ad.droid.bluetoothRfcommActiveConnections()) > 0:
+ if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
test_result = True
- client_ad.log.info("RFCOMM Client Connection Active")
+ client_ad.log.info("Bluetooth socket Client Connection Active")
break
else:
test_result = False
time.sleep(1)
if not test_result:
- client_ad.log.error("Failed to establish an RFCOMM connection")
+ client_ad.log.error(
+ "Failed to establish a Bluetooth socket connection")
return False
return True
@@ -1072,19 +1091,19 @@
client_ad.log.info("Write message.")
try:
if binary:
- client_ad.droid.bluetoothRfcommWriteBinary(msg)
+ client_ad.droid.bluetoothSocketConnWriteBinary(msg)
else:
- client_ad.droid.bluetoothRfcommWrite(msg)
+ client_ad.droid.bluetoothSocketConnWrite(msg)
except Exception as err:
client_ad.log.error("Failed to write data: {}".format(err))
return False
server_ad.log.info("Read message.")
try:
if binary:
- read_msg = server_ad.droid.bluetoothRfcommReadBinary().rstrip(
+ read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip(
"\r\n")
else:
- read_msg = server_ad.droid.bluetoothRfcommRead()
+ read_msg = server_ad.droid.bluetoothSocketConnRead()
except Exception as err:
server_ad.log.error("Failed to read data: {}".format(err))
return False
@@ -1128,13 +1147,13 @@
false if unsuccessful.
"""
test_result = True
- if len(server_ad.droid.bluetoothRfcommActiveConnections()) == 0:
+ if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
if log:
- server_ad.log.error("No rfcomm connections found on server.")
+ server_ad.log.error("No socket connections found on server.")
test_result = False
- if len(client_ad.droid.bluetoothRfcommActiveConnections()) == 0:
+ if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
if log:
- client_ad.log.error("No rfcomm connections found on client.")
+ client_ad.log.error("No socket connections found on client.")
test_result = False
return test_result
@@ -1286,3 +1305,165 @@
time.sleep(interval)
device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
hid_keyboard_report("00"))
+
+
+def do_multi_connection_throughput(client_ad, list_server_ad,
+                                   list_client_conn_id, num_iterations,
+                                   number_buffers, buffer_size):
+    """Throughput measurements from one client to one-or-many servers.
+
+    Writes run one set of buffers ahead of the reads; the last read pass
+    after the loop drains the set that is still in flight.
+
+    Args:
+        client_ad: the Android device to perform the write.
+        list_server_ad: list of Android server devices connected to client_ad.
+        list_client_conn_id: list of client connection IDs, one per server.
+        num_iterations: the number of test repetitions.
+        number_buffers: the total number of data buffers to transmit per test.
+        buffer_size: the number of bytes per L2CAP data buffer.
+
+    Returns:
+        Throughput in terms of bytes per second, 0 if test failed.
+    """
+    total_num_bytes = 0
+    start_write_time = time.perf_counter()
+    client_ad.log.info(
+        "do_multi_connection_throughput: Before write. Start Time={:f}, "
+        "num_iterations={}, number_buffers={}, buffer_size={}, "
+        "number_buffers*buffer_size={}, num_servers={}".format(
+            start_write_time, num_iterations, number_buffers, buffer_size,
+            number_buffers * buffer_size, len(list_server_ad)))
+
+    if len(list_server_ad) != len(list_client_conn_id):
+        client_ad.log.error("do_multi_connection_throughput: invalid "
+                            "parameters. Num of list_server_ad({}) != "
+                            "list_client_conn({})".format(
+                                len(list_server_ad), len(list_client_conn_id)))
+        return 0
+
+    try:
+        for client_conn_id in list_client_conn_id:
+            client_ad.log.info("do_multi_connection_throughput: "
+                               "client_conn_id={}".format(client_conn_id))
+            # Plumb the tx data queue with the first set of data buffers.
+            client_ad.droid.bluetoothConnectionThroughputSend(
+                number_buffers, buffer_size, client_conn_id)
+    except Exception as err:
+        client_ad.log.error("Failed to write data: {}".format(err))
+        return 0
+
+    # Each Loop iteration will write and read one set of buffers.
+    for _ in range(num_iterations - 1):
+        try:
+            for client_conn_id in list_client_conn_id:
+                client_ad.droid.bluetoothConnectionThroughputSend(
+                    number_buffers, buffer_size, client_conn_id)
+        except Exception as err:
+            client_ad.log.error("Failed to write data: {}".format(err))
+            return 0
+
+        for server_ad in list_server_ad:
+            try:
+                server_ad.droid.bluetoothConnectionThroughputRead(
+                    number_buffers, buffer_size)
+                total_num_bytes += number_buffers * buffer_size
+            except Exception as err:
+                server_ad.log.error("Failed to read data: {}".format(err))
+                return 0
+
+    for server_ad in list_server_ad:
+        try:
+            server_ad.droid.bluetoothConnectionThroughputRead(
+                number_buffers, buffer_size)
+            total_num_bytes += number_buffers * buffer_size
+        except Exception as err:
+            server_ad.log.error("Failed to read data: {}".format(err))
+            return 0
+
+    test_time = time.perf_counter() - start_write_time
+    data_rate = total_num_bytes / test_time
+    client_ad.log.info(
+        "Calculated using total write and read times: total_num_bytes={}, "
+        "test_time={}, data rate={:08.0f} bytes/sec, {:08.0f} bits/sec".format(
+            total_num_bytes, test_time, data_rate, (data_rate * 8)))
+    return data_rate
+
+
+def orchestrate_coc_connection(
+        client_ad,
+        server_ad,
+        is_ble,
+        psm_value,
+        accept_timeout_ms=default_bluetooth_socket_timeout_ms):
+    """Sets up the CoC connection between two Android devices.
+
+    Args:
+        client_ad: the Android device performing the connection.
+        server_ad: the Android device accepting the connection.
+        is_ble: using LE transport.
+        psm_value: assigned PSM value of this CoC.
+        accept_timeout_ms: timeout while waiting for incoming connection.
+    Returns:
+        A 3-tuple (status, client connection ID, server connection ID).
+        status is True if both devices report an active connection within
+        bt_default_timeout seconds; otherwise (False, None, None).
+    """
+    server_ad.droid.bluetoothStartPairingHelper()
+    client_ad.droid.bluetoothStartPairingHelper()
+
+    if is_ble == 1:
+        try:
+            # This will start advertising and scanning. Will fail if it could
+            # not find the advertisements from server_ad
+            client_ad.log.info(
+                "orchestrate_coc_connection: Start BLE Advertisement and "
+                "Scanning")
+            # Only the address is used; the advertise callback is discarded.
+            mac_address, _ = (
+                get_mac_address_of_generic_advertisement(client_ad, server_ad))
+        except BtTestUtilsError as err:
+            # NOTE(review): confirm GattTestUtilsError is imported here.
+            raise GattTestUtilsError(
+                "orchestrate_coc_connection: Error in getting mac address: {}".
+                format(err))
+    else:
+        mac_address = server_ad.droid.bluetoothGetLocalAddress()
+
+    server_ad.droid.bluetoothSocketConnBeginAcceptThreadPsm(
+        psm_value, accept_timeout_ms, is_ble)
+
+    client_ad.droid.bluetoothSocketConnBeginConnectThreadPsm(
+        mac_address, is_ble, psm_value)
+
+    end_time = time.time() + bt_default_timeout
+    test_result = False
+    while time.time() < end_time:
+        if len(server_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
+            server_ad.log.info("CoC Server Connection Active")
+            if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
+                client_ad.log.info("CoC Client Connection Active")
+                test_result = True
+                break
+        time.sleep(1)
+    if not test_result:
+        client_ad.log.error("Failed to establish an CoC connection")
+        # Same arity as the success path; callers unpack three values.
+        return False, None, None
+
+    num_client_conns = len(
+        client_ad.droid.bluetoothSocketConnActiveConnections())
+    if num_client_conns > 0:
+        client_ad.log.info("CoC client_ad Connection Active, num=%d",
+                           num_client_conns)
+    else:
+        client_ad.log.info("Error CoC client_ad Connection Inactive")
+
+    # Get the conn_id
+    client_conn_id = client_ad.droid.bluetoothGetLastConnId()
+    server_conn_id = server_ad.droid.bluetoothGetLastConnId()
+    client_ad.log.info(
+        "orchestrate_coc_connection: client conn id={}, server conn id={}".
+        format(client_conn_id, server_conn_id))
+
+    return True, client_conn_id, server_conn_id
diff --git a/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py b/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py
new file mode 100644
index 0000000..62743ad
--- /dev/null
+++ b/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2017 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Test script to execute Bluetooth Connection-oriented Channel (CoC)
+functionality test cases. This test was designed to be run in a shield box.
+"""
+
+import threading
+import time
+
+from queue import Empty
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
+from acts.test_utils.bt.bt_test_utils import clear_bonded_devices
+from acts.test_utils.bt.bt_test_utils import kill_bluetooth_process
+from acts.test_utils.bt.bt_test_utils import orchestrate_coc_connection
+from acts.test_utils.bt.bt_test_utils import do_multi_connection_throughput
+from acts.test_utils.bt.bt_test_utils import reset_bluetooth
+from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
+from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
+from acts.test_utils.bt.bt_test_utils import write_read_verify_data
+from acts.test_utils.bt.bt_test_utils import verify_server_and_client_connected
+
+
+class BleCocTest(BluetoothBaseTest):
+    """LE Connection-oriented Channel (CoC) tests between Android devices."""
+
+    default_timeout = 10
+    # PSM values handed to orchestrate_coc_connection, one per server device.
+    coc_conn_psm1 = 0x00A1
+    coc_conn_psm2 = 0x00B1
+
+    message = (
+        "Space: the final frontier. These are the voyages of "
+        "the starship Enterprise. Its continuing mission: to explore "
+        "strange new worlds, to seek out new life and new civilizations,"
+        " to boldly go where no man has gone before.")
+
+    def __init__(self, controllers):
+        BluetoothBaseTest.__init__(self, controllers)
+        self.client_ad = self.android_devices[0]
+        self.server_ad = self.android_devices[1]
+        # Note that some tests required a third device.
+        if len(self.android_devices) > 2:
+            self.server2_ad = self.android_devices[2]
+
+    def setup_class(self):
+        """Runs setup_multiple_devices_for_bt_test on all devices."""
+        return setup_multiple_devices_for_bt_test(self.android_devices)
+
+    def teardown_test(self):
+        # Stop the sockets only if both ends still report a connection;
+        # log=False keeps this check quiet. server2_ad is not handled here.
+        if verify_server_and_client_connected(
+                self.client_ad, self.server_ad, log=False):
+            self.client_ad.droid.bluetoothSocketConnStop()
+            self.server_ad.droid.bluetoothSocketConnStop()
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='6587792c-78fb-469f-9084-772c249f97de')
+    def test_coc_connection(self):
+        """Test Bluetooth LE CoC connection
+
+        Test LE CoC through establishing a basic connection.
+
+        Steps:
+        1. Get the mac address of the server device.
+        2. Establish an LE CoC connection from the client to the server AD.
+        3. Verify that the LE CoC connection is active from both the client and
+        server.
+        Expected Result:
+        LE CoC connection is established then disconnected successfully.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: BLE, CoC
+        Priority: 1
+        """
+        self.log.info("test_coc_connection: calling "
+                      "orchestrate_coc_connection with isBle=1")
+        # orchestrate_coc_connection returns a tuple; [0] is the status flag.
+        if not orchestrate_coc_connection(self.client_ad, self.server_ad, 1,
+                                          self.coc_conn_psm1)[0]:
+            return False
+
+        self.client_ad.droid.bluetoothSocketConnStop()
+        self.server_ad.droid.bluetoothSocketConnStop()
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='12537d27-79c9-40a0-8bdb-d023b0e36b58')
+    def test_coc_connection_write_ascii(self):
+        """Test LE CoC writing and reading ascii data
+
+        Test LE CoC through establishing a connection.
+
+        Steps:
+        1. Get the mac address of the server device.
+        2. Establish an LE CoC connection from the client to the server AD.
+        3. Verify that the LE CoC connection is active from both the client and
+        server.
+        4. Write data from the client and read received data from the server.
+        5. Verify data matches from client and server
+        6. Disconnect the LE CoC connection.
+        Expected Result:
+        LE CoC connection is established then disconnected successfully.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: BLE, CoC
+        Priority: 1
+        """
+        self.log.info("test_coc_connection_write_ascii: calling "
+                      "orchestrate_coc_connection")
+        # orchestrate_coc_connection returns a tuple; [0] is the status flag.
+        if not orchestrate_coc_connection(self.client_ad, self.server_ad, 1,
+                                          self.coc_conn_psm1)[0]:
+            return False
+        if not write_read_verify_data(self.client_ad, self.server_ad,
+                                      self.message, False):
+            return False
+        if not verify_server_and_client_connected(self.client_ad,
+                                                  self.server_ad):
+            return False
+
+        self.client_ad.droid.bluetoothSocketConnStop()
+        self.server_ad.droid.bluetoothSocketConnStop()
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='214037f4-f0d1-47db-86a7-5230c71bdcac')
+    def test_coc_connection_throughput(self):
+        """Test LE CoC writing and measured data throughput
+
+        Test CoC throughput by establishing a connection and sending data.
+
+        Steps:
+        1. Get the mac address of the server device.
+        2. Establish a L2CAP CoC connection from the client to the server AD.
+        3. Verify that the L2CAP CoC connection is active from both the client
+        and server.
+        4. Write data from the client to server.
+        5. Check that the measured throughput is greater than zero.
+        6. Disconnect the L2CAP CoC connections.
+
+        Expected Result:
+        CoC connection is established then disconnected successfully.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: BLE, CoC
+        Priority: 1
+        """
+        status, client_conn_id, _ = orchestrate_coc_connection(
+            self.client_ad, self.server_ad, 1, self.coc_conn_psm1)
+        if not status:
+            return False
+
+        # num_iterations: repetitions of each set of buffers r/w.
+        # number_buffers: data buffers to transmit per set of buffers r/w.
+        # buffer_size: number of bytes per L2CAP data buffer.
+        number_buffers = 100
+        buffer_size = 23
+        num_iterations = 3
+        list_server_ad = [self.server_ad]
+        list_client_conn_id = [client_conn_id]
+        data_rate = do_multi_connection_throughput(
+            self.client_ad, list_server_ad, list_client_conn_id,
+            num_iterations, number_buffers, buffer_size)
+        if data_rate <= 0:
+            return False
+        self.log.info(
+            "test_coc_connection_throughput: throughput=%d bytes per sec",
+            data_rate)
+
+        self.client_ad.droid.bluetoothSocketConnStop()
+        self.server_ad.droid.bluetoothSocketConnStop()
+        return True
+
+    @BluetoothBaseTest.bt_test_wrap
+    @test_tracker_info(uuid='7fed507e-1ab5-43ec-abd2-3abd88b95f5b')
+    def test_coc_connection_throughput_2_conn(self):
+        """Test LE CoC data throughput on two connections
+
+        Test Data Throughput of 2 L2CAP CoC connections. 3 phones are required.
+
+        Steps:
+        1. Get the mac address of the server device.
+        2. Establish a L2CAP CoC connection from the client to the server#1 AD.
+        3. Verify that the L2CAP CoC connection is active from both the client
+        and server.
+        4. Establish a L2CAP CoC connection from the client to the server#2 AD.
+        5. Verify that the L2CAP CoC connection is active from both the client
+        and server.
+        6. Write data from the client to both server#1 and server#2.
+        7. Check that the measured throughput is greater than zero.
+        8. Disconnect the 2 L2CAP CoC connections.
+
+        Expected Result:
+        L2CAP CoC connections are established, data written to both servers,
+        then disconnected successfully.
+
+        Returns:
+          Pass if True
+          Fail if False
+
+        TAGS: BLE, CoC
+        Priority: 1
+        """
+        # Make sure at least 3 phones are setup
+        if len(self.android_devices) <= 2:
+            self.log.error("test_coc_connection_throughput_2_conn: "
+                           "Error: 3rd phone not configured in file")
+            return False
+
+        self.log.info(
+            "test_coc_connection_throughput_2_conn: calling "
+            "orchestrate_coc_connection server1 for hardcoded psmValue=0x%x",
+            self.coc_conn_psm1)
+        status, client_conn_id1, server_conn_id1 = orchestrate_coc_connection(
+            self.client_ad, self.server_ad, 1, self.coc_conn_psm1)
+        if not status:
+            return False
+
+        self.log.info(
+            "test_coc_connection_throughput_2_conn: calling "
+            "orchestrate_coc_connection server2 for hardcoded psmValue=0x%x",
+            self.coc_conn_psm2)
+        status, client_conn_id2, server_conn_id2 = orchestrate_coc_connection(
+            self.client_ad, self.server2_ad, 1, self.coc_conn_psm2)
+        if not status:
+            return False
+
+        # num_iterations: repetitions of each set of buffers r/w.
+        # number_buffers: data buffers to transmit per set of buffers r/w.
+        # buffer_size: number of bytes per L2CAP data buffer.
+        num_iterations = 3
+        number_buffers = 100
+        buffer_size = 23
+        list_server_ad = [self.server_ad, self.server2_ad]
+        list_client_conn_id = [client_conn_id1, client_conn_id2]
+        data_rate = do_multi_connection_throughput(
+            self.client_ad, list_server_ad, list_client_conn_id,
+            num_iterations, number_buffers, buffer_size)
+        if data_rate <= 0:
+            return False
+
+        self.log.info(
+            "test_coc_connection_throughput_2_conn: throughput=%d bytes per "
+            "sec", data_rate)
+
+        self.client_ad.droid.bluetoothSocketConnStop(client_conn_id1)
+        self.client_ad.droid.bluetoothSocketConnStop(client_conn_id2)
+        self.server_ad.droid.bluetoothSocketConnStop(server_conn_id1)
+        self.server2_ad.droid.bluetoothSocketConnStop(server_conn_id2)
+        return True