blob: e36499bf358b067ddcaefe796eca41923d0877a6 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
from blueberry.tests.gd.cert.gd_device import GdHostOnlyDevice
from blueberry.tests.gd.cert.gd_device import MOBLY_CONTROLLER_CONFIG_NAME
from blueberry.tests.gd.cert.os_utils import get_gd_root
from blueberry.tests.topshim.lib.async_closable import AsyncClosable
from blueberry.tests.topshim.lib.async_closable import asyncSafeClose
from blueberry.tests.topshim.lib.gatt_client import GattClient
from blueberry.tests.topshim.lib.security_client import SecurityClient
def create(configs):
return get_instances_with_configs(configs)
def destroy(devices):
pass
def replace_vars_for_topshim(string, config):
serial_number = config.get("serial_number")
if serial_number is None:
serial_number = ""
rootcanal_port = config.get("rootcanal_port")
if rootcanal_port is None:
rootcanal_port = ""
if serial_number == "DUT" or serial_number == "CERT":
raise Exception("Did you forget to configure the serial number?")
# We run bt_topshim_facade instead of bluetooth_stack_with_facade
return string.replace("$GD_ROOT", get_gd_root()) \
.replace("bluetooth_stack_with_facade", "bt_topshim_facade") \
.replace("$(grpc_port)", config.get("grpc_port")) \
.replace("$(grpc_root_server_port)", config.get("grpc_root_server_port")) \
.replace("$(rootcanal_port)", rootcanal_port) \
.replace("$(signal_port)", config.get("signal_port")) \
.replace("$(serial_number)", serial_number)
def get_instances_with_configs(configs):
logging.info(configs)
devices = []
for config in configs:
resolved_cmd = []
for arg in config["cmd"]:
logging.debug(arg)
resolved_cmd.append(replace_vars_for_topshim(arg, config))
verbose_mode = bool(config.get('verbose_mode', False))
device = GdHostOnlyDevice(config["grpc_port"], "-1", config["signal_port"], resolved_cmd, config["label"],
MOBLY_CONTROLLER_CONFIG_NAME, config["name"], verbose_mode)
device.setup()
devices.append(device)
return devices
class TopshimDevice(AsyncClosable):
__adapter = None
__gatt = None
__security = None
async def __le_rand_wrapper(self, async_fn):
await async_fn
# await self.__adapter.le_rand()
def __post(self, async_fn):
asyncio.get_event_loop().run_until_complete(self.__le_rand_wrapper(async_fn))
def __init__(self, adapter, grpc_port):
self.__adapter = adapter
self.__gatt = GattClient(port=grpc_port)
self.__security = SecurityClient(adapter, port=grpc_port)
async def close(self):
"""
Implement abstract method to close out any streams or jobs.
"""
await asyncSafeClose(self.__adapter)
await asyncSafeClose(self.__gatt)
await asyncSafeClose(self.__security)
def enable_page_scan(self):
self.__post(self.__adapter.enable_page_scan())
def disable_page_scan(self):
self.__post(self.__adapter.disable_page_scan())
def start_advertising(self):
"""
Starts BLE Advertiser for the stack.
Assumes stack defaults. Which in our case would be RRPA
"""
self.__post(self.__gatt.advertising_enable())
def stop_advertising(self):
"""
Stop BLE Advertiser.
"""
self.__post(self.__gatt.advertising_disable())
def start_scanning(self):
pass
def stop_scanning(self):
pass
def clear_event_mask(self):
self.__post(self.__adapter.clear_event_mask())
def clear_event_filter(self):
self.__post(self.__adapter.clear_event_filter())
def clear_filter_accept_list(self):
self.__post(self.__adapter.clear_filter_accept_list())
def disconnect_all_acls(self):
self.__post(self.__adapter.disconnect_all_acls())
def allow_wake_by_hid(self):
self.__post(self.__adapter.allow_wake_by_hid())
def set_default_event_mask_except(self, mask, le_mask):
self.__post(self.__adapter.set_default_event_mask(mask, le_mask))
def set_event_filter_inquiry_result_all_devices(self):
self.__post(self.__adapter.set_event_filter_inquiry_result_all_devices())
def set_event_filter_connection_setup_all_devices(self):
self.__post(self.__adapter.set_event_filter_connection_setup_all_devices())
def le_rand(self):
self.__post(self.__adapter.le_rand())
def remove_bonded_device(self, address):
"""
Removes a bonding entry for a given address.
"""
self.__post(self.__security.remove_bond(address))