| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import contextlib |
| import functools |
| import grpc |
| import inspect |
| import logging |
| |
| from bumble.device import Device |
| from bumble.hci import Address |
| from google.protobuf.message import Message # pytype: disable=pyi-error |
| from typing import Any, Generator, MutableMapping, Optional, Tuple |
| |
| ADDRESS_TYPES: dict[str, int] = { |
| "public": Address.PUBLIC_DEVICE_ADDRESS, |
| "random": Address.RANDOM_DEVICE_ADDRESS, |
| "public_identity": Address.PUBLIC_IDENTITY_ADDRESS, |
| "random_static_identity": Address.RANDOM_IDENTITY_ADDRESS, |
| } |
| |
| |
| def address_from_request(request: Message, field: Optional[str]) -> Address: |
| if field is None: |
| return Address.ANY |
| return Address(bytes(reversed(getattr(request, field))), ADDRESS_TYPES[field]) |
| |
| |
| class BumbleServerLoggerAdapter(logging.LoggerAdapter): # type: ignore |
| """Formats logs from the PandoraClient.""" |
| |
| def process(self, msg: str, kwargs: MutableMapping[str, Any]) -> Tuple[str, MutableMapping[str, Any]]: |
| assert self.extra |
| service_name = self.extra['service_name'] |
| assert isinstance(service_name, str) |
| device = self.extra['device'] |
| assert isinstance(device, Device) |
| addr_bytes = bytes(reversed(bytes(device.public_address))) # pytype: disable=attribute-error |
| addr = ':'.join([f'{x:02X}' for x in addr_bytes[4:]]) |
| return (f'[bumble.{service_name}:{addr}] {msg}', kwargs) |
| |
| |
| @contextlib.contextmanager |
| def exception_to_rpc_error(context: grpc.ServicerContext) -> Generator[None, None, None]: |
| try: |
| yield None |
| except NotImplementedError as e: |
| context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore |
| context.set_details(str(e)) # type: ignore |
| except ValueError as e: |
| context.set_code(grpc.StatusCode.INVALID_ARGUMENT) # type: ignore |
| context.set_details(str(e)) # type: ignore |
| except RuntimeError as e: |
| context.set_code(grpc.StatusCode.ABORTED) # type: ignore |
| context.set_details(str(e)) # type: ignore |
| |
| |
| # Decorate an RPC servicer method with a wrapper that transform exceptions to gRPC errors. |
| def rpc(func: Any) -> Any: |
| @functools.wraps(func) |
| async def asyncgen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any: |
| with exception_to_rpc_error(context): |
| async for v in func(self, request, context): |
| yield v |
| |
| @functools.wraps(func) |
| async def async_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any: |
| with exception_to_rpc_error(context): |
| return await func(self, request, context) |
| |
| @functools.wraps(func) |
| def gen_wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any: |
| with exception_to_rpc_error(context): |
| for v in func(self, request, context): |
| yield v |
| |
| @functools.wraps(func) |
| def wrapper(self: Any, request: Any, context: grpc.ServicerContext) -> Any: |
| with exception_to_rpc_error(context): |
| return func(self, request, context) |
| |
| if inspect.isasyncgenfunction(func): |
| return asyncgen_wrapper |
| |
| if inspect.iscoroutinefunction(func): |
| return async_wrapper |
| |
| if inspect.isgenerator(func): |
| return gen_wrapper |
| |
| return wrapper |