blob: dd247529574c729f8184806f5f513710bfdc2d64 [file] [log] [blame]
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""RFCOMM grpc interface."""
import asyncio
import logging
import os
import socket as socket_module
import uuid as uuid_module
from floss.pandora.floss import floss_enums
from floss.pandora.floss import socket_manager
from floss.pandora.floss import utils
from floss.pandora.server import bluetooth as bluetooth_module
from google.protobuf import empty_pb2
import grpc
from pandora import rfcomm_grpc_aio
from pandora import rfcomm_pb2
class RFCOMMService(rfcomm_grpc_aio.RFCOMMServicer):
"""Service to trigger Bluetooth RFCOMM procedures.
This class implements the Pandora bluetooth test interfaces,
where the meta class definition is automatically generated by the protobuf.
The interface definition can be found in:
https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/pandora/interfaces/pandora/rfcomm.proto
"""
# Size of the buffer for data transactions.
BUFFER_SIZE = 512
def __init__(self, bluetooth: bluetooth_module.Bluetooth):
self.bluetooth = bluetooth
# Used by new_stream_id() to generate IDs for the RPC client to specify the stream.
self.current_stream_id = 0x12FC0
# key = stream_id, val = stream
self.streams = dict()
def new_stream_id(self) -> int:
id = self.current_stream_id
self.current_stream_id += 1
return id
async def ConnectToServer(self, request: rfcomm_pb2.ConnectionRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.ConnectionResponse:
class CreateRFCOMMObserver(socket_manager.SocketManagerCallbacks):
"""Observer to observe the created RFCOMM connection state."""
def __init__(self, task):
self.task = task
@utils.glib_callback()
def on_outgoing_connection_result(self,
connecting_id,
result,
socket,
*,
dbus_unix_fd_list=None):
if connecting_id != self.task['connecting_id']:
return
future = self.task['create_rfcomm_channel']
if result is None or floss_enums.BtStatus(result) != floss_enums.BtStatus.SUCCESS:
logging.error(
'Failed to create the RFCOMM channel with connecting_id: %s. Status: %s',
connecting_id, result)
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
if not socket:
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
optional_fd = socket['optional_value']['fd']
if not optional_fd:
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
logging.error('on_outgoing_connection_result: Empty fd list')
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
fd_handle = optional_fd['optional_value']
if fd_handle > dbus_unix_fd_list.get_length():
logging.error('on_outgoing_connection_result: Invalid fd handle')
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
fd = dbus_unix_fd_list.get(fd_handle)
fd_dup = os.dup(fd)
future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
address = utils.address_from(request.address)
uuid = list(uuid_module.UUID(request.uuid).bytes)
try:
socket_result = self.bluetooth.create_insecure_rfcomm_socket_to_service_record(
address, uuid)
if socket_result is None:
await context.abort(
grpc.StatusCode.INTERNAL,
'Failed to call create_insecure_rfcomm_socket_to_service_record.')
connecting_id = socket_result['id']
rfcomm_channel_creation = {
'create_rfcomm_channel': asyncio.get_running_loop().create_future(),
'connecting_id': connecting_id
}
observer = CreateRFCOMMObserver(rfcomm_channel_creation)
name = utils.create_observer_name(observer)
self.bluetooth.socket_manager.register_callback_observer(name, observer)
fd = await asyncio.wait_for(rfcomm_channel_creation['create_rfcomm_channel'], timeout=5)
if fd is None:
await context.abort(
grpc.StatusCode.INTERNAL,
f'Failed to get the fd from RFCOMM socket with connecting_id: {connecting_id}')
stream_id = self.new_stream_id()
stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
self.streams[stream_id] = stream
finally:
self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
return rfcomm_pb2.ConnectionResponse(connection=rfcomm_pb2.RfcommConnection(id=stream_id))
async def Disconnect(self, request: rfcomm_pb2.DisconnectionRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.DisconnectionResponse:
stream_id = request.connection.id
if stream_id in self.streams:
stream = self.streams[stream_id]
try:
stream.shutdown(socket_module.SHUT_RDWR)
stream.close()
del self.streams[stream_id]
except asyncio.TimeoutError as e:
logging.error('Disconnect: asyncio.TimeoutError %s', e)
else:
logging.error('No stream found with ID %s', stream_id)
return empty_pb2.Empty()
async def StopServer(self, request: rfcomm_pb2.StopServerRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.StopServerResponse:
class StopRFCOMMSocket(socket_manager.SocketManagerCallbacks):
"""Observer to observe stop state of RFCOMM connection."""
def __init__(self, task):
self.task = task
@utils.glib_callback()
def on_incoming_socket_closed(self, listener_id, reason):
if listener_id != self.task['listener_id']:
return
if reason is None or floss_enums.BtStatus(reason) != floss_enums.BtStatus.SUCCESS:
logging.error('Failed to stop RFCOMM channel with listener_id: %s. Status: %s',
listener_id, reason)
future = self.task['stop_rfcomm_channel']
future.get_loop().call_soon_threadsafe(future.set_result, reason)
try:
listener_id = request.server.id
rfcomm_channel_stop = {
'stop_rfcomm_channel': asyncio.get_running_loop().create_future(),
'listener_id': listener_id
}
observer = StopRFCOMMSocket(rfcomm_channel_stop)
name = utils.create_observer_name(observer)
self.bluetooth.socket_manager.register_callback_observer(name, observer)
if not self.bluetooth.close_socket(listener_id):
await context.abort(grpc.StatusCode.INTERNAL, 'Failed to call close_socket.')
status = await asyncio.wait_for(rfcomm_channel_stop['stop_rfcomm_channel'], timeout=5)
if status != floss_enums.BtStatus.SUCCESS:
await context.abort(
grpc.StatusCode.INTERNAL,
f'Failed to stop RFCOMM channel with listener_id: {listener_id}. Status: {status}'
)
finally:
self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
return empty_pb2.Empty()
async def StartServer(self, request: rfcomm_pb2.StartServerRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.StartServerResponse:
class StartServerObserver(socket_manager.SocketManagerCallbacks):
"""Observer to observe the RFCOMM server start."""
def __init__(self, task):
self.task = task
@utils.glib_callback()
def on_incoming_socket_ready(self, socket, status):
if not socket or 'id' not in socket:
return
listener_id = socket['id']
if listener_id != self.task['socket_id']:
return
future = self.task['start_rfcomm_server']
if status is None or floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
logging.error(
'Failed listening to RFCOMM channel with socket_id: %s. Status: %s',
listener_id, status)
future.get_loop().call_soon_threadsafe(future.set_result, None)
else:
future.get_loop().call_soon_threadsafe(future.set_result, listener_id)
try:
uuid = list(uuid_module.UUID(request.uuid).bytes)
socket_result = self.bluetooth.listen_using_insecure_rfcomm_with_service_record(
request.name, uuid)
if socket_result is None:
await context.abort(
grpc.StatusCode.INTERNAL,
'Failed to call listen_using_insecure_rfcomm_with_service_record.')
rfcomm_channel_listener = {
'start_rfcomm_server': asyncio.get_running_loop().create_future(),
'socket_id': socket_result['id']
}
observer = StartServerObserver(rfcomm_channel_listener)
name = utils.create_observer_name(observer)
self.bluetooth.socket_manager.register_callback_observer(name, observer)
listener_id = await asyncio.wait_for(rfcomm_channel_listener['start_rfcomm_server'],
timeout=5)
finally:
self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
return rfcomm_pb2.StartServerResponse(server=rfcomm_pb2.ServerId(id=listener_id))
async def AcceptConnection(
self, request: rfcomm_pb2.AcceptConnectionRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.AcceptConnectionResponse:
class AcceptConnectionObserver(socket_manager.SocketManagerCallbacks):
"""Observer to observe the accepted RFCOMM connection."""
def __init__(self, task):
self.task = task
@utils.glib_callback()
def on_handle_incoming_connection(self,
listener_id,
connection,
*,
dbus_unix_fd_list=None):
if listener_id != self.task['listener_id']:
return
future = self.task['accept_rfcomm_channel']
if not connection:
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
optional_fd = connection['fd']
if not optional_fd:
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
if not dbus_unix_fd_list or dbus_unix_fd_list.get_length() < 1:
logging.error('on_handle_incoming_connection: Empty fd list')
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
fd_handle = optional_fd['optional_value']
if fd_handle > dbus_unix_fd_list.get_length():
logging.error('on_handle_incoming_connection: Invalid fd handle')
future.get_loop().call_soon_threadsafe(future.set_result, None)
return
fd = dbus_unix_fd_list.get(fd_handle)
fd_dup = os.dup(fd)
future.get_loop().call_soon_threadsafe(future.set_result, fd_dup)
try:
listener_id = request.server.id
rfcomm_channel_acceptance = {
'accept_rfcomm_channel': asyncio.get_running_loop().create_future(),
'listener_id': listener_id
}
observer = AcceptConnectionObserver(rfcomm_channel_acceptance)
name = utils.create_observer_name(observer)
self.bluetooth.socket_manager.register_callback_observer(name, observer)
accept_socket_status = self.bluetooth.accept_socket(listener_id, timeout_ms=5)
if accept_socket_status != floss_enums.BtStatus.SUCCESS:
await context.abort(
grpc.StatusCode.INTERNAL,
f'Failed to accept the RFCOMM socket with listener_id: {listener_id}. '
f'Status: {accept_socket_status}.')
fd = await asyncio.wait_for(rfcomm_channel_acceptance['accept_rfcomm_channel'],
timeout=5)
if fd is None:
await context.abort(
grpc.StatusCode.INTERNAL,
f'Failed to get the fd from RFCOMM socket with listener_id: {listener_id}')
stream_id = self.new_stream_id()
stream = socket_module.fromfd(fd, socket_module.AF_UNIX, socket_module.SOCK_STREAM)
self.streams[stream_id] = stream
except asyncio.TimeoutError as e:
logging.error('AcceptConnection: asyncio.TimeoutError %s', e)
return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(
id=None))
finally:
self.bluetooth.socket_manager.unregister_callback_observer(name, observer)
return rfcomm_pb2.AcceptConnectionResponse(connection=rfcomm_pb2.RfcommConnection(
id=stream_id))
async def Send(self, request: rfcomm_pb2.TxRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.TxResponse:
stream_id = request.connection.id
output_stream = self.streams.get(stream_id)
if output_stream:
try:
output_stream.send(request.data)
except Exception as e:
logging.error('Exception during writing to output stream %s', e)
else:
logging.error('Output stream: %s not found for the stream_id: %s', output_stream,
stream_id)
return empty_pb2.Empty()
async def Receive(self, request: rfcomm_pb2.RxRequest,
context: grpc.ServicerContext) -> rfcomm_pb2.RxResponse:
stream_id = request.connection.id
input_stream = self.streams.get(stream_id)
if input_stream:
try:
data = input_stream.recv(self.BUFFER_SIZE)
if data:
return rfcomm_pb2.RxResponse(data=bytes(data))
except Exception as e:
logging.error('Exception during reading from input stream %s', e)
else:
logging.error('Input stream: %s not found for the stream_id: %s', input_stream,
stream_id)
# Return an empty byte array.
return rfcomm_pb2.RxResponse(data=b'')