blob: 904ca26e3c54c955b6085383084636ec528436f8 [file] [log] [blame]
#!/usr/bin/env python3.4
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import logging
from acts.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement
from acts.test_utils.bt.GattEnum import GattCbErr
from acts.test_utils.bt.GattEnum import GattCbStrings
from acts.test_utils.bt.GattEnum import GattConnectionState
from acts.test_utils.bt.GattEnum import GattCharacteristic
from acts.test_utils.bt.GattEnum import GattDescriptor
from acts.test_utils.bt.GattEnum import GattService
from acts.test_utils.bt.GattEnum import GattConnectionPriority
import pprint
from queue import Empty
from contextlib import suppress
# Timeout handed to every pop_event() call in this module while waiting for an
# expected GATT callback event (presumably seconds -- confirm against the
# event dispatcher's pop_event signature).
default_timeout = 10
# Alias for the standard logging module, so log.info()/log.error() below go
# through logging's module-level functions (i.e. the root logger).
log = logging
def setup_gatt_connection(cen_ad, mac_address, autoconnect):
    """Connect the central device to a GATT server and wait for the result.

    Creates a GATT callback on cen_ad, issues the connect request and then
    waits (up to default_timeout) for the connection-state-change event
    associated with that callback.

    Args:
        cen_ad: Central device object; its ``droid`` attribute is used to
            issue the request and its ``ed`` attribute to pop the event.
        mac_address: Address of the peripheral to connect to.
        autoconnect: Passed through unchanged to gattClientConnectGatt.

    Returns:
        A tuple ``(test_result, bluetooth_gatt, gatt_callback)``.
        test_result is False when the event did not arrive in time or the
        reported state is not STATE_CONNECTED; the two handles are returned
        in every case.
    """
    gatt_callback = cen_ad.droid.gattCreateGattCallback()
    log.info("Gatt Connect to mac address {}.".format(mac_address))
    bluetooth_gatt = cen_ad.droid.gattClientConnectGatt(
        gatt_callback, mac_address, autoconnect)
    expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(gatt_callback)
    try:
        event = cen_ad.ed.pop_event(expected_event, default_timeout)
    except Empty:
        log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(expected_event))
        return False, bluetooth_gatt, gatt_callback
    if event['data']['State'] != GattConnectionState.STATE_CONNECTED.value:
        # The "{}" placeholder is required for the event details to actually
        # appear in the log message.
        log.info("Could not establish a connection to peripheral. Event "
                 "Details: {}".format(pprint.pformat(event)))
        return False, bluetooth_gatt, gatt_callback
    return True, bluetooth_gatt, gatt_callback
def disconnect_gatt_connection(cen_ad, bluetooth_gatt, gatt_callback):
    """Disconnect a GATT client connection and confirm the state change.

    Args:
        cen_ad: Central device object; ``droid`` issues the disconnect and
            ``ed`` is used to pop the resulting event.
        bluetooth_gatt: Handle of the GATT connection to disconnect.
        gatt_callback: Callback handle used to build the expected event name.

    Returns:
        True if a connection-state-change event arrives within
        default_timeout and reports STATE_DISCONNECTED, otherwise False.
    """
    cen_ad.droid.gattClientDisconnect(bluetooth_gatt)
    conn_change_event = GattCbStrings.GATT_CONN_CHANGE.value.format(
        gatt_callback)
    try:
        state_event = cen_ad.ed.pop_event(conn_change_event, default_timeout)
    except Empty:
        log.error(
            GattCbErr.GATT_CONN_CHANGE_ERR.value.format(conn_change_event))
        return False
    reported_state = state_event['data']['State']
    return reported_state == GattConnectionState.STATE_DISCONNECTED.value
def orchestrate_gatt_connection(cen_ad,
                                per_ad,
                                le=True,
                                mac_address=None,
                                autoconnect=False):
    """Resolve the peripheral's address if needed and open a GATT connection.

    Args:
        cen_ad: Central device object.
        per_ad: Peripheral device object.
        le: When no mac_address is given, True obtains the address via
            get_mac_address_of_generic_advertisement; False uses the
            peripheral's bluetoothGetLocalAddress().
        mac_address: Address to connect to; looked up when None.
        autoconnect: Passed through to setup_gatt_connection.

    Returns:
        ``(bluetooth_gatt, gatt_callback, adv_callback)`` on success, where
        adv_callback is None unless the address came from
        get_mac_address_of_generic_advertisement.
        ``(False, None, None)`` when the connection could not be set up.
    """
    advertise_callback = None
    if mac_address is None and le:
        mac_address, advertise_callback = (
            get_mac_address_of_generic_advertisement(cen_ad, per_ad))
    elif mac_address is None:
        mac_address = per_ad.droid.bluetoothGetLocalAddress()
    connected, gatt_handle, callback_handle = setup_gatt_connection(
        cen_ad, mac_address, autoconnect)
    if connected:
        return gatt_handle, callback_handle, advertise_callback
    log.error("Could not connect to peripheral.")
    return False, None, None
def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed,
                                    gatt_server, gatt_server_callback,
                                    bluetooth_gatt, services_count,
                                    discovered_services_index,
                                    number_of_iterations=100000):
    """Repeatedly write every discovered descriptor and answer each request.

    For each iteration, walks every service index below services_count,
    every characteristic of that service and every descriptor of that
    characteristic. Each descriptor is written from the central; the
    function then waits for the write request on the peripheral, sends a
    response, and waits for the write-complete event on the central.

    Args:
        cen_droid: Central-side droid used to query UUIDs and write
            descriptors.
        cen_ed: Central-side event dispatcher (write-complete events).
        per_droid: Peripheral-side droid used to send the server response.
        per_ed: Peripheral-side event dispatcher (write-request events).
        gatt_server: GATT server handle used when sending the response.
        gatt_server_callback: Handle used to build the write-request event
            name.
        bluetooth_gatt: GATT client handle used for the writes and to build
            the write-complete event name.
        services_count: Number of service indices to iterate over.
        discovered_services_index: Index of the discovered-services result
            to query.
        number_of_iterations: How many full passes to run (default 100000).

    Returns:
        False as soon as an expected event does not arrive within
        default_timeout. None otherwise, both when all iterations complete
        and when an unexpected exception aborts the run.
    """
    log.info("Starting continuous write")
    bt_device_id = 0
    status = 1
    offset = 1
    test_value = "1,2,3,4,5,6,7"
    test_value_return = "1,2,3"
    try:
        for _ in range(number_of_iterations):
            for i in range(services_count):
                characteristic_uuids = (
                    cen_droid.gattClientGetDiscoveredCharacteristicUuids(
                        discovered_services_index, i))
                log.info(characteristic_uuids)
                for characteristic in characteristic_uuids:
                    descriptor_uuids = (
                        cen_droid.gattClientGetDiscoveredDescriptorUuids(
                            discovered_services_index, i, characteristic))
                    log.info(descriptor_uuids)
                    for descriptor in descriptor_uuids:
                        log.info("descriptor to be written {}".format(
                            descriptor))
                        cen_droid.gattClientDescriptorSetValue(
                            bluetooth_gatt, discovered_services_index, i,
                            characteristic, descriptor, test_value)
                        cen_droid.gattClientWriteDescriptor(
                            bluetooth_gatt, discovered_services_index, i,
                            characteristic, descriptor)
                        expected_event = (
                            GattCbStrings.DESC_WRITE_REQ.value.format(
                                gatt_server_callback))
                        try:
                            event = per_ed.pop_event(expected_event,
                                                     default_timeout)
                        except Empty:
                            log.error(
                                GattCbErr.DESC_WRITE_REQ_ERR.value.format(
                                    expected_event))
                            return False
                        request_id = event['data']['requestId']
                        found_value = event['data']['value']
                        # A mismatch is only reported; the run continues.
                        if found_value != test_value:
                            log.error(
                                "Values didn't match. Found: {}, Expected: "
                                "{}".format(found_value, test_value))
                        per_droid.gattServerSendResponse(
                            gatt_server, bt_device_id, request_id, status,
                            offset, test_value_return)
                        expected_event = (
                            GattCbStrings.DESC_WRITE.value.format(
                                bluetooth_gatt))
                        try:
                            cen_ed.pop_event(expected_event, default_timeout)
                        except Empty:
                            log.error(GattCbErr.DESC_WRITE_ERR.value.format(
                                expected_event))
                            return False
    except Exception as err:
        # Still best-effort (an unexpected failure ends the run without
        # raising), but the cause is now logged instead of being dropped.
        log.error("Continuous descriptor write aborted: {}".format(err))
    return None
def setup_characteristics_and_descriptors(droid):
    """Create the fixed set of test characteristics and descriptors.

    NOTE(review): a second definition of this function appears later in
    this module and replaces this one at import time. Keep the two in sync
    or remove one of them.

    Args:
        droid: Droid object on which the characteristics and descriptors
            are created.

    Returns:
        A tuple ``(characteristic_list, descriptor_list)`` holding the
        values returned by setup_gatt_characteristics and
        setup_gatt_descriptors for the three characteristics and two
        descriptors described below.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': (GattCharacteristic.PROPERTY_WRITE.value
                         | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value),
            # The permission slot takes a PERMISSION_* constant; a
            # PROPERTY_* constant is a different bit mask.
            'permission': GattCharacteristic.PERMISSION_WRITE.value
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': (GattCharacteristic.PROPERTY_NOTIFY.value
                         | GattCharacteristic.PROPERTY_READ.value),
            'permission': GattCharacteristic.PERMISSION_READ.value
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': (GattCharacteristic.PROPERTY_NOTIFY.value
                         | GattCharacteristic.PROPERTY_READ.value),
            'permission': GattCharacteristic.PERMISSION_READ.value
        },
    ]
    # For descriptors the 'property' key carries the permission mask; the
    # key name is what setup_gatt_descriptors reads.
    descriptor_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': (GattDescriptor.PERMISSION_READ.value
                         | GattDescriptor.PERMISSION_WRITE.value),
        },
        {
            'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
            'property': (GattDescriptor.PERMISSION_READ.value
                         | GattDescriptor.PERMISSION_WRITE.value),
        },
    ]
    characteristic_list = setup_gatt_characteristics(droid,
                                                     characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
    return characteristic_list, descriptor_list
def setup_multiple_services(per_ad):
    """Open a GATT server on the peripheral and register three services.

    Creates the characteristics and descriptors through
    setup_characteristics_and_descriptors, attaches one descriptor each to
    the second and third characteristic, creates three primary services,
    and then, one service at a time, adds every characteristic to it,
    registers it with the server and waits for the service-added event.

    Args:
        per_ad: Peripheral device object providing ``droid`` and ``ed``.

    Returns:
        ``(gatt_server_callback, gatt_server)`` once all three services
        were added; the single value False if a service-added event does
        not arrive within default_timeout.
    """
    droid = per_ad.droid
    dispatcher = per_ad.ed
    server_callback = droid.gattServerCreateGattServerCallback()
    server = droid.gattServerOpenGattServer(server_callback)
    characteristics, descriptors = setup_characteristics_and_descriptors(
        droid)
    # (characteristic index, descriptor index) pairs to link together.
    for char_index, desc_index in ((1, 0), (2, 1)):
        droid.gattServerCharacteristicAddDescriptor(
            characteristics[char_index], descriptors[desc_index])
    service_uuids = (
        "00000000-0000-1000-8000-00805f9b34fb",
        "FFFFFFFF-0000-1000-8000-00805f9b34fb",
        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
    )
    # All services are created up front, before any of them is registered.
    services = [
        droid.gattServerCreateService(
            uuid, GattService.SERVICE_TYPE_PRIMARY.value)
        for uuid in service_uuids
    ]
    service_added_event = GattCbStrings.SERV_ADDED.value.format(
        server_callback)
    for service in services:
        for characteristic in characteristics:
            droid.gattServerAddCharacteristicToService(service,
                                                       characteristic)
        droid.gattServerAddService(server, service)
        try:
            dispatcher.pop_event(service_added_event, default_timeout)
        except Empty:
            log.error(
                GattCbErr.SERV_ADDED_ERR.value.format(service_added_event))
            return False
    return server_callback, server
def setup_characteristics_and_descriptors(droid):
    """Create the fixed set of test characteristics and descriptors.

    NOTE(review): an earlier definition with the same name exists in this
    module; because this one comes later, it is the one in effect after
    import. Keep the two in sync or remove one of them.

    Args:
        droid: Droid object on which the characteristics and descriptors
            are created.

    Returns:
        A tuple ``(characteristic_list, descriptor_list)`` holding the
        values returned by setup_gatt_characteristics and
        setup_gatt_descriptors for the three characteristics and two
        descriptors described below.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': (GattCharacteristic.PROPERTY_WRITE.value
                         | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value),
            # The permission slot takes a PERMISSION_* constant; a
            # PROPERTY_* constant is a different bit mask.
            'permission': GattCharacteristic.PERMISSION_WRITE.value
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': (GattCharacteristic.PROPERTY_NOTIFY.value
                         | GattCharacteristic.PROPERTY_READ.value),
            'permission': GattCharacteristic.PERMISSION_READ.value
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': (GattCharacteristic.PROPERTY_NOTIFY.value
                         | GattCharacteristic.PROPERTY_READ.value),
            'permission': GattCharacteristic.PERMISSION_READ.value
        },
    ]
    # For descriptors the 'property' key carries the permission mask; the
    # key name is what setup_gatt_descriptors reads.
    descriptor_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': (GattDescriptor.PERMISSION_READ.value
                         | GattDescriptor.PERMISSION_WRITE.value),
        },
        {
            'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
            'property': (GattDescriptor.PERMISSION_READ.value
                         | GattDescriptor.PERMISSION_WRITE.value),
        },
    ]
    characteristic_list = setup_gatt_characteristics(droid,
                                                     characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
    return characteristic_list, descriptor_list
def setup_gatt_characteristics(droid, input):
    """Create one GATT characteristic per entry of input.

    Args:
        droid: Object providing gattServerCreateBluetoothGattCharacteristic.
        input: Iterable of mappings, each with the keys 'uuid', 'property'
            and 'permission'.

    Returns:
        List of the values returned by the droid call, in input order.
    """
    return [
        droid.gattServerCreateBluetoothGattCharacteristic(
            spec['uuid'], spec['property'], spec['permission'])
        for spec in input
    ]
def setup_gatt_descriptors(droid, input):
    """Create one GATT descriptor per entry of input and log the result.

    Args:
        droid: Object providing gattServerCreateBluetoothGattDescriptor.
        input: Iterable of mappings, each with the keys 'uuid' and
            'property'.

    Returns:
        List of the values returned by the droid call, in input order.
    """
    created = [
        droid.gattServerCreateBluetoothGattDescriptor(
            spec['uuid'], spec['property'])
        for spec in input
    ]
    log.info("setup descriptor list: {}".format(created))
    return created