| #!/usr/bin/env python3 |
| # |
| # Copyright (C) 2018 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| Test script for concurrent Gatt connections. |
| Testbed assumes 6 Android devices. One will be the central and the rest |
| peripherals. |
| """ |
| |
| from queue import Empty |
| import concurrent.futures |
| import threading |
| import time |
| from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes |
| from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes |
| from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants |
| from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic |
| from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format |
| from acts_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids |
| from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor |
| from acts_contrib.test_utils.bt.bt_constants import gatt_service_types |
| from acts_contrib.test_utils.bt.bt_constants import scan_result |
| from acts_contrib.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor |
| from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection |
| from acts_contrib.test_utils.bt.gatts_lib import GattServerLib |
| from acts.test_decorators import test_tracker_info |
| |
| service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb' |
| characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630' |
| descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb" |
| |
| gatt_server_read_descriptor_sample = { |
| 'services': [{ |
| 'uuid': |
| service_uuid, |
| 'type': |
| gatt_service_types['primary'], |
| 'characteristics': [{ |
| 'uuid': |
| characteristic_uuid, |
| 'properties': |
| gatt_characteristic['property_write'], |
| 'permissions': |
| gatt_characteristic['permission_write'], |
| 'instance_id': |
| 0x002a, |
| 'value_type': |
| gatt_characteristic_value_format['string'], |
| 'value': |
| 'Test Database', |
| 'descriptors': [{ |
| 'uuid': descriptor_uuid, |
| 'permissions': gatt_descriptor['permission_write'], |
| }] |
| }] |
| }] |
| } |
| |
| |
| class ConcurrentGattConnectTest(BluetoothBaseTest): |
| bt_default_timeout = 10 |
| max_connections = 5 |
| # List of tuples (android_device, advertise_callback) |
| advertise_callbacks = [] |
| # List of tuples (android_device, advertisement_name) |
| advertisement_names = [] |
| list_of_arguments_list = [] |
| |
| def setup_class(self): |
| super(BluetoothBaseTest, self).setup_class() |
| self.pri_dut = self.android_devices[0] |
| |
| |
| # Create 5 advertisements from different android devices |
| for i in range(1, self.max_connections + 1): |
| # Set device name |
| ad = self.android_devices[i] |
| name = "test_adv_{}".format(i) |
| self.advertisement_names.append((ad, name)) |
| ad.droid.bluetoothSetLocalName(name) |
| |
| # Setup and start advertisements |
| ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) |
| ad.droid.bleSetAdvertiseSettingsAdvertiseMode( |
| ble_advertise_settings_modes['low_latency']) |
| advertise_data = ad.droid.bleBuildAdvertiseData() |
| advertise_settings = ad.droid.bleBuildAdvertiseSettings() |
| advertise_callback = ad.droid.bleGenBleAdvertiseCallback() |
| ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, |
| advertise_settings) |
| self.advertise_callbacks.append((ad, advertise_callback)) |
| |
| def obtain_address_list_from_scan(self): |
| """Returns the address list of all devices that match the scan filter. |
| |
| Returns: |
| A list if all devices are found; None is any devices are not found. |
| """ |
| # From central device, scan for all appropriate addresses by name. |
| filter_list = self.pri_dut.droid.bleGenFilterList() |
| self.pri_dut.droid.bleSetScanSettingsScanMode( |
| ble_scan_settings_modes['low_latency']) |
| scan_settings = self.pri_dut.droid.bleBuildScanSetting() |
| scan_callback = self.pri_dut.droid.bleGenScanCallback() |
| for android_device, name in self.advertisement_names: |
| self.pri_dut.droid.bleSetScanFilterDeviceName(name) |
| self.pri_dut.droid.bleBuildScanFilter(filter_list) |
| self.pri_dut.droid.bleStartBleScan(filter_list, scan_settings, |
| scan_callback) |
| address_list = [] |
| devices_found = [] |
| # Set the scan time out to 20 sec to provide enough time to discover the |
| # devices in busy environment |
| scan_timeout = 20 |
| end_time = time.time() + scan_timeout |
| while time.time() < end_time and len(address_list) < len( |
| self.advertisement_names): |
| try: |
| event = self.pri_dut.ed.pop_event( |
| "BleScan{}onScanResults".format(scan_callback), |
| self.bt_default_timeout) |
| |
| adv_name = event['data']['Result']['deviceInfo']['name'] |
| mac_address = event['data']['Result']['deviceInfo']['address'] |
| # Look up the android device handle based on event name |
| device = [ |
| item for item in self.advertisement_names |
| if adv_name in item |
| ] |
| devices_found.append(device[0][0].serial) |
| if len(device) != 0: |
| address_list_tuple = (device[0][0], mac_address) |
| else: |
| continue |
| result = [item for item in address_list if mac_address in item] |
| # if length of result is 0, it indicates that we have discovered |
| # new mac address. |
| if len(result) == 0: |
| self.log.info("Found new mac address: {}".format( |
| address_list_tuple[1])) |
| address_list.append(address_list_tuple) |
| except Empty as err: |
| self.log.error("Failed to find any scan results.") |
| return None |
| if len(address_list) < self.max_connections: |
| self.log.info("Only found these devices: {}".format(devices_found)) |
| self.log.error("Could not find all necessary advertisements.") |
| return None |
| return address_list |
| |
| @BluetoothBaseTest.bt_test_wrap |
| @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f') |
| def test_concurrent_gatt_connections(self): |
| """Test max concurrent GATT connections |
| |
| Connect to all peripherals. |
| |
| Steps: |
| 1. Scan |
| 2. Save addresses |
| 3. Connect all addresses of the peripherals |
| |
| Expected Result: |
| All connections successful. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: Bluetooth, GATT |
| Priority: 2 |
| """ |
| |
| address_list = self.obtain_address_list_from_scan() |
| if address_list is None: |
| return False |
| |
| # Connect to all addresses |
| for address_tuple in address_list: |
| address = address_tuple[1] |
| try: |
| autoconnect = False |
| bluetooth_gatt, gatt_callback = setup_gatt_connection( |
| self.pri_dut, address, autoconnect) |
| self.log.info("Successfully connected to {}".format(address)) |
| except Exception as err: |
| self.log.error( |
| "Failed to establish connection to {}".format(address)) |
| return False |
| if (len( |
| self.pri_dut.droid.bluetoothGetConnectedLeDevices( |
| bt_profile_constants['gatt_server'])) != |
| self.max_connections): |
| self.log.error("Did not reach max connection count.") |
| return False |
| |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f') |
| def test_data_transfer_to_concurrent_gatt_connections(self): |
| """Test writing GATT descriptors concurrently to many peripherals. |
| |
| Connect to all peripherals and write gatt descriptors concurrently. |
| |
| |
| Steps: |
| 1. Scan the addresses by names |
| 2. Save mac addresses of the peripherals |
| 3. Connect all addresses of the peripherals and write gatt descriptors |
| |
| |
| Expected Result: |
| All connections and data transfers are successful. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: Bluetooth, GATT |
| Priority: 2 |
| """ |
| |
| address_list = self.obtain_address_list_from_scan() |
| if address_list is None: |
| return False |
| |
| # Connect to all addresses |
| executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| |
| for address_tuple in address_list: |
| ad, address = address_tuple |
| |
| gatts = GattServerLib(log=self.log, dut=ad) |
| gatt_server, gatt_server_callback = gatts.setup_gatts_db( |
| database=gatt_server_read_descriptor_sample) |
| |
| try: |
| bluetooth_gatt, gatt_callback = setup_gatt_connection( |
| self.pri_dut, address, autoconnect=False) |
| self.log.info("Successfully connected to {}".format(address)) |
| |
| except Exception as err: |
| self.log.error( |
| "Failed to establish connection to {}".format(address)) |
| return False |
| |
| if self.pri_dut.droid.gattClientDiscoverServices(bluetooth_gatt): |
| event = self.pri_dut.ed.pop_event( |
| "GattConnect{}onServicesDiscovered".format(bluetooth_gatt), |
| self.bt_default_timeout) |
| discovered_services_index = event['data']['ServicesIndex'] |
| else: |
| self.log.info("Failed to discover services.") |
| return False |
| services_count = self.pri_dut.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| |
| arguments_list = [ |
| self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed, |
| gatt_server, gatt_server_callback, bluetooth_gatt, |
| services_count, discovered_services_index, 100 |
| ] |
| self.list_of_arguments_list.append(arguments_list) |
| |
| for arguments_list in self.list_of_arguments_list: |
| executor.submit(run_continuous_write_descriptor, *arguments_list) |
| |
| executor.shutdown(wait=True) |
| |
| if (len( |
| self.pri_dut.droid.bluetoothGetConnectedLeDevices( |
| bt_profile_constants['gatt_server'])) != |
| self.max_connections): |
| self.log.error("Failed to write concurrently.") |
| return False |
| |
| return True |