| # Copyright 2024 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """RFCOMM grpc interface.""" |
| |
| import asyncio |
| import logging |
| import os |
| import socket as socket_module |
| import uuid as uuid_module |
| |
| from floss.pandora.floss import floss_enums |
| from floss.pandora.floss import socket_manager |
| from floss.pandora.floss import utils |
| from floss.pandora.server import bluetooth as bluetooth_module |
| from google.protobuf import empty_pb2 |
| import grpc |
| from pandora import rfcomm_grpc_aio |
| from pandora import rfcomm_pb2 |
| |
| |
class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
    """Service to trigger Bluetooth RFCOMM procedures.

    This class implements the Pandora bluetooth test interfaces,
    where the meta class definition is automatically generated by the protobuf.
    The interface definition can be found in:
    https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora/rfcomm.proto

    Every established RFCOMM channel is wrapped in a socket object and stored in
    `self.streams` under an integer stream ID, which is what RPC clients pass back
    to Send / Receive / Disconnect.
    """

    # Size of the buffer for data transactions.
    BUFFER_SIZE = 512

    def __init__(self, bluetooth: bluetooth_module.Bluetooth):
        """Stores the Bluetooth facade and initializes the stream bookkeeping.

        Args:
            bluetooth: Facade used to issue socket requests and to reach the
                socket manager on which callback observers are registered.
        """
        self.bluetooth = bluetooth

        # Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
        self.current_stream_id = 0x12FC0

        # key = stream_id, val = stream (socket object)
        self.streams = dict()

    def new_stream_id(self) -> int:
        """Returns the next unused stream ID and advances the internal counter."""
        stream_id = self.current_stream_id
        self.current_stream_id += 1
        return stream_id

    @staticmethod
    def _resolve(future, value):
        """Schedules `future.set_result(value)` on the loop that owns `future`.

        Uses call_soon_threadsafe so it can be invoked from an observer callback.
        The result is only set while the future is still pending: a future that
        already timed out (cancelled by asyncio.wait_for) or was already resolved
        by an earlier callback would otherwise raise InvalidStateError in the loop.
        """

        def set_if_pending():
            if not future.done():
                future.set_result(value)

        future.get_loop().call_soon_threadsafe(set_if_pending)

    @staticmethod
    def _dup_fd_from_list(optional_fd, dbus_unix_fd_list, caller):
        """Duplicates the file descriptor that `optional_fd` refers to.

        Args:
            optional_fd: Mapping whose 'optional_value' entry is the index of the
                descriptor inside `dbus_unix_fd_list`; may be empty/None.
            dbus_unix_fd_list: Object exposing get_length() and get(index).
            caller: Name of the calling callback, used as log prefix.

        Returns:
            A duplicated file descriptor owned by the caller, or None when the
            descriptor is missing or its index is out of range.
        """
        if not optional_fd:
            return None

        if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
            logging.error('%s: Empty fd list', caller)
            return None

        fd_handle = optional_fd['optional_value']
        # Valid indexes are 0 .. length - 1; an index equal to the length is out of range.
        if fd_handle < 0 or fd_handle >= dbus_unix_fd_list.get_length():
            logging.error('%s: Invalid fd handle', caller)
            return None

        # NOTE(review): if dbus_unix_fd_list is a Gio.UnixFDList, get() already returns a
        # duplicate that should be closed by the caller; confirm before dropping this dup.
        return os.dup(dbus_unix_fd_list.get(fd_handle))

    def _register_stream(self, fd) -> int:
        """Wraps `fd` in a socket, stores it in `self.streams` and returns its stream ID.

        socket.fromfd() duplicates the descriptor it is given, so `fd` is closed
        here once the socket has been built (or failed to build) to avoid leaking it.
        """
        try:
            stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
        finally:
            os.close(fd)

        stream_id = self.new_stream_id()
        self.streams[stream_id] = stream
        return stream_id

    async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
                              context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:
        """Creates an outgoing insecure RFCOMM connection to `request.address`.

        Waits up to 5 seconds for the outgoing-connection callback matching the
        returned socket ID, then registers the resulting socket as a new stream.

        Returns:
            ConnectionResponse carrying the new stream ID.

        Raises:
            asyncio.TimeoutError: if no matching callback arrives within 5 seconds.
            The RPC is aborted with INTERNAL when the socket request or the
            connection itself fails.
        """

        class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the created RFCOMM connection state."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_outgoing_connection_result(self,
                                              connecting_id,
                                              result,
                                              socket,
                                              *,
                                              dbus_unix_fd_list=None):
                # Ignore results that belong to other connection attempts.
                if connecting_id != self.task['connecting_id']:
                    return

                future = self.task['create_rfcomm_channel']
                if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
                    logging.error(
                        'Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
                        connecting_id, result)
                    RFCOMMService._resolve(future, None)
                    return

                if not socket:
                    RFCOMMService._resolve(future, None)
                    return

                fd_dup = RFCOMMService._dup_fd_from_list(socket['optional_value']['fd'],
                                                         dbus_unix_fd_list,
                                                         'on_outgoing_connection_result')
                RFCOMMService._resolve(future, fd_dup)

        address = utils.address_from(request.address)
        uuid = list(uuid_module.UUID(request.uuid).bytes)
        socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(
            address, uuid)
        if socket_result is None:
            await context.abort(
                grpc.StatusCode.INTERNAL,
                'Failed to call create_insecure_rfcomm_socket_to_service_record.')

        connecting_id = socket_result['id']
        rfcomm_channel_creation = {
            'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'connecting_id': connecting_id
        }
        # The observer is created before entering the try block so that the cleanup in
        # `finally` never refers to unbound names when an earlier step fails.
        observer = CreateRFCOMMObserver(rfcomm_channel_creation)
        name = utils.create_observer_name(observer)
        try:
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'],
                                        timeout=5)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(
                grpc.StatusCode.INTERNAL,
                f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')

        stream_id = self._register_stream(fd)
        return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))

    async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:
        """Shuts down and closes the stream identified by `request.connection.id`.

        An unknown stream ID is logged and otherwise ignored. The stream is always
        closed and removed from `self.streams`, even when shutdown() fails (for
        example because the peer already dropped the connection).
        """
        stream_id = request.connection.id
        stream = self.streams.pop(stream_id, None)
        if stream is None:
            logging.error('No stream found with ID %s', stream_id)
            return rfcomm_pb2.DisconnectionResponse()

        try:
            stream.shutdown(socket_module.SHUT_RDWR)
        except OSError as e:
            logging.error('Disconnect: failed to shut down stream %s: %s', stream_id, e)
        finally:
            stream.close()

        return rfcomm_pb2.DisconnectionResponse()

    async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
                         context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:
        """Closes the listening socket identified by `request.server.id`.

        Waits up to 5 seconds for the incoming-socket-closed callback of that
        listener.

        Raises:
            asyncio.TimeoutError: if the callback does not arrive within 5 seconds.
            The RPC is aborted with INTERNAL when close_socket fails or the
            reported status is not SUCCESS.
        """

        class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
            """Observer to observe stop state of RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_closed(self, listener_id, reason):
                if listener_id != self.task['listener_id']:
                    return

                if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
                    logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s',
                                  listener_id, reason)

                # The raw reason is forwarded; the RPC body decides whether to abort.
                RFCOMMService._resolve(self.task['stop_rfcomm_channel'], reason)

        listener_id = request.server.id
        rfcomm_channel_stop = {
            'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = StopRFCOMMSocket(rfcomm_channel_stop)
        name = utils.create_observer_name(observer)
        try:
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            if not self.bluetooth.close_socket(listener_id):
                await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')

            status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'], timeout=5)
            if status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL,
                    f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}'
                )
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        return rfcomm_pb2.StopServerResponse()

    async def StartServer(self, request: rfcomm_pb2.StartServerRequest,
                          context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:
        """Starts listening for insecure RFCOMM connections on `request.uuid`.

        Waits up to 5 seconds for the incoming-socket-ready callback matching the
        socket ID returned by the listen request.

        Returns:
            StartServerResponse carrying the listener (server) ID.

        Raises:
            asyncio.TimeoutError: if the callback does not arrive within 5 seconds.
            The RPC is aborted with INTERNAL when the listen request fails or the
            socket does not become ready successfully.
        """

        class StartServerObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the RFCOMM server start."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_incoming_socket_ready(self, socket, status):
                if not socket or 'id' not in socket:
                    return

                listener_id = socket['id']
                if listener_id != self.task['socket_id']:
                    return

                future = self.task['start_rfcomm_server']
                if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
                    logging.error(
                        'Failed listening to RFCOMM channel with socket_id: %s. Status: %s',
                        listener_id, status)
                    RFCOMMService._resolve(future, None)
                else:
                    RFCOMMService._resolve(future, listener_id)

        uuid = list(uuid_module.UUID(request.uuid).bytes)
        socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(
            request.name, uuid)
        if socket_result is None:
            await context.abort(
                grpc.StatusCode.INTERNAL,
                'Failed to call listen_using_insecure_rfcomm_with_service_record.')

        socket_id = socket_result['id']
        rfcomm_channel_listener = {
            'start_rfcomm_server': asyncio.get_running_loop().create_future(),
            'socket_id': socket_id
        }
        # Created before the try block so the `finally` cleanup never sees unbound names.
        observer = StartServerObserver(rfcomm_channel_listener)
        name = utils.create_observer_name(observer)
        try:
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'],
                                                 timeout=5)
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        # The observer reports a failed start as None; do not hand an unset ID to the client.
        if listener_id is None:
            await context.abort(grpc.StatusCode.INTERNAL,
                                f'Failed to start RFCOMM server with socket_id: {socket_id}')

        return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))

    async def AcceptConnection(
            self, request: rfcomm_pb2.AcceptConnectionRequest,
            context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:
        """Accepts one incoming connection on the listener `request.server.id`.

        Waits up to 5 seconds for the incoming-connection callback of that
        listener and registers the resulting socket as a new stream.

        Returns:
            AcceptConnectionResponse carrying the new stream ID, or a response
            built with id=None when waiting for the connection timed out.

        The RPC is aborted with INTERNAL when accept_socket fails or no usable
        file descriptor was delivered.
        """

        class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
            """Observer to observe the accepted RFCOMM connection."""

            def __init__(self, task):
                self.task = task

            @utils.glib_callback()
            def on_handle_incoming_connection(self,
                                              listener_id,
                                              connection,
                                              *,
                                              dbus_unix_fd_list=None):
                if listener_id != self.task['listener_id']:
                    return

                future = self.task['accept_rfcomm_channel']
                if not connection:
                    RFCOMMService._resolve(future, None)
                    return

                fd_dup = RFCOMMService._dup_fd_from_list(connection['fd'], dbus_unix_fd_list,
                                                         'on_handle_incoming_connection')
                RFCOMMService._resolve(future, fd_dup)

        listener_id = request.server.id
        rfcomm_channel_acceptance = {
            'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
            'listener_id': listener_id
        }
        observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
        name = utils.create_observer_name(observer)
        try:
            self.bluetooth.socket_manager.register_callback_observer(name, observer)
            # NOTE(review): timeout_ms=5 is 5 milliseconds while the wait below is 5 seconds;
            # this looks like a unit mismatch - confirm the intended accept timeout.
            accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
            if accept_socket_status != floss_enums.BtStatus.SUCCESS:
                await context.abort(
                    grpc.StatusCode.INTERNAL,
                    f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
                    f'Status: {accept_socket_status}.')

            fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'],
                                        timeout=5)
        except asyncio.TimeoutError as e:
            logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
            return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(
                id=None))
        finally:
            self.bluetooth.socket_manager.unregister_callback_observer(name, observer)

        if fd is None:
            await context.abort(
                grpc.StatusCode.INTERNAL,
                f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')

        stream_id = self._register_stream(fd)
        return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(
            id=stream_id))

    async def Send(self, request: rfcomm_pb2.TxRequest,
                   context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
        """Writes `request.data` to the stream identified by `request.connection.id`.

        Errors (unknown stream, write failure) are logged and the RPC still
        returns an empty response.
        """
        stream_id = request.connection.id
        output_stream = self.streams.get(stream_id)
        if output_stream is None:
            logging.error('Output stream: %s not found for the stream_id: %s', output_stream,
                          stream_id)
            return rfcomm_pb2.TxResponse()

        try:
            # sendall() keeps writing until the whole payload is sent; send() may stop
            # after a partial write and silently drop the remainder.
            # NOTE(review): this is a blocking socket call made on the event loop.
            output_stream.sendall(request.data)
        except Exception as e:
            logging.error('Exception during writing to output stream %s', e)

        return rfcomm_pb2.TxResponse()

    async def Receive(self, request: rfcomm_pb2.RxRequest,
                      context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
        """Reads up to BUFFER_SIZE bytes from the stream `request.connection.id`.

        Returns:
            RxResponse with the received bytes, or with empty data when the stream
            is unknown, nothing was read, or reading raised an exception.
        """
        stream_id = request.connection.id
        input_stream = self.streams.get(stream_id)
        if input_stream is None:
            logging.error('Input stream: %s not found for the stream_id: %s', input_stream,
                          stream_id)
            return rfcomm_pb2.RxResponse(data=b'')

        try:
            # NOTE(review): recv() on a blocking socket blocks the event loop until data
            # arrives; confirm this is acceptable for the test server.
            data = input_stream.recv(self.BUFFER_SIZE)
            if data:
                return rfcomm_pb2.RxResponse(data=bytes(data))
        except Exception as e:
            logging.error('Exception during reading from input stream %s', e)

        # Return an empty byte array.
        return rfcomm_pb2.RxResponse(data=b'')