| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| """Pandora client interface for Avatar tests.""" |
| |
| import asyncio |
| import avatar.aio |
| import bumble |
| import bumble.device |
| import grpc |
| import grpc.aio |
| import logging |
| |
| from avatar.bumble_device import BumbleDevice |
| from bumble.hci import Address as BumbleAddress |
| from dataclasses import dataclass |
| from pandora import host_grpc, host_grpc_aio, security_grpc, security_grpc_aio |
| from typing import Any, Dict, MutableMapping, Optional, Tuple, Union |
| |
| |
class Address(bytes):
    """A 6-byte Bluetooth device address stored as an immutable `bytes` value.

    The address can be built from raw bytes (including another `Address`),
    from a hexadecimal string with optional ':' separators, or from a Bumble
    `Address` object.
    """

    def __new__(cls, address: Union[bytes, str, 'BumbleAddress']) -> 'Address':
        """Creates an Address.

        Args:
            address: Raw bytes, a hex string such as '11:22:33:44:55:66'
                (colons are optional), or a Bumble address.

        Raises:
            ValueError: If the input type is not supported, the string is not
                valid hexadecimal, or the result is not exactly 6 bytes long.
        """
        # `isinstance` (rather than an exact type check) so that subclasses,
        # notably `Address` itself, are accepted: `Address(Address(...))`.
        if isinstance(address, bytes):
            address_bytes = bytes(address)
        elif isinstance(address, str):
            address_bytes = bytes.fromhex(address.replace(':', ''))
        elif isinstance(address, BumbleAddress):
            # The Bumble byte representation is reversed before being stored
            # (presumably Bumble keeps addresses least-significant byte
            # first -- TODO confirm against Bumble).
            address_bytes = bytes(reversed(bytes(address)))
        else:
            raise ValueError('Invalid address format')

        if len(address_bytes) != 6:
            raise ValueError('Invalid address length')

        return bytes.__new__(cls, address_bytes)

    def __str__(self) -> str:
        """Returns the address as colon-separated upper-case hex bytes."""
        return ':'.join(f'{x:02X}' for x in self)
| |
| |
class PandoraClient:
    """Provides Pandora interface access to a device via gRPC.

    A synchronous gRPC channel is opened at construction time. The
    asynchronous channel is created lazily, on first access to `aio`.
    """

    # public fields
    grpc_target: str  # Server address for the gRPC channel.
    log: 'PandoraClientLoggerAdapter'  # Logger adapter.

    # private fields
    _channel: grpc.Channel  # Synchronous gRPC channel.
    _address: Address  # Bluetooth device address
    _aio: Optional['PandoraClient.Aio']  # Asynchronous gRPC channel.

    def __init__(self, grpc_target: str, name: str = '..') -> None:
        """Creates a PandoraClient.

        Establishes a channel with the Pandora gRPC server.

        Args:
            grpc_target: Server address for the gRPC channel.
            name: Client name passed to the logger adapter as 'client_name'.
        """
        self.grpc_target = grpc_target
        self.log = PandoraClientLoggerAdapter(logging.getLogger(), {'client': self, 'client_name': name})
        self._channel = grpc.insecure_channel(grpc_target)  # type: ignore
        # Placeholder address until `reset` or the `address` setter provides one.
        self._address = Address(b'\x00\x00\x00\x00\x00\x00')
        self._aio = None

    def close(self) -> None:
        """Closes the gRPC channels."""
        self._channel.close()
        if self._aio:
            avatar.aio.run_until_complete(self._aio.channel.close())

    @property
    def address(self) -> Address:
        """Returns the BD address."""
        return self._address

    @address.setter
    def address(self, address: Union[bytes, str, BumbleAddress]) -> None:
        """Sets the BD address."""
        self._address = Address(address)

    async def reset(self) -> None:
        """Factory reset the device & read its BD address.

        The whole sequence is retried when the server answers UNAVAILABLE or
        DEADLINE_EXCEEDED, up to `max_attempts` retries.

        Raises:
            grpc.aio.AioRpcError: On any other gRPC status, or once the
                retries are exhausted.
        """
        attempts, max_attempts = 1, 3
        while True:
            try:
                await self.aio.host.FactoryReset(wait_for_ready=True, timeout=15.0)

                # Factory reset stopped the server, close the client too.
                assert self._aio
                await self._aio.channel.close()
                self._aio = None

                # This call might fail if the server is unavailable.
                # Accessing `self.aio` here re-creates the asynchronous channel.
                self._address = Address(
                    (await self.aio.host.ReadLocalAddress(wait_for_ready=True, timeout=15.0)).address
                )
                return
            except grpc.aio.AioRpcError as e:
                if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
                    if attempts <= max_attempts:
                        self.log.debug(f'Server unavailable, retry [{attempts}/{max_attempts}].')
                        attempts += 1
                        continue
                    self.log.exception(f'Server still unavailable after {attempts} attempts, abort.')
                # Not retryable, or retries exhausted: propagate to the caller.
                raise

    @property
    def channel(self) -> grpc.Channel:
        """Returns the synchronous gRPC channel.

        Raises:
            RuntimeError: If called while an asyncio event loop is running in
                the current thread; asynchronous code must use `aio` instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop: the caller is synchronous code.
            return self._channel
        raise RuntimeError('Trying to use the synchronous gRPC channel from asynchronous code.')

    # Pandora interfaces

    @property
    def host(self) -> host_grpc.Host:
        """Returns the Pandora Host gRPC interface."""
        return host_grpc.Host(self.channel)

    @property
    def security(self) -> security_grpc.Security:
        """Returns the Pandora Security gRPC interface."""
        return security_grpc.Security(self.channel)

    @property
    def security_storage(self) -> security_grpc.SecurityStorage:
        """Returns the Pandora SecurityStorage gRPC interface."""
        return security_grpc.SecurityStorage(self.channel)

    @dataclass
    class Aio:
        """Asynchronous counterpart of the Pandora interfaces."""

        channel: grpc.aio.Channel  # Asynchronous gRPC channel.

        @property
        def host(self) -> host_grpc_aio.Host:
            """Returns the Pandora Host gRPC interface."""
            return host_grpc_aio.Host(self.channel)

        @property
        def security(self) -> security_grpc_aio.Security:
            """Returns the Pandora Security gRPC interface."""
            return security_grpc_aio.Security(self.channel)

        @property
        def security_storage(self) -> security_grpc_aio.SecurityStorage:
            """Returns the Pandora SecurityStorage gRPC interface."""
            return security_grpc_aio.SecurityStorage(self.channel)

    @property
    def aio(self) -> 'PandoraClient.Aio':
        """Returns the asynchronous interfaces, creating the channel on first use."""
        if not self._aio:
            self._aio = PandoraClient.Aio(grpc.aio.insecure_channel(self.grpc_target))
        return self._aio
| |
| |
class PandoraClientLoggerAdapter(logging.LoggerAdapter):  # type: ignore
    """Prefixes PandoraClient log messages with the client name and address."""

    def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]:
        """Prepends '[<client name>:<last two address bytes>]' to `msg`.

        The client name is taken from the 'client_name' entry of `extra`,
        falling back to the client's class name. `kwargs` is returned as is.
        """
        extra = self.extra
        assert extra
        owner = extra['client']
        assert isinstance(owner, PandoraClient)
        label = extra.get('client_name', owner.__class__.__name__)
        # Only the bytes from index 4 onwards are shown, to keep the prefix short.
        suffix = ':'.join(f'{octet:02X}' for octet in owner.address[4:])
        prefix = f'[{label}:{suffix}]'
        return (f'{prefix} {msg}', kwargs)
| |
| |
class BumblePandoraClient(PandoraClient):
    """Pandora client that additionally exposes the underlying Bumble device."""

    _bumble: BumbleDevice  # Bumble device wrapper.

    def __init__(self, grpc_target: str, bumble: BumbleDevice) -> None:
        """Creates the client, named 'bumble', and keeps the device wrapper.

        Args:
            grpc_target: Server address for the gRPC channel.
            bumble: Bumble device wrapper backing this client.
        """
        super().__init__(grpc_target, name='bumble')
        self._bumble = bumble

    @property
    def config(self) -> Dict[str, Any]:
        """Returns the configuration of the wrapped Bumble device."""
        wrapper = self._bumble
        return wrapper.config

    @property
    def device(self) -> bumble.device.Device:
        """Returns the Bumble device instance held by the wrapper."""
        wrapper = self._bumble
        return wrapper.device

    @property
    def random_address(self) -> Address:
        """Returns the Bumble device's random address as an `Address`."""
        raw_address = self.device.random_address
        return Address(raw_address)