blob: 050754375ad8fa9c79737d037cbde0129dcec1d5 [file] [log] [blame]
#/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
This is base class for tests that exercises different GATT procedures between two connected devices.
Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.
"""
from queue import Empty
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.GattEnum import GattCharacteristic
from acts.test_utils.bt.GattEnum import GattDescriptor
from acts.test_utils.bt.GattEnum import GattService
from acts.test_utils.bt.GattEnum import GattEvent
from acts.test_utils.bt.GattEnum import GattCbErr
from acts.test_utils.bt.GattEnum import GattCbStrings
from acts.test_utils.bt.GattEnum import MtuSize
from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics
from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors
class GattConnectedBaseTest(BluetoothBaseTest):
DEFAULT_TIMEOUT = 10
TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"
# CCC == Client Characteristic Configuration
CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb"
def __init__(self, controllers):
BluetoothBaseTest.__init__(self, controllers)
self.cen_ad = self.android_devices[0]
self.per_ad = self.android_devices[1]
def setup_test(self):
super(GattConnectedBaseTest, self).setup_test()
self.gatt_server_callback, self.gatt_server = self._setup_multiple_services(
)
if not self.gatt_server_callback or not self.gatt_server:
raise AssertionError('Service setup failed')
self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
orchestrate_gatt_connection(self.cen_ad, self.per_ad))
self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)
self.mtu = MtuSize.MIN
if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
event = self._client_wait(GattEvent.GATT_SERV_DISC)
self.discovered_services_index = event['data']['ServicesIndex']
services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
self.discovered_services_index)
self.test_service_index = None
for i in range(services_count):
disc_service_uuid = (
self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
self.discovered_services_index, i).upper())
if disc_service_uuid == self.TEST_SERVICE_UUID:
self.test_service_index = i
break
if not self.test_service_index:
print("Service not found")
return False
connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices(
self.gatt_server)
if len(connected_device_list) == 0:
self.log.info("No devices connected from peripheral.")
return False
return True
def teardown_test(self):
self.per_ad.droid.gattServerClearServices(self.gatt_server)
self.per_ad.droid.gattServerClose(self.gatt_server)
del self.gatt_server_callback
del self.gatt_server
self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
self.gatt_callback)
return super(GattConnectedBaseTest, self).teardown_test()
def _server_wait(self, gatt_event):
return self._timed_pop(gatt_event, self.per_ad,
self.gatt_server_callback)
def _client_wait(self, gatt_event):
return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)
def _timed_pop(self, gatt_event, droid, gatt_callback):
expected_event = gatt_event.value["evt"].format(gatt_callback)
try:
return droid.ed.pop_event(expected_event, self.DEFAULT_TIMEOUT)
except Empty as emp:
raise AssertionError(gatt_event.value["err"].format(
expected_event))
def _setup_characteristics_and_descriptors(self, droid):
characteristic_input = [
{
'uuid': self.WRITABLE_CHAR_UUID,
'property': GattCharacteristic.PROPERTY_WRITE.value
| GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value,
'permission': GattCharacteristic.PERMISSION_WRITE.value
},
{
'uuid': self.READABLE_CHAR_UUID,
'property': GattCharacteristic.PROPERTY_READ.value,
'permission': GattCharacteristic.PERMISSION_READ.value
},
{
'uuid': self.NOTIFIABLE_CHAR_UUID,
'property': GattCharacteristic.PROPERTY_NOTIFY.value
| GattCharacteristic.PROPERTY_INDICATE.value,
'permission': GattCharacteristic.PERMISSION_READ.value
},
]
descriptor_input = [
{
'uuid': self.WRITABLE_DESC_UUID,
'property': GattDescriptor.PERMISSION_READ.value
| GattCharacteristic.PERMISSION_WRITE.value,
}, {
'uuid': self.READABLE_DESC_UUID,
'property': GattDescriptor.PERMISSION_READ.value
| GattDescriptor.PERMISSION_WRITE.value,
}, {
'uuid': self.CCC_DESC_UUID,
'property': GattDescriptor.PERMISSION_READ.value
| GattDescriptor.PERMISSION_WRITE.value,
}
]
characteristic_list = setup_gatt_characteristics(droid,
characteristic_input)
self.notifiable_char_index = characteristic_list[2]
descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
return characteristic_list, descriptor_list
def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
self.log.info("Disconnecting from peripheral device.")
test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
gatt_callback)
self.cen_ad.droid.gattClientClose(bluetooth_gatt)
if not test_result:
self.log.info("Failed to disconnect from peripheral device.")
return False
return True
def _find_service_added_event(self, gatt_server_callback, uuid):
expected_event = GattCbStrings.SERV_ADDED.value.format(
gatt_server_callback)
try:
event = self.per_ad.ed.pop_event(expected_event,
self.DEFAULT_TIMEOUT)
except Empty:
self.log.error(GattCbErr.SERV_ADDED_ERR.value.format(
expected_event))
return False
if event['data']['serviceUuid'].lower() != uuid.lower():
self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
event['data']['serviceUuid'], uuid))
return False
return True
def _setup_multiple_services(self):
gatt_server_callback = (
self.per_ad.droid.gattServerCreateGattServerCallback())
gatt_server = self.per_ad.droid.gattServerOpenGattServer(
gatt_server_callback)
characteristic_list, descriptor_list = (
self._setup_characteristics_and_descriptors(self.per_ad.droid))
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[0], descriptor_list[0])
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[1], descriptor_list[1])
self.per_ad.droid.gattServerCharacteristicAddDescriptor(
characteristic_list[2], descriptor_list[2])
gatt_service3 = self.per_ad.droid.gattServerCreateService(
self.TEST_SERVICE_UUID, GattService.SERVICE_TYPE_PRIMARY)
for characteristic in characteristic_list:
self.per_ad.droid.gattServerAddCharacteristicToService(
gatt_service3, characteristic)
self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3)
result = self._find_service_added_event(gatt_server_callback,
self.TEST_SERVICE_UUID)
if not result:
return False, False
return gatt_server_callback, gatt_server
def assertEqual(self, first, second, msg=None):
if not first == second:
if not msg:
raise AssertionError('%r != %r' % (first, second))
else:
raise AssertionError(msg + ' %r != %r' % (first, second))