| #/usr/bin/env python3.4 |
| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| This test script exercises different GATT connection tests. |
| """ |
| |
| import pprint |
| from queue import Empty |
| import time |
| from contextlib import suppress |
| |
| from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts.test_utils.bt.GattEnum import GattCharacteristic |
| from acts.test_utils.bt.GattEnum import GattDescriptor |
| from acts.test_utils.bt.GattEnum import GattService |
| from acts.test_utils.bt.GattEnum import MtuSize |
| from acts.test_utils.bt.GattEnum import GattCbErr |
| from acts.test_utils.bt.GattEnum import GattCbStrings |
| from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors |
| from acts.test_utils.bt.bt_test_utils import get_advanced_droid_list |
| from acts.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement |
| from acts.test_utils.bt.bt_test_utils import log_energy_info |
| |
| |
| class GattConnectTest(BluetoothBaseTest): |
| adv_instances = [] |
| default_timeout = 10 |
| default_discovery_timeout = 3 |
| droid_list = () |
| |
| def __init__(self, controllers): |
| BluetoothBaseTest.__init__(self, controllers) |
| self.droid_list = get_advanced_droid_list(self.android_devices) |
| self.cen_ad = self.android_devices[0] |
| self.per_ad = self.android_devices[1] |
| if self.droid_list[1]['max_advertisements'] == 0: |
| self.tests = () |
| return |
| |
| def teardown_test(self): |
| for adv in self.adv_instances: |
| self.per_ad.droid.bleStopBleAdvertising(adv) |
| self.log.debug(log_energy_info(self.android_devices, "End")) |
| return True |
| |
| def _setup_characteristics_and_descriptors(self, droid): |
| characteristic_input = [ |
| { |
| 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", |
| 'property': GattCharacteristic.PROPERTY_WRITE.value |
| | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, |
| 'permission': GattCharacteristic.PERMISSION_WRITE.value |
| }, |
| { |
| 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", |
| 'property': GattCharacteristic.PROPERTY_NOTIFY.value |
| | GattCharacteristic.PROPERTY_READ.value, |
| 'permission': GattCharacteristic.PERMISSION_READ.value |
| }, |
| { |
| 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", |
| 'property': GattCharacteristic.PROPERTY_NOTIFY.value |
| | GattCharacteristic.PROPERTY_READ.value, |
| 'permission': GattCharacteristic.PERMISSION_READ.value |
| }, |
| ] |
| descriptor_input = [ |
| { |
| 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", |
| 'property': GattDescriptor.PERMISSION_READ.value |
| | GattDescriptor.PERMISSION_WRITE.value, |
| }, { |
| 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", |
| 'property': GattDescriptor.PERMISSION_READ.value |
| | GattCharacteristic.PERMISSION_WRITE.value, |
| } |
| ] |
| characteristic_list = setup_gatt_characteristics(droid, |
| characteristic_input) |
| descriptor_list = setup_gatt_descriptors(droid, descriptor_input) |
| return characteristic_list, descriptor_list |
| |
| def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): |
| self.log.info("Disconnecting from peripheral device.") |
| test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback) |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| if not test_result: |
| self.log.info("Failed to disconnect from peripheral device.") |
| return False |
| return True |
| |
| def _iterate_attributes(self, discovered_services_index): |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| for i in range(services_count): |
| service = self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( |
| discovered_services_index, i) |
| self.log.info("Discovered service uuid {}".format(service)) |
| characteristic_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( |
| discovered_services_index, i)) |
| for characteristic in characteristic_uuids: |
| self.log.info("Discovered characteristic uuid {}".format( |
| characteristic)) |
| descriptor_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredDescriptorUuids( |
| discovered_services_index, i, characteristic)) |
| for descriptor in descriptor_uuids: |
| self.log.info("Discovered descriptor uuid {}".format( |
| descriptor)) |
| |
| def _find_service_added_event(self, gatt_server_callback, uuid): |
| expected_event = GattCbStrings.SERV_ADDED.value.format( |
| gatt_server_callback) |
| try: |
| event = self.per_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.SERV_ADDED_ERR.value.format( |
| expected_event)) |
| return False |
| if event['data']['serviceUuid'].lower() != uuid.lower(): |
| self.log.error("Uuid mismatch. Found: {}, Expected {}.".format( |
| event['data']['serviceUuid'], uuid)) |
| return False |
| return True |
| |
| def _setup_multiple_services(self): |
| gatt_server_callback = ( |
| self.per_ad.droid.gattServerCreateGattServerCallback()) |
| gatt_server = self.per_ad.droid.gattServerOpenGattServer( |
| gatt_server_callback) |
| characteristic_list, descriptor_list = ( |
| self._setup_characteristics_and_descriptors(self.per_ad.droid)) |
| self.per_ad.droid.gattServerCharacteristicAddDescriptor( |
| characteristic_list[1], descriptor_list[0]) |
| self.per_ad.droid.gattServerCharacteristicAddDescriptor( |
| characteristic_list[2], descriptor_list[1]) |
| gatt_service = self.per_ad.droid.gattServerCreateService( |
| "00000000-0000-1000-8000-00805f9b34fb", |
| GattService.SERVICE_TYPE_PRIMARY.value) |
| gatt_service2 = self.per_ad.droid.gattServerCreateService( |
| "FFFFFFFF-0000-1000-8000-00805f9b34fb", |
| GattService.SERVICE_TYPE_PRIMARY.value) |
| gatt_service3 = self.per_ad.droid.gattServerCreateService( |
| "3846D7A0-69C8-11E4-BA00-0002A5D5C51B", |
| GattService.SERVICE_TYPE_PRIMARY.value) |
| for characteristic in characteristic_list: |
| self.per_ad.droid.gattServerAddCharacteristicToService( |
| gatt_service, characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service) |
| result = self._find_service_added_event( |
| gatt_server_callback, "00000000-0000-1000-8000-00805f9b34fb") |
| if not result: |
| return False |
| for characteristic in characteristic_list: |
| self.per_ad.droid.gattServerAddCharacteristicToService( |
| gatt_service2, characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service2) |
| result = self._find_service_added_event( |
| gatt_server_callback, "FFFFFFFF-0000-1000-8000-00805f9b34fb") |
| if not result: |
| return False |
| for characteristic in characteristic_list: |
| self.per_ad.droid.gattServerAddCharacteristicToService( |
| gatt_service3, characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3) |
| result = self._find_service_added_event( |
| gatt_server_callback, "3846D7A0-69C8-11E4-BA00-0002A5D5C51B") |
| if not result: |
| return False, False |
| return gatt_server_callback, gatt_server |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect(self): |
| """Test GATT connection over LE. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT |
| Priority: 0 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_autoconnect(self): |
| """Test GATT connection over LE. |
| |
| Test re-establishing a gat connection using autoconnect |
| set to True in order to test connection whitelist. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Disconnect the GATT connection. |
| 7. Create a GATT connection with autoconnect set to True |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was re-established and then disconnected |
| successfully. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT |
| Priority: 0 |
| """ |
| autoconnect = False |
| mac_address, adv_callback = ( |
| get_mac_address_of_generic_advertisement(self.cen_ad, self.per_ad)) |
| test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection( |
| self.cen_ad, mac_address, autoconnect) |
| if not disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback): |
| return False |
| autoconnect = True |
| bluetooth_gatt = self.cen_ad.droid.gattClientConnectGatt( |
| gatt_callback, mac_address, autoconnect) |
| expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format( |
| expected_event)) |
| test_result = False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_request_min_mtu(self): |
| """Test GATT connection over LE and exercise MTU sizes. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client. Request an MTU size that matches the correct minimum size. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (client) request MTU size change to the |
| minimum value. |
| 7. Find the MTU changed event on the client. |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and the MTU value found |
| matches the expected MTU value. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU |
| Priority: 0 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| expected_mtu = MtuSize.MIN.value |
| self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, expected_mtu) |
| expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) |
| try: |
| mtu_event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| mtu_size_found = mtu_event['data']['MTU'] |
| if mtu_size_found != expected_mtu: |
| self.log.error("MTU size found: {}, expected: {}".format( |
| mtu_size_found, expected_mtu)) |
| return False |
| except Empty: |
| self.log.error(GattCbErr.MTU_CHANGED_ERR.value.format( |
| expected_event)) |
| return False |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_request_max_mtu(self): |
| """Test GATT connection over LE and exercise MTU sizes. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client. Request an MTU size that matches the correct maximum size. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (client) request MTU size change to the |
| maximum value. |
| 7. Find the MTU changed event on the client. |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and the MTU value found |
| matches the expected MTU value. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU |
| Priority: 0 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| expected_mtu = MtuSize.MAX.value |
| self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, expected_mtu) |
| expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) |
| try: |
| mtu_event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| mtu_size_found = mtu_event['data']['MTU'] |
| if mtu_size_found != expected_mtu: |
| self.log.error("MTU size found: {}, expected: {}".format( |
| mtu_size_found, expected_mtu)) |
| return False |
| except Empty: |
| self.log.error(GattCbErr.MTU_CHANGED_ERR.value.format( |
| expected_event)) |
| return False |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_request_out_of_bounds_mtu(self): |
| """Test GATT connection over LE and exercise an out of bound MTU size. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client. Request an MTU size that is the MIN value minus 1. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (client) request MTU size change to the |
| minimum value minus one. |
| 7. Find the MTU changed event on the client. |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that an MTU changed event was not discovered and that |
| it didn't cause an exception when requesting an out of bounds |
| MTU. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU |
| Priority: 0 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| self.cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, |
| MtuSize.MIN.value - 1) |
| expected_event = GattCbStrings.MTU_CHANGED.value.format(gatt_callback) |
| try: |
| self.cen_ad.ed.pop_event(expected_event, self.default_timeout) |
| self.log.error("Found {} event when it wasn't expected".format( |
| expected_event)) |
| return False |
| except Empty: |
| self.log.debug("Successfully didn't find {} event".format( |
| expected_event)) |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_trigger_on_read_rssi(self): |
| """Test GATT connection over LE read RSSI. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client then read the RSSI. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner, request to read the RSSI of the advertiser. |
| 7. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully. Verify that the RSSI was ready correctly. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI |
| Priority: 1 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| expected_event = GattCbStrings.RD_REMOTE_RSSI.value.format( |
| gatt_callback) |
| if self.cen_ad.droid.gattClientReadRSSI(bluetooth_gatt): |
| try: |
| self.cen_ad.ed.pop_event(expected_event, self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.RD_REMOTE_RSSI_ERR.value.format( |
| expected_event)) |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_trigger_on_services_discovered(self): |
| """Test GATT connection and discover services of peripheral. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client the discover all services from the connected device. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (central device), discover services. |
| 7. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully. Verify that the service were discovered. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Services |
| Priority: 1 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_trigger_on_services_discovered_iterate_attributes( |
| self): |
| """Test GATT connection and iterate peripherals attributes. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client and iterate over all the characteristics and descriptors of the |
| discovered services. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (central device), discover services. |
| 7. Iterate over all the characteristics and descriptors of the |
| discovered features. |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully. Verify that the services, characteristics, and descriptors |
| were discovered. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Services |
| Characteristics, Descriptors |
| Priority: 1 |
| """ |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| discovered_services_index = event['data']['ServicesIndex'] |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| self._iterate_attributes(discovered_services_index) |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_with_service_uuid_variations(self): |
| """Test GATT connection with multiple service uuids. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client with multiple service uuid variations. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. From the scanner (central device), discover services. |
| 7. Verify that all the service uuid variations are found. |
| 8. Disconnect the GATT connection. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully. Verify that the service uuid variations are found. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Services |
| Priority: 2 |
| """ |
| gatt_server_callback, gatt_server = self._setup_multiple_services() |
| if not gatt_server_callback or not gatt_server: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| discovered_services_index = event['data']['ServicesIndex'] |
| self._iterate_attributes(discovered_services_index) |
| return self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_in_quick_succession(self): |
| """Test GATT connections multiple times. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client with multiple iterations. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Disconnect the GATT connection. |
| 7. Repeat steps 5 and 6 twenty times. |
| |
| Expected Result: |
| Verify that a connection was established and then disconnected |
| successfully twenty times. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress |
| Priority: 1 |
| """ |
| mac_address, adv_callback = get_mac_address_of_generic_advertisement( |
| self.cen_ad, self.per_ad) |
| autoconnect = False |
| for i in range(1000): |
| self.log.info("Starting connection iteration {}".format(i + 1)) |
| test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection( |
| self.cen_ad, mac_address, autoconnect) |
| if not test_result: |
| self.log.info("Could not connect to peripheral.") |
| return False |
| test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, |
| gatt_callback) |
| if not test_result: |
| self.log.info("Failed to disconnect from peripheral device.") |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_write_descriptor_stress(self): |
| """Test GATT connection writing and reading descriptors. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client with multiple service uuid variations. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Discover services. |
| 7. Write data to the descriptors of each characteristic 100 times. |
| 8. Read the data sent to the descriptors. |
| 9. Disconnect the GATT connection. |
| |
| Expected Result: |
| Each descriptor in each characteristic is written and read 100 times. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, |
| Characteristics, Descriptors |
| Priority: 1 |
| """ |
| gatt_server_callback, gatt_server = self._setup_multiple_services() |
| if not gatt_server_callback or not gatt_server: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| discovered_services_index = event['data']['ServicesIndex'] |
| else: |
| self.log.info("Failed to discover services.") |
| return False |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| |
| connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( |
| gatt_server) |
| if len(connected_device_list) == 0: |
| self.log.info("No devices connected from peripheral.") |
| return False |
| bt_device_id = 0 |
| status = 1 |
| offset = 1 |
| test_value = "1,2,3,4,5,6,7" |
| test_value_return = "1,2,3" |
| for i in range(services_count): |
| characteristic_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( |
| discovered_services_index, i)) |
| for characteristic in characteristic_uuids: |
| descriptor_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredDescriptorUuids( |
| discovered_services_index, i, characteristic)) |
| for x in range(100): |
| for descriptor in descriptor_uuids: |
| self.log.info( |
| "Starting write iteration {} on (Characteristic::Descriptor) {}::{}".format( |
| x + 1, characteristic, descriptor)) |
| self.cen_ad.droid.gattClientDescriptorSetValue( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic, descriptor, test_value) |
| self.cen_ad.droid.gattClientWriteDescriptor( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic, descriptor) |
| expected_event = GattCbStrings.DESC_WRITE_REQ.value.format( |
| gatt_server_callback) |
| try: |
| event = self.per_ad.ed.pop_event( |
| expected_event, self.default_timeout) |
| except Empty: |
| self.log.error( |
| GattCbErr.DESC_WRITE_REQ_ERR.value.format( |
| expected_event)) |
| return False |
| request_id = event['data']['requestId'] |
| found_value = event['data']['value'] |
| if found_value != test_value: |
| self.log.error("Values didn't match. Found: {}, " |
| "Expected: {}".format(found_value, |
| test_value)) |
| return False |
| self.per_ad.droid.gattServerSendResponse( |
| gatt_server, bt_device_id, request_id, status, |
| offset, test_value_return) |
| expected_event = GattCbStrings.DESC_WRITE.value.format( |
| gatt_callback) |
| try: |
| self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error( |
| GattCbErr.DESC_WRITE_ERR.value.format( |
| expected_event)) |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_write_characteristic(self): |
| """Test GATT connection writing characteristics. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client and exercise writing a characteristic. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Discover services. |
| 7. Set discovered characteristic notification to True |
| 8. Write data to the characteristic. |
| 9. Send a response from the peripheral to the central. |
| 10. Disconnect the GATT connection. |
| |
| Expected Result: |
| The characteristic data should be written successfully |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, |
| Characteristics, Descriptors |
| Priority: 1 |
| """ |
| gatt_server_callback, gatt_server = self._setup_multiple_services() |
| if not gatt_server_callback or not gatt_server: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| |
| service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" |
| characteristic_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" |
| |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| discovered_services_index = event['data']['ServicesIndex'] |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| disc_service_index = 0 |
| for i in range(services_count): |
| disc_service_uuid = ( |
| self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( |
| discovered_services_index, i).upper()) |
| if disc_service_uuid == service_uuid: |
| disc_service_index = i |
| break |
| |
| test_value = "1,2,3,4,5,6,7" |
| self.cen_ad.droid.gattClientCharacteristicSetValue( |
| bluetooth_gatt, discovered_services_index, disc_service_index, |
| characteristic_uuid, test_value) |
| |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, disc_service_index, |
| characteristic_uuid) |
| |
| expected_event = GattCbStrings.CHAR_WRITE_REQ.value.format( |
| gatt_server_callback) |
| try: |
| event = self.per_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.CHAR_WRITE_REQ_ERR.value.format( |
| expected_event)) |
| return False |
| |
| request_id = event['data']['requestId'] |
| bt_device_id = 0 |
| status = 0 |
| offset = 0 |
| test_value_return = "1,2,3" |
| self.per_ad.droid.gattServerGetConnectedDevices(gatt_server) |
| self.per_ad.droid.gattServerSendResponse(gatt_server, bt_device_id, |
| request_id, status, offset, |
| test_value_return) |
| |
| expected_event = GattCbStrings.CHAR_WRITE.value.format(gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, self.default_timeout) |
| if event["data"]["Status"] != status: |
| self.log.error("Write status should be 0") |
| return False; |
| |
| except Empty: |
| self.log.error(GattCbErr.CHAR_WRITE_ERR.value.format( |
| expected_event)) |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_write_characteristic_stress(self): |
| """Test GATT connection writing characteristics in quick succession. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client and exercise writing a characteristic. Do this quickly 100 times. |
| |
| Steps: |
| 1. Start a generic advertisement. |
| 2. Start a generic scanner. |
| 3. Find the advertisement and extract the mac address. |
| 4. Stop the first scanner. |
| 5. Create a GATT connection between the scanner and advertiser. |
| 6. Discover services. |
| 7. Set discovered characteristic notification to True. |
| 8. Write data to the characteristic 100 times as fast as possible. |
| 9. Send a response from the peripheral to the central. |
| 10. Disconnect the GATT connection. |
| |
| Expected Result: |
| The characteristic data should be written successfully each iteration |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress, |
| Characteristics, Descriptors |
| Priority: 1 |
| """ |
| gatt_server_callback, gatt_server = self._setup_multiple_services() |
| if not gatt_server_callback or not gatt_server: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| discovered_services_index = event['data']['ServicesIndex'] |
| else: |
| self.log.info("Failed to discover services.") |
| return False |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| |
| connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( |
| gatt_server) |
| if len(connected_device_list) == 0: |
| self.log.info("No devices connected from peripheral.") |
| return False |
| bt_device_id = 0 |
| status = 1 |
| offset = 1 |
| test_value = "1,2,3,4,5,6,7" |
| test_value_return = "1,2,3" |
| for i in range(services_count): |
| characteristic_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( |
| discovered_services_index, i)) |
| for i in range(100): |
| for characteristic in characteristic_uuids: |
| self.cen_ad.droid.gattClientCharacteristicSetValue( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic, test_value) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic) |
| expected_event = GattCbStrings.CHAR_WRITE_REQ.value.format( |
| gatt_server_callback) |
| try: |
| event = self.per_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error( |
| GattCbErr.CHAR_WRITE_REQ_ERR.value.format( |
| expected_event)) |
| return False |
| self.log.info("{} event found: {}".format( |
| GattCbStrings.CHAR_WRITE_REQ.value.format( |
| gatt_server_callback), event)) |
| request_id = event['data']['requestId'] |
| found_value = event['data']['value'] |
| if found_value != test_value: |
| self.log.info("Values didn't match. Found: {}, " |
| "Expected: {}".format(found_value, |
| test_value)) |
| return False |
| self.per_ad.droid.gattServerSendResponse( |
| gatt_server, bt_device_id, request_id, status, offset, |
| test_value_return) |
| expected_event = GattCbStrings.CHAR_WRITE_REQ.value.format( |
| gatt_server_callback) |
| try: |
| self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error( |
| GattCbErr.CHAR_WRITE_REQ_ERR.value.format( |
| expected_event)) |
| return False |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_mitm_attack(self): |
| """Test GATT connection with permission write encrypted mitm. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client while the GATT server's characteristic includes the property |
| write value and the permission write encrypted mitm value. This will |
| prompt LE pairing and then the devices will create a bond. |
| |
| Steps: |
| 1. Create a GATT server and server callback on the peripheral device. |
| 2. Create a unique service and characteristic uuid on the peripheral. |
| 3. Create a characteristic on the peripheral with these properties: |
| GattCharacteristic.PROPERTY_WRITE.value, |
| GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM.value |
| 4. Create a GATT service on the peripheral. |
| 5. Add the characteristic to the GATT service. |
| 6. Create a GATT connection between your central and peripheral device. |
| 7. From the central device, discover the peripheral's services. |
| 8. Iterate the services found until you find the unique characteristic |
| created in step 3. |
| 9. Once found, write a random but valid value to the characteristic. |
| 10. Start pairing helpers on both devices immediately after attempting |
| to write to the characteristic. |
| 11. Within 10 seconds of writing the characteristic, there should be |
| a prompt to bond the device from the peripheral. The helpers will |
| handle the UI interaction automatically. (see |
| BluetoothConnectionFacade.java bluetoothStartPairingHelper). |
| 12. Verify that the two devices are bonded. |
| |
| Expected Result: |
| Verify that a connection was established and the devices are bonded. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, MITM |
| Priority: 1 |
| """ |
| gatt_server_callback = ( |
| self.per_ad.droid.gattServerCreateGattServerCallback()) |
| gatt_server = self.per_ad.droid.gattServerOpenGattServer( |
| gatt_server_callback) |
| service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" |
| test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" |
| bonded = False |
| characteristic = self.per_ad.droid.gattServerCreateBluetoo |
| thGattCharacteristic( |
| test_uuid, GattCharacteristic.PROPERTY_WRITE.value, |
| GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM.value) |
| gatt_service = self.per_ad.droid.gattServerCreateService( |
| service_uuid, GattService.SERVICE_TYPE_PRIMARY.value) |
| self.per_ad.droid.gattServerAddCharacteristicToService(gatt_service, |
| characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service) |
| result = self._find_service_added_event(gatt_server_callback, |
| service_uuid) |
| if not result: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| discovered_services_index = event['data']['ServicesIndex'] |
| else: |
| self.log.info("Failed to discover services.") |
| return False |
| test_value = "1,2,3,4,5,6,7" |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| for i in range(services_count): |
| characteristic_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( |
| discovered_services_index, i)) |
| for characteristic_uuid in characteristic_uuids: |
| if characteristic_uuid == test_uuid: |
| self.cen_ad.droid.bluetoothStartPairingHelper() |
| self.per_ad.droid.bluetoothStartPairingHelper() |
| self.cen_ad.droid.gattClientCharacteristicSetValue( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic_uuid, test_value) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic_uuid) |
| start_time = time.time() + self.default_timeout |
| target_name = self.per_ad.droid.bluetoothGetLocalName() |
| while time.time() < start_time and bonded == False: |
| bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices( |
| ) |
| for device in bonded_devices: |
| if 'name' in device.keys() and device[ |
| 'name'] == target_name: |
| bonded = True |
| break |
| return True |
| |
| @BluetoothBaseTest.bt_test_wrap |
| def test_gatt_connect_mitm_attack(self): |
| """Test GATT connection with permission write encrypted mitm. |
| |
| Test establishing a gatt connection between a GATT server and GATT |
| client while the GATT server's characteristic includes the property |
| write value and the permission write encrypted mitm value. This will |
| prompt LE pairing and then the devices will create a bond. |
| |
| Steps: |
| 1. Create a GATT server and server callback on the peripheral device. |
| 2. Create a unique service and characteristic uuid on the peripheral. |
| 3. Create a characteristic on the peripheral with these properties: |
| GattCharacteristic.PROPERTY_WRITE.value, |
| GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM.value |
| 4. Create a GATT service on the peripheral. |
| 5. Add the characteristic to the GATT service. |
| 6. Create a GATT connection between your central and peripheral device. |
| 7. From the central device, discover the peripheral's services. |
| 8. Iterate the services found until you find the unique characteristic |
| created in step 3. |
| 9. Once found, write a random but valid value to the characteristic. |
| 10. Start pairing helpers on both devices immediately after attempting |
| to write to the characteristic. |
| 11. Within 10 seconds of writing the characteristic, there should be |
| a prompt to bond the device from the peripheral. The helpers will |
| handle the UI interaction automatically. (see |
| BluetoothConnectionFacade.java bluetoothStartPairingHelper). |
| 12. Verify that the two devices are bonded. |
| |
| Expected Result: |
| Verify that a connection was established and the devices are bonded. |
| |
| Returns: |
| Pass if True |
| Fail if False |
| |
| TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, MITM |
| Priority: 1 |
| """ |
| gatt_server_callback = ( |
| self.per_ad.droid.gattServerCreateGattServerCallback()) |
| gatt_server = self.per_ad.droid.gattServerOpenGattServer( |
| gatt_server_callback) |
| service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" |
| test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" |
| bonded = False |
| characteristic = self.per_ad.droid.gattServerCreateBluetoothGattCharacteristic( |
| test_uuid, GattCharacteristic.PROPERTY_WRITE.value, |
| GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM.value) |
| gatt_service = self.per_ad.droid.gattServerCreateService( |
| service_uuid, GattService.SERVICE_TYPE_PRIMARY.value) |
| self.per_ad.droid.gattServerAddCharacteristicToService(gatt_service, |
| characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service) |
| result = self._find_service_added_event(gatt_server_callback, |
| service_uuid) |
| if not result: |
| return False |
| bluetooth_gatt, gatt_callback, adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.adv_instances.append(adv_callback) |
| if self.cen_ad.droid.gattClientDiscoverServices(bluetooth_gatt): |
| expected_event = GattCbStrings.GATT_SERV_DISC.value.format( |
| gatt_callback) |
| try: |
| event = self.cen_ad.ed.pop_event(expected_event, |
| self.default_timeout) |
| except Empty: |
| self.log.error(GattCbErr.GATT_SERV_DISC_ERR.value.format( |
| expected_event)) |
| return False |
| discovered_services_index = event['data']['ServicesIndex'] |
| else: |
| self.log.info("Failed to discover services.") |
| return False |
| test_value = "1,2,3,4,5,6,7" |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| discovered_services_index) |
| for i in range(services_count): |
| characteristic_uuids = ( |
| self.cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( |
| discovered_services_index, i)) |
| for characteristic_uuid in characteristic_uuids: |
| if characteristic_uuid == test_uuid: |
| self.cen_ad.droid.bluetoothStartPairingHelper() |
| self.per_ad.droid.bluetoothStartPairingHelper() |
| self.cen_ad.droid.gattClientCharacteristicSetValue( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic_uuid, test_value) |
| self.cen_ad.droid.gattClientWriteCharacteristic( |
| bluetooth_gatt, discovered_services_index, i, |
| characteristic_uuid) |
| start_time = time.time() + self.default_timeout |
| target_name = self.per_ad.droid.bluetoothGetLocalName() |
| while time.time() < start_time and bonded == False: |
| bonded_devices = self.cen_ad.droid.bluetoothGetBondedDevices( |
| ) |
| for device in bonded_devices: |
| if 'name' in device.keys() and device[ |
| 'name'] == target_name: |
| bonded = True |
| break |
| return True |