blob: 98211c6518068d768376a5c6ac6653dd12c8822a [file] [log] [blame]
# Copyright 2022 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pandora client interface for Avatar tests."""
import asyncio
import avatar.aio
import bumble
import bumble.device
import grpc
import grpc.aio
import logging
from avatar.metrics.interceptors import aio_interceptors
from avatar.metrics.interceptors import interceptors
from bumble import pandora as bumble_server
from bumble.hci import Address as BumbleAddress
from bumble.pandora.device import PandoraDevice as BumblePandoraDevice
from dataclasses import dataclass
from pandora import host_grpc
from pandora import host_grpc_aio
from pandora import security_grpc
from pandora import security_grpc_aio
from typing import Any, Dict, MutableMapping, Optional, Tuple, Union
class Address(bytes):
def __new__(cls, address: Union[bytes, str, BumbleAddress]) -> 'Address':
if type(address) is bytes:
address_bytes = address
elif type(address) is str:
address_bytes = bytes.fromhex(address.replace(':', ''))
elif isinstance(address, BumbleAddress):
address_bytes = bytes(reversed(bytes(address)))
raise ValueError('Invalid address format')
if len(address_bytes) != 6:
raise ValueError('Invalid address length')
return bytes.__new__(cls, address_bytes)
def __str__(self) -> str:
return ':'.join([f'{x:02X}' for x in self])
class PandoraClient:
"""Provides Pandora interface access to a device via gRPC."""
# public fields
name: str
grpc_target: str # Server address for the gRPC channel.
log: 'PandoraClientLoggerAdapter' # Logger adapter.
# private fields
_channel: grpc.Channel # Synchronous gRPC channel.
_address: Address # Bluetooth device address
_aio: Optional['PandoraClient.Aio'] # Asynchronous gRPC channel.
def __init__(self, grpc_target: str, name: str = '..') -> None:
"""Creates a PandoraClient.
Establishes a channel with the Pandora gRPC server.
grpc_target: Server address for the gRPC channel.
""" = name
self.grpc_target = grpc_target
self.log = PandoraClientLoggerAdapter(logging.getLogger(), {'client': self})
self._channel = grpc.intercept_channel(grpc.insecure_channel(grpc_target), *interceptors(self)) # type: ignore
self._address = Address(b'\x00\x00\x00\x00\x00\x00')
self._aio = None
def close(self) -> None:
"""Closes the gRPC channels."""
if self._aio:
def address(self) -> Address:
"""Returns the BD address."""
return self._address
def address(self, address: Union[bytes, str, BumbleAddress]) -> None:
"""Sets the BD address."""
self._address = Address(address)
async def reset(self) -> None:
"""Factory reset the device & read it's BD address."""
attempts, max_attempts = 1, 3
while True:
await, timeout=15.0)
# Factory reset stopped the server, close the client too.
assert self._aio
self._aio = None
# This call might fail if the server is unavailable.
self._address = Address(
(await, timeout=15.0)).address
except grpc.aio.AioRpcError as e:
if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
if attempts <= max_attempts:
self.log.debug(f'Server unavailable, retry [{attempts}/{max_attempts}].')
attempts += 1
self.log.exception(f'Server still unavailable after {attempts} attempts, abort.')
raise e
def channel(self) -> grpc.Channel:
"""Returns the synchronous gRPC channel."""
_ = asyncio.get_running_loop()
return self._channel
raise RuntimeError('Trying to use the synchronous gRPC channel from asynchronous code.')
# Pandora interfaces
def host(self) -> host_grpc.Host:
"""Returns the Pandora Host gRPC interface."""
return host_grpc.Host(
def security(self) -> security_grpc.Security:
"""Returns the Pandora Security gRPC interface."""
return security_grpc.Security(
def security_storage(self) -> security_grpc.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc.SecurityStorage(
class Aio:
channel: grpc.aio.Channel
def host(self) -> host_grpc_aio.Host:
"""Returns the Pandora Host gRPC interface."""
return host_grpc_aio.Host(
def security(self) -> security_grpc_aio.Security:
"""Returns the Pandora Security gRPC interface."""
return security_grpc_aio.Security(
def security_storage(self) -> security_grpc_aio.SecurityStorage:
"""Returns the Pandora SecurityStorage gRPC interface."""
return security_grpc_aio.SecurityStorage(
def aio(self) -> 'PandoraClient.Aio':
if not self._aio:
self._aio = PandoraClient.Aio(
grpc.aio.insecure_channel(self.grpc_target, interceptors=aio_interceptors(self))
return self._aio
class PandoraClientLoggerAdapter(logging.LoggerAdapter): # type: ignore
"""Formats logs from the PandoraClient."""
def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]:
assert self.extra
client = self.extra['client']
assert isinstance(client, PandoraClient)
addr = ':'.join([f'{x:02X}' for x in client.address[4:]])
return (f'[{<8}:{addr}] {msg}', kwargs)
class BumblePandoraClient(PandoraClient):
"""Special Pandora client which also give access to a Bumble device instance."""
_bumble: BumblePandoraDevice # Bumble device wrapper.
_server_config: bumble_server.Config # Bumble server config.
def __init__(self, grpc_target: str, bumble: BumblePandoraDevice, server_config: bumble_server.Config) -> None:
super().__init__(grpc_target, 'bumble')
self._bumble = bumble
self._server_config = server_config
def server_config(self) -> bumble_server.Config:
return self._server_config
def config(self) -> Dict[str, Any]:
return self._bumble.config
def device(self) -> bumble.device.Device:
return self._bumble.device
def random_address(self) -> Address:
return Address(self.device.random_address)