| #/usr/bin/env python3.4 |
| # |
| # Copyright (C) 2016 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """ |
| This is base class for tests that exercises different GATT procedures between two connected devices. |
| Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery. |
| """ |
| |
| from queue import Empty |
| |
| from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest |
| from acts.test_utils.bt.GattEnum import GattCharacteristic |
| from acts.test_utils.bt.GattEnum import GattDescriptor |
| from acts.test_utils.bt.GattEnum import GattService |
| from acts.test_utils.bt.GattEnum import GattEvent |
| from acts.test_utils.bt.GattEnum import GattCbErr |
| from acts.test_utils.bt.GattEnum import GattCbStrings |
| from acts.test_utils.bt.GattEnum import MtuSize |
| from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics |
| from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors |
| |
| |
| class GattConnectedBaseTest(BluetoothBaseTest): |
| DEFAULT_TIMEOUT = 10 |
| |
| TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" |
| READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8" |
| READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" |
| WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" |
| WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32" |
| NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77" |
| # CCC == Client Characteristic Configuration |
| CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb" |
| |
| def __init__(self, controllers): |
| BluetoothBaseTest.__init__(self, controllers) |
| self.cen_ad = self.android_devices[0] |
| self.per_ad = self.android_devices[1] |
| |
| def setup_test(self): |
| super(GattConnectedBaseTest, self).setup_test() |
| |
| self.gatt_server_callback, self.gatt_server = self._setup_multiple_services( |
| ) |
| if not self.gatt_server_callback or not self.gatt_server: |
| raise AssertionError('Service setup failed') |
| |
| self.bluetooth_gatt, self.gatt_callback, self.adv_callback = ( |
| orchestrate_gatt_connection(self.cen_ad, self.per_ad)) |
| self.per_ad.droid.bleStopBleAdvertising(self.adv_callback) |
| |
| self.mtu = MtuSize.MIN |
| |
| if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt): |
| event = self._client_wait(GattEvent.GATT_SERV_DISC) |
| self.discovered_services_index = event['data']['ServicesIndex'] |
| services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( |
| self.discovered_services_index) |
| self.test_service_index = None |
| for i in range(services_count): |
| disc_service_uuid = ( |
| self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( |
| self.discovered_services_index, i).upper()) |
| if disc_service_uuid == self.TEST_SERVICE_UUID: |
| self.test_service_index = i |
| break |
| |
| if not self.test_service_index: |
| print("Service not found") |
| return False |
| |
| connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( |
| self.gatt_server) |
| if len(connected_device_list) == 0: |
| self.log.info("No devices connected from peripheral.") |
| return False |
| |
| return True |
| |
| def teardown_test(self): |
| self.per_ad.droid.gattServerClearServices(self.gatt_server) |
| self.per_ad.droid.gattServerClose(self.gatt_server) |
| |
| del self.gatt_server_callback |
| del self.gatt_server |
| |
| self._orchestrate_gatt_disconnection(self.bluetooth_gatt, |
| self.gatt_callback) |
| |
| return super(GattConnectedBaseTest, self).teardown_test() |
| |
| def _server_wait(self, gatt_event): |
| return self._timed_pop(gatt_event, self.per_ad, |
| self.gatt_server_callback) |
| |
| def _client_wait(self, gatt_event): |
| return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback) |
| |
| def _timed_pop(self, gatt_event, droid, gatt_callback): |
| expected_event = gatt_event.value["evt"].format(gatt_callback) |
| try: |
| return droid.ed.pop_event(expected_event, self.DEFAULT_TIMEOUT) |
| except Empty as emp: |
| raise AssertionError(gatt_event.value["err"].format( |
| expected_event)) |
| |
| def _setup_characteristics_and_descriptors(self, droid): |
| characteristic_input = [ |
| { |
| 'uuid': self.WRITABLE_CHAR_UUID, |
| 'property': GattCharacteristic.PROPERTY_WRITE.value |
| | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, |
| 'permission': GattCharacteristic.PERMISSION_WRITE.value |
| }, |
| { |
| 'uuid': self.READABLE_CHAR_UUID, |
| 'property': GattCharacteristic.PROPERTY_READ.value, |
| 'permission': GattCharacteristic.PERMISSION_READ.value |
| }, |
| { |
| 'uuid': self.NOTIFIABLE_CHAR_UUID, |
| 'property': GattCharacteristic.PROPERTY_NOTIFY.value |
| | GattCharacteristic.PROPERTY_INDICATE.value, |
| 'permission': GattCharacteristic.PERMISSION_READ.value |
| }, |
| ] |
| descriptor_input = [ |
| { |
| 'uuid': self.WRITABLE_DESC_UUID, |
| 'property': GattDescriptor.PERMISSION_READ.value |
| | GattCharacteristic.PERMISSION_WRITE.value, |
| }, { |
| 'uuid': self.READABLE_DESC_UUID, |
| 'property': GattDescriptor.PERMISSION_READ.value |
| | GattDescriptor.PERMISSION_WRITE.value, |
| }, { |
| 'uuid': self.CCC_DESC_UUID, |
| 'property': GattDescriptor.PERMISSION_READ.value |
| | GattDescriptor.PERMISSION_WRITE.value, |
| } |
| ] |
| characteristic_list = setup_gatt_characteristics(droid, |
| characteristic_input) |
| self.notifiable_char_index = characteristic_list[2] |
| descriptor_list = setup_gatt_descriptors(droid, descriptor_input) |
| return characteristic_list, descriptor_list |
| |
| def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): |
| self.log.info("Disconnecting from peripheral device.") |
| test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, |
| gatt_callback) |
| self.cen_ad.droid.gattClientClose(bluetooth_gatt) |
| if not test_result: |
| self.log.info("Failed to disconnect from peripheral device.") |
| return False |
| return True |
| |
| def _find_service_added_event(self, gatt_server_callback, uuid): |
| expected_event = GattCbStrings.SERV_ADDED.value.format( |
| gatt_server_callback) |
| try: |
| event = self.per_ad.ed.pop_event(expected_event, |
| self.DEFAULT_TIMEOUT) |
| except Empty: |
| self.log.error(GattCbErr.SERV_ADDED_ERR.value.format( |
| expected_event)) |
| return False |
| if event['data']['serviceUuid'].lower() != uuid.lower(): |
| self.log.error("Uuid mismatch. Found: {}, Expected {}.".format( |
| event['data']['serviceUuid'], uuid)) |
| return False |
| return True |
| |
| def _setup_multiple_services(self): |
| gatt_server_callback = ( |
| self.per_ad.droid.gattServerCreateGattServerCallback()) |
| gatt_server = self.per_ad.droid.gattServerOpenGattServer( |
| gatt_server_callback) |
| characteristic_list, descriptor_list = ( |
| self._setup_characteristics_and_descriptors(self.per_ad.droid)) |
| self.per_ad.droid.gattServerCharacteristicAddDescriptor( |
| characteristic_list[0], descriptor_list[0]) |
| self.per_ad.droid.gattServerCharacteristicAddDescriptor( |
| characteristic_list[1], descriptor_list[1]) |
| self.per_ad.droid.gattServerCharacteristicAddDescriptor( |
| characteristic_list[2], descriptor_list[2]) |
| gatt_service3 = self.per_ad.droid.gattServerCreateService( |
| self.TEST_SERVICE_UUID, GattService.SERVICE_TYPE_PRIMARY) |
| for characteristic in characteristic_list: |
| self.per_ad.droid.gattServerAddCharacteristicToService( |
| gatt_service3, characteristic) |
| self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3) |
| result = self._find_service_added_event(gatt_server_callback, |
| self.TEST_SERVICE_UUID) |
| if not result: |
| return False, False |
| return gatt_server_callback, gatt_server |
| |
| def assertEqual(self, first, second, msg=None): |
| if not first == second: |
| if not msg: |
| raise AssertionError('%r != %r' % (first, second)) |
| else: |
| raise AssertionError(msg + ' %r != %r' % (first, second)) |