| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| from __future__ import annotations |
| |
| import logging |
| import asyncio |
| import collections |
| import dataclasses |
| import enum |
| from typing import Callable, Optional, Union, TYPE_CHECKING |
| from typing_extensions import Self |
| |
| |
| from bumble import core |
| from bumble import l2cap |
| from bumble import sdp |
| from bumble import utils |
| from bumble.colors import color |
| from bumble.core import ( |
| UUID, |
| BT_RFCOMM_PROTOCOL_ID, |
| PhysicalTransport, |
| BT_L2CAP_PROTOCOL_ID, |
| InvalidArgumentError, |
| InvalidStateError, |
| InvalidPacketError, |
| ProtocolError, |
| ) |
| |
| if TYPE_CHECKING: |
| from bumble.device import Device, Connection |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
| logger = logging.getLogger(__name__) |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Constants |
| # ----------------------------------------------------------------------------- |
| # fmt: off |
| |
| RFCOMM_PSM = 0x0003 |
| DEFAULT_RX_QUEUE_SIZE = 32 |
| |
| class FrameType(enum.IntEnum): |
| SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first |
| UA = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first |
| DM = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first |
| DISC = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first |
| UIH = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first |
| UI = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first |
| |
| class MccType(enum.IntEnum): |
| PN = 0x20 |
| MSC = 0x38 |
| |
| |
| # FCS CRC |
| CRC_TABLE = bytes([ |
| 0X00, 0X91, 0XE3, 0X72, 0X07, 0X96, 0XE4, 0X75, |
| 0X0E, 0X9F, 0XED, 0X7C, 0X09, 0X98, 0XEA, 0X7B, |
| 0X1C, 0X8D, 0XFF, 0X6E, 0X1B, 0X8A, 0XF8, 0X69, |
| 0X12, 0X83, 0XF1, 0X60, 0X15, 0X84, 0XF6, 0X67, |
| 0X38, 0XA9, 0XDB, 0X4A, 0X3F, 0XAE, 0XDC, 0X4D, |
| 0X36, 0XA7, 0XD5, 0X44, 0X31, 0XA0, 0XD2, 0X43, |
| 0X24, 0XB5, 0XC7, 0X56, 0X23, 0XB2, 0XC0, 0X51, |
| 0X2A, 0XBB, 0XC9, 0X58, 0X2D, 0XBC, 0XCE, 0X5F, |
| 0X70, 0XE1, 0X93, 0X02, 0X77, 0XE6, 0X94, 0X05, |
| 0X7E, 0XEF, 0X9D, 0X0C, 0X79, 0XE8, 0X9A, 0X0B, |
| 0X6C, 0XFD, 0X8F, 0X1E, 0X6B, 0XFA, 0X88, 0X19, |
| 0X62, 0XF3, 0X81, 0X10, 0X65, 0XF4, 0X86, 0X17, |
| 0X48, 0XD9, 0XAB, 0X3A, 0X4F, 0XDE, 0XAC, 0X3D, |
| 0X46, 0XD7, 0XA5, 0X34, 0X41, 0XD0, 0XA2, 0X33, |
| 0X54, 0XC5, 0XB7, 0X26, 0X53, 0XC2, 0XB0, 0X21, |
| 0X5A, 0XCB, 0XB9, 0X28, 0X5D, 0XCC, 0XBE, 0X2F, |
| 0XE0, 0X71, 0X03, 0X92, 0XE7, 0X76, 0X04, 0X95, |
| 0XEE, 0X7F, 0X0D, 0X9C, 0XE9, 0X78, 0X0A, 0X9B, |
| 0XFC, 0X6D, 0X1F, 0X8E, 0XFB, 0X6A, 0X18, 0X89, |
| 0XF2, 0X63, 0X11, 0X80, 0XF5, 0X64, 0X16, 0X87, |
| 0XD8, 0X49, 0X3B, 0XAA, 0XDF, 0X4E, 0X3C, 0XAD, |
| 0XD6, 0X47, 0X35, 0XA4, 0XD1, 0X40, 0X32, 0XA3, |
| 0XC4, 0X55, 0X27, 0XB6, 0XC3, 0X52, 0X20, 0XB1, |
| 0XCA, 0X5B, 0X29, 0XB8, 0XCD, 0X5C, 0X2E, 0XBF, |
| 0X90, 0X01, 0X73, 0XE2, 0X97, 0X06, 0X74, 0XE5, |
| 0X9E, 0X0F, 0X7D, 0XEC, 0X99, 0X08, 0X7A, 0XEB, |
| 0X8C, 0X1D, 0X6F, 0XFE, 0X8B, 0X1A, 0X68, 0XF9, |
| 0X82, 0X13, 0X61, 0XF0, 0X85, 0X14, 0X66, 0XF7, |
| 0XA8, 0X39, 0X4B, 0XDA, 0XAF, 0X3E, 0X4C, 0XDD, |
| 0XA6, 0X37, 0X45, 0XD4, 0XA1, 0X30, 0X42, 0XD3, |
| 0XB4, 0X25, 0X57, 0XC6, 0XB3, 0X22, 0X50, 0XC1, |
| 0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF |
| ]) |
| |
| RFCOMM_DEFAULT_L2CAP_MTU = 2048 |
| RFCOMM_DEFAULT_INITIAL_CREDITS = 7 |
| RFCOMM_DEFAULT_MAX_CREDITS = 32 |
| RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2 |
| RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000 |
| |
| RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1 |
| RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30 |
| |
| # fmt: on |
| |
| |
| # ----------------------------------------------------------------------------- |
| def make_service_sdp_records( |
| service_record_handle: int, channel: int, uuid: Optional[UUID] = None |
| ) -> list[sdp.ServiceAttribute]: |
| """ |
| Create SDP records for an RFComm service given a channel number and an |
| optional UUID. A Service Class Attribute is included only if the UUID is not None. |
| """ |
| records = [ |
| sdp.ServiceAttribute( |
| sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, |
| sdp.DataElement.unsigned_integer_32(service_record_handle), |
| ), |
| sdp.ServiceAttribute( |
| sdp.SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, |
| sdp.DataElement.sequence( |
| [sdp.DataElement.uuid(sdp.SDP_PUBLIC_BROWSE_ROOT)] |
| ), |
| ), |
| sdp.ServiceAttribute( |
| sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| sdp.DataElement.sequence( |
| [ |
| sdp.DataElement.sequence( |
| [sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)] |
| ), |
| sdp.DataElement.sequence( |
| [ |
| sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), |
| sdp.DataElement.unsigned_integer_8(channel), |
| ] |
| ), |
| ] |
| ), |
| ), |
| ] |
| |
| if uuid: |
| records.append( |
| sdp.ServiceAttribute( |
| sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, |
| sdp.DataElement.sequence([sdp.DataElement.uuid(uuid)]), |
| ) |
| ) |
| |
| return records |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def find_rfcomm_channels(connection: Connection) -> dict[int, list[UUID]]: |
| """Searches all RFCOMM channels and their associated UUID from SDP service records. |
| |
| Args: |
| connection: ACL connection to make SDP search. |
| |
| Returns: |
| Dictionary mapping from channel number to service class UUID list. |
| """ |
| results = {} |
| async with sdp.Client(connection) as sdp_client: |
| search_result = await sdp_client.search_attributes( |
| uuids=[core.BT_RFCOMM_PROTOCOL_ID], |
| attribute_ids=[ |
| sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, |
| ], |
| ) |
| for attribute_lists in search_result: |
| service_classes: list[UUID] = [] |
| channel: Optional[int] = None |
| for attribute in attribute_lists: |
| # The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]]. |
| if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID: |
| protocol_descriptor_list = attribute.value.value |
| channel = protocol_descriptor_list[1].value[1].value |
| elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: |
| service_class_id_list = attribute.value.value |
| service_classes = [ |
| service_class.value for service_class in service_class_id_list |
| ] |
| if not service_classes or not channel: |
| logger.warning(f"Bad result {attribute_lists}.") |
| else: |
| results[channel] = service_classes |
| return results |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def find_rfcomm_channel_with_uuid( |
| connection: Connection, uuid: str | UUID |
| ) -> Optional[int]: |
| """Searches an RFCOMM channel associated with given UUID from service records. |
| |
| Args: |
| connection: ACL connection to make SDP search. |
| uuid: UUID of service record to search for. |
| |
| Returns: |
| RFCOMM channel number if found, otherwise None. |
| """ |
| if isinstance(uuid, str): |
| uuid = UUID(uuid) |
| return next( |
| ( |
| channel |
| for channel, class_id_list in ( |
| await find_rfcomm_channels(connection) |
| ).items() |
| if uuid in class_id_list |
| ), |
| None, |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def compute_fcs(buffer: bytes) -> int: |
| result = 0xFF |
| for byte in buffer: |
| result = CRC_TABLE[result ^ byte] |
| return 0xFF - result |
| |
| |
| # ----------------------------------------------------------------------------- |
| class RFCOMM_Frame: |
| def __init__( |
| self, |
| frame_type: FrameType, |
| c_r: int, |
| dlci: int, |
| p_f: int, |
| information: bytes = b'', |
| with_credits: bool = False, |
| ) -> None: |
| self.type = frame_type |
| self.c_r = c_r |
| self.dlci = dlci |
| self.p_f = p_f |
| self.information = information |
| length = len(information) |
| if with_credits: |
| length -= 1 |
| if length > 0x7F: |
| # 2-byte length indicator |
| self.length = bytes([(length & 0x7F) << 1, (length >> 7) & 0xFF]) |
| else: |
| # 1-byte length indicator |
| self.length = bytes([(length << 1) | 1]) |
| self.address = (dlci << 2) | (c_r << 1) | 1 |
| self.control = frame_type | (p_f << 4) |
| if frame_type == FrameType.UIH: |
| self.fcs = compute_fcs(bytes([self.address, self.control])) |
| else: |
| self.fcs = compute_fcs(bytes([self.address, self.control]) + self.length) |
| |
| @staticmethod |
| def parse_mcc(data) -> tuple[int, bool, bytes]: |
| mcc_type = data[0] >> 2 |
| c_r = bool((data[0] >> 1) & 1) |
| length = data[1] |
| if data[1] & 1: |
| length >>= 1 |
| value = data[2:] |
| else: |
| length = (data[3] << 7) & (length >> 1) |
| value = data[3 : 3 + length] |
| |
| return (mcc_type, c_r, value) |
| |
| @staticmethod |
| def make_mcc(mcc_type: int, c_r: int, data: bytes) -> bytes: |
| return ( |
| bytes([(mcc_type << 2 | c_r << 1 | 1) & 0xFF, (len(data) & 0x7F) << 1 | 1]) |
| + data |
| ) |
| |
| @staticmethod |
| def sabm(c_r: int, dlci: int): |
| return RFCOMM_Frame(FrameType.SABM, c_r, dlci, 1) |
| |
| @staticmethod |
| def ua(c_r: int, dlci: int): |
| return RFCOMM_Frame(FrameType.UA, c_r, dlci, 1) |
| |
| @staticmethod |
| def dm(c_r: int, dlci: int): |
| return RFCOMM_Frame(FrameType.DM, c_r, dlci, 1) |
| |
| @staticmethod |
| def disc(c_r: int, dlci: int): |
| return RFCOMM_Frame(FrameType.DISC, c_r, dlci, 1) |
| |
| @staticmethod |
| def uih(c_r: int, dlci: int, information: bytes, p_f: int = 0): |
| return RFCOMM_Frame( |
| FrameType.UIH, c_r, dlci, p_f, information, with_credits=(p_f == 1) |
| ) |
| |
| @staticmethod |
| def from_bytes(data: bytes) -> RFCOMM_Frame: |
| # Extract fields |
| dlci = (data[0] >> 2) & 0x3F |
| c_r = (data[0] >> 1) & 0x01 |
| frame_type = FrameType(data[1] & 0xEF) |
| p_f = (data[1] >> 4) & 0x01 |
| length = data[2] |
| if length & 0x01: |
| length >>= 1 |
| information = data[3:-1] |
| else: |
| length = (data[3] << 7) & (length >> 1) |
| information = data[4:-1] |
| fcs = data[-1] |
| |
| # Construct the frame and check the CRC |
| frame = RFCOMM_Frame(frame_type, c_r, dlci, p_f, information) |
| if frame.fcs != fcs: |
| logger.warning(f'FCS mismatch: got {fcs:02X}, expected {frame.fcs:02X}') |
| raise InvalidPacketError('fcs mismatch') |
| |
| return frame |
| |
| def __bytes__(self) -> bytes: |
| return ( |
| bytes([self.address, self.control]) |
| + self.length |
| + self.information |
| + bytes([self.fcs]) |
| ) |
| |
| def __str__(self) -> str: |
| return ( |
| f'{color(self.type.name, "yellow")}' |
| f'(c/r={self.c_r},' |
| f'dlci={self.dlci},' |
| f'p/f={self.p_f},' |
| f'length={len(self.information)},' |
| f'fcs=0x{self.fcs:02X})' |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @dataclasses.dataclass |
| class RFCOMM_MCC_PN: |
| dlci: int |
| cl: int |
| priority: int |
| ack_timer: int |
| max_frame_size: int |
| max_retransmissions: int |
| initial_credits: int |
| |
| def __post_init__(self) -> None: |
| if self.initial_credits < 1 or self.initial_credits > 7: |
| logger.warning( |
| f'Initial credits {self.initial_credits} is out of range [1, 7].' |
| ) |
| |
| @staticmethod |
| def from_bytes(data: bytes) -> RFCOMM_MCC_PN: |
| return RFCOMM_MCC_PN( |
| dlci=data[0], |
| cl=data[1], |
| priority=data[2], |
| ack_timer=data[3], |
| max_frame_size=data[4] | data[5] << 8, |
| max_retransmissions=data[6], |
| initial_credits=data[7] & 0x07, |
| ) |
| |
| def __bytes__(self) -> bytes: |
| return bytes( |
| [ |
| self.dlci & 0xFF, |
| self.cl & 0xFF, |
| self.priority & 0xFF, |
| self.ack_timer & 0xFF, |
| self.max_frame_size & 0xFF, |
| (self.max_frame_size >> 8) & 0xFF, |
| self.max_retransmissions & 0xFF, |
| # Only 3 bits are meaningful. |
| self.initial_credits & 0x07, |
| ] |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @dataclasses.dataclass |
| class RFCOMM_MCC_MSC: |
| dlci: int |
| fc: int |
| rtc: int |
| rtr: int |
| ic: int |
| dv: int |
| |
| @staticmethod |
| def from_bytes(data: bytes) -> RFCOMM_MCC_MSC: |
| return RFCOMM_MCC_MSC( |
| dlci=data[0] >> 2, |
| fc=data[1] >> 1 & 1, |
| rtc=data[1] >> 2 & 1, |
| rtr=data[1] >> 3 & 1, |
| ic=data[1] >> 6 & 1, |
| dv=data[1] >> 7 & 1, |
| ) |
| |
| def __bytes__(self) -> bytes: |
| return bytes( |
| [ |
| (self.dlci << 2) | 3, |
| 1 |
| | self.fc << 1 |
| | self.rtc << 2 |
| | self.rtr << 3 |
| | self.ic << 6 |
| | self.dv << 7, |
| ] |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| class DLC(utils.EventEmitter): |
| EVENT_OPEN = "open" |
| EVENT_CLOSE = "close" |
| |
| class State(enum.IntEnum): |
| INIT = 0x00 |
| CONNECTING = 0x01 |
| CONNECTED = 0x02 |
| DISCONNECTING = 0x03 |
| DISCONNECTED = 0x04 |
| RESET = 0x05 |
| |
| def __init__( |
| self, |
| multiplexer: Multiplexer, |
| dlci: int, |
| tx_max_frame_size: int, |
| tx_initial_credits: int, |
| rx_max_frame_size: int, |
| rx_initial_credits: int, |
| ) -> None: |
| super().__init__() |
| self.multiplexer = multiplexer |
| self.dlci = dlci |
| self.rx_max_frame_size = rx_max_frame_size |
| self.rx_initial_credits = rx_initial_credits |
| self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS |
| self.rx_credits = rx_initial_credits |
| self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD |
| self.tx_max_frame_size = tx_max_frame_size |
| self.tx_credits = tx_initial_credits |
| self.tx_buffer = b'' |
| self.state = DLC.State.INIT |
| self.role = multiplexer.role |
| self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0 |
| self.connection_result: Optional[asyncio.Future] = None |
| self.disconnection_result: Optional[asyncio.Future] = None |
| self.drained = asyncio.Event() |
| self.drained.set() |
| # Queued packets when sink is not set. |
| self._enqueued_rx_packets: collections.deque[bytes] = collections.deque( |
| maxlen=DEFAULT_RX_QUEUE_SIZE |
| ) |
| self._sink: Optional[Callable[[bytes], None]] = None |
| |
| # Compute the MTU |
| max_overhead = 4 + 1 # header with 2-byte length + fcs |
| self.mtu = min( |
| tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead |
| ) |
| |
| @property |
| def sink(self) -> Optional[Callable[[bytes], None]]: |
| return self._sink |
| |
| @sink.setter |
| def sink(self, sink: Optional[Callable[[bytes], None]]) -> None: |
| self._sink = sink |
| # Dump queued packets to sink |
| if sink: |
| for packet in self._enqueued_rx_packets: |
| sink(packet) # pylint: disable=not-callable |
| self._enqueued_rx_packets.clear() |
| |
| def change_state(self, new_state: State) -> None: |
| logger.debug(f'{self} state change -> {color(new_state.name, "magenta")}') |
| self.state = new_state |
| |
| def send_frame(self, frame: RFCOMM_Frame) -> None: |
| self.multiplexer.send_frame(frame) |
| |
| def on_frame(self, frame: RFCOMM_Frame) -> None: |
| handler = getattr(self, f'on_{frame.type.name}_frame'.lower()) |
| handler(frame) |
| |
| def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: |
| if self.state != DLC.State.CONNECTING: |
| logger.warning( |
| color('!!! received SABM when not in CONNECTING state', 'red') |
| ) |
| return |
| |
| self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci)) |
| |
| # Exchange the modem status with the peer |
| msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) |
| mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) |
| logger.debug(f'>>> MCC MSC Command: {msc}') |
| self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) |
| |
| self.change_state(DLC.State.CONNECTED) |
| self.emit(self.EVENT_OPEN) |
| |
| def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: |
| if self.state == DLC.State.CONNECTING: |
| # Exchange the modem status with the peer |
| msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) |
| mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc)) |
| logger.debug(f'>>> MCC MSC Command: {msc}') |
| self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) |
| |
| self.change_state(DLC.State.CONNECTED) |
| if self.connection_result: |
| self.connection_result.set_result(None) |
| self.connection_result = None |
| self.multiplexer.on_dlc_open_complete(self) |
| elif self.state == DLC.State.DISCONNECTING: |
| self.change_state(DLC.State.DISCONNECTED) |
| if self.disconnection_result: |
| self.disconnection_result.set_result(None) |
| self.disconnection_result = None |
| self.multiplexer.on_dlc_disconnection(self) |
| self.emit(self.EVENT_CLOSE) |
| else: |
| logger.warning( |
| color( |
| ( |
| '!!! received UA frame when not in ' |
| 'CONNECTING or DISCONNECTING state' |
| ), |
| 'red', |
| ) |
| ) |
| |
| def on_dm_frame(self, frame: RFCOMM_Frame) -> None: |
| # TODO: handle all states |
| pass |
| |
| def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: |
| # TODO: handle all states |
| self.send_frame(RFCOMM_Frame.ua(c_r=1 - self.c_r, dlci=self.dlci)) |
| |
| def on_uih_frame(self, frame: RFCOMM_Frame) -> None: |
| data = frame.information |
| if frame.p_f == 1: |
| # With credits |
| received_credits = frame.information[0] |
| self.tx_credits += received_credits |
| |
| logger.debug( |
| f'<<< Credits [{self.dlci}]: ' |
| f'received {received_credits}, total={self.tx_credits}' |
| ) |
| data = data[1:] |
| |
| logger.debug( |
| f'{color("<<< Data", "yellow")} ' |
| f'[{self.dlci}] {len(data)} bytes, ' |
| f'rx_credits={self.rx_credits}: {data.hex()}' |
| ) |
| if data: |
| if self._sink: |
| self._sink(data) # pylint: disable=not-callable |
| else: |
| self._enqueued_rx_packets.append(data) |
| if ( |
| self._enqueued_rx_packets.maxlen |
| and len(self._enqueued_rx_packets) >= self._enqueued_rx_packets.maxlen |
| ): |
| logger.warning(f'DLC [{self.dlci}] received packet queue is full') |
| |
| # Update the credits |
| if self.rx_credits > 0: |
| self.rx_credits -= 1 |
| else: |
| logger.warning(color('!!! received frame with no rx credits', 'red')) |
| |
| # Check if there's anything to send (including credits) |
| self.process_tx() |
| |
| def on_ui_frame(self, frame: RFCOMM_Frame) -> None: |
| pass |
| |
| def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: |
| if c_r: |
| # Command |
| logger.debug(f'<<< MCC MSC Command: {msc}') |
| msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1) |
| mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=0, data=bytes(msc)) |
| logger.debug(f'>>> MCC MSC Response: {msc}') |
| self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) |
| else: |
| # Response |
| logger.debug(f'<<< MCC MSC Response: {msc}') |
| |
| def connect(self) -> None: |
| if self.state != DLC.State.INIT: |
| raise InvalidStateError('invalid state') |
| |
| self.change_state(DLC.State.CONNECTING) |
| self.connection_result = asyncio.get_running_loop().create_future() |
| self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci)) |
| |
| async def disconnect(self) -> None: |
| if self.state != DLC.State.CONNECTED: |
| raise InvalidStateError('invalid state') |
| |
| self.disconnection_result = asyncio.get_running_loop().create_future() |
| self.change_state(DLC.State.DISCONNECTING) |
| self.send_frame( |
| RFCOMM_Frame.disc( |
| c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci |
| ) |
| ) |
| await self.disconnection_result |
| |
| def accept(self) -> None: |
| if self.state != DLC.State.INIT: |
| raise InvalidStateError('invalid state') |
| |
| pn = RFCOMM_MCC_PN( |
| dlci=self.dlci, |
| cl=0xE0, |
| priority=7, |
| ack_timer=0, |
| max_frame_size=self.rx_max_frame_size, |
| max_retransmissions=0, |
| initial_credits=self.rx_initial_credits, |
| ) |
| mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn)) |
| logger.debug(f'>>> PN Response: {pn}') |
| self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc)) |
| self.change_state(DLC.State.CONNECTING) |
| |
| def rx_credits_needed(self) -> int: |
| if self.rx_credits <= self.rx_credits_threshold: |
| return self.rx_max_credits - self.rx_credits |
| |
| return 0 |
| |
| def process_tx(self) -> None: |
| # Send anything we can (or an empty frame if we need to send rx credits) |
| rx_credits_needed = self.rx_credits_needed() |
| while (self.tx_buffer and self.tx_credits > 0) or rx_credits_needed > 0: |
| # Get the next chunk, up to MTU size |
| if rx_credits_needed > 0: |
| chunk = bytes([rx_credits_needed]) + self.tx_buffer[: self.mtu - 1] |
| self.tx_buffer = self.tx_buffer[len(chunk) - 1 :] |
| self.rx_credits += rx_credits_needed |
| tx_credit_spent = len(chunk) > 1 |
| else: |
| chunk = self.tx_buffer[: self.mtu] |
| self.tx_buffer = self.tx_buffer[len(chunk) :] |
| tx_credit_spent = True |
| |
| # Update the tx credits |
| # (no tx credit spent for empty frames that only contain rx credits) |
| if tx_credit_spent: |
| self.tx_credits -= 1 |
| |
| # Send the frame |
| logger.debug( |
| f'>>> sending {len(chunk)} bytes with {rx_credits_needed} credits, ' |
| f'rx_credits={self.rx_credits}, ' |
| f'tx_credits={self.tx_credits}' |
| ) |
| self.send_frame( |
| RFCOMM_Frame.uih( |
| c_r=self.c_r, |
| dlci=self.dlci, |
| information=chunk, |
| p_f=1 if rx_credits_needed > 0 else 0, |
| ) |
| ) |
| |
| rx_credits_needed = 0 |
| if not self.tx_buffer: |
| self.drained.set() |
| |
| # Stream protocol |
| def write(self, data: Union[bytes, str]) -> None: |
| # We can only send bytes |
| if not isinstance(data, bytes): |
| if isinstance(data, str): |
| # Automatically convert strings to bytes using UTF-8 |
| data = data.encode('utf-8') |
| else: |
| raise InvalidArgumentError('write only accept bytes or strings') |
| |
| self.tx_buffer += data |
| self.drained.clear() |
| self.process_tx() |
| |
| async def drain(self) -> None: |
| await self.drained.wait() |
| |
| def abort(self) -> None: |
| logger.debug(f'aborting DLC: {self}') |
| if self.connection_result: |
| self.connection_result.cancel() |
| self.connection_result = None |
| if self.disconnection_result: |
| self.disconnection_result.cancel() |
| self.disconnection_result = None |
| self.change_state(DLC.State.RESET) |
| self.emit(self.EVENT_CLOSE) |
| |
| def __str__(self) -> str: |
| return ( |
| f'DLC(dlci={self.dlci}, ' |
| f'state={self.state.name}, ' |
| f'rx_max_frame_size={self.rx_max_frame_size}, ' |
| f'rx_credits={self.rx_credits}, ' |
| f'rx_max_credits={self.rx_max_credits}, ' |
| f'tx_max_frame_size={self.tx_max_frame_size}, ' |
| f'tx_credits={self.tx_credits}' |
| ')' |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| class Multiplexer(utils.EventEmitter): |
| class Role(enum.IntEnum): |
| INITIATOR = 0x00 |
| RESPONDER = 0x01 |
| |
| class State(enum.IntEnum): |
| INIT = 0x00 |
| CONNECTING = 0x01 |
| CONNECTED = 0x02 |
| OPENING = 0x03 |
| DISCONNECTING = 0x04 |
| DISCONNECTED = 0x05 |
| RESET = 0x06 |
| |
| EVENT_DLC = "dlc" |
| |
| connection_result: Optional[asyncio.Future] |
| disconnection_result: Optional[asyncio.Future] |
| open_result: Optional[asyncio.Future] |
| acceptor: Optional[Callable[[int], Optional[tuple[int, int]]]] |
| dlcs: dict[int, DLC] |
| |
| def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None: |
| super().__init__() |
| self.role = role |
| self.l2cap_channel = l2cap_channel |
| self.state = Multiplexer.State.INIT |
| self.dlcs = {} # DLCs, by DLCI |
| self.connection_result = None |
| self.disconnection_result = None |
| self.open_result = None |
| self.open_pn: Optional[RFCOMM_MCC_PN] = None |
| self.open_rx_max_credits = 0 |
| self.acceptor = None |
| |
| # Become a sink for the L2CAP channel |
| l2cap_channel.sink = self.on_pdu |
| |
| l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close) |
| |
| def change_state(self, new_state: State) -> None: |
| logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}') |
| self.state = new_state |
| |
| def send_frame(self, frame: RFCOMM_Frame) -> None: |
| logger.debug(f'>>> Multiplexer sending {frame}') |
| self.l2cap_channel.send_pdu(frame) |
| |
| def on_pdu(self, pdu: bytes) -> None: |
| frame = RFCOMM_Frame.from_bytes(pdu) |
| logger.debug(f'<<< Multiplexer received {frame}') |
| |
| # Dispatch to this multiplexer or to a dlc, depending on the address |
| if frame.dlci == 0: |
| self.on_frame(frame) |
| else: |
| if frame.type == FrameType.DM: |
| # DM responses are for a DLCI, but since we only create the dlc when we |
| # receive a PN response (because we need the parameters), we handle DM |
| # frames at the Multiplexer level |
| self.on_dm_frame(frame) |
| else: |
| dlc = self.dlcs.get(frame.dlci) |
| if dlc is None: |
| logger.warning(f'no dlc for DLCI {frame.dlci}') |
| return |
| dlc.on_frame(frame) |
| |
| def on_frame(self, frame: RFCOMM_Frame) -> None: |
| handler = getattr(self, f'on_{frame.type.name}_frame'.lower()) |
| handler(frame) |
| |
| def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None: |
| if self.state != Multiplexer.State.INIT: |
| logger.debug('not in INIT state, ignoring SABM') |
| return |
| self.change_state(Multiplexer.State.CONNECTED) |
| self.send_frame(RFCOMM_Frame.ua(c_r=1, dlci=0)) |
| |
| def on_ua_frame(self, _frame: RFCOMM_Frame) -> None: |
| if self.state == Multiplexer.State.CONNECTING: |
| self.change_state(Multiplexer.State.CONNECTED) |
| if self.connection_result: |
| self.connection_result.set_result(0) |
| self.connection_result = None |
| elif self.state == Multiplexer.State.DISCONNECTING: |
| self.change_state(Multiplexer.State.DISCONNECTED) |
| if self.disconnection_result: |
| self.disconnection_result.set_result(None) |
| self.disconnection_result = None |
| |
| def on_dm_frame(self, _frame: RFCOMM_Frame) -> None: |
| if self.state == Multiplexer.State.OPENING: |
| self.change_state(Multiplexer.State.CONNECTED) |
| if self.open_result: |
| self.open_result.set_exception( |
| core.ConnectionError( |
| core.ConnectionError.CONNECTION_REFUSED, |
| PhysicalTransport.BR_EDR, |
| self.l2cap_channel.connection.peer_address, |
| 'rfcomm', |
| ) |
| ) |
| self.open_result = None |
| else: |
| logger.warning(f'unexpected state for DM: {self}') |
| |
| def on_disc_frame(self, _frame: RFCOMM_Frame) -> None: |
| self.change_state(Multiplexer.State.DISCONNECTED) |
| self.send_frame( |
| RFCOMM_Frame.ua( |
| c_r=0 if self.role == Multiplexer.Role.INITIATOR else 1, dlci=0 |
| ) |
| ) |
| |
| def on_uih_frame(self, frame: RFCOMM_Frame) -> None: |
| (mcc_type, c_r, value) = RFCOMM_Frame.parse_mcc(frame.information) |
| |
| if mcc_type == MccType.PN: |
| pn = RFCOMM_MCC_PN.from_bytes(value) |
| self.on_mcc_pn(c_r, pn) |
| elif mcc_type == MccType.MSC: |
| mcs = RFCOMM_MCC_MSC.from_bytes(value) |
| self.on_mcc_msc(c_r, mcs) |
| |
| def on_ui_frame(self, frame: RFCOMM_Frame) -> None: |
| pass |
| |
| def on_mcc_pn(self, c_r: bool, pn: RFCOMM_MCC_PN) -> None: |
| if c_r: |
| # Command |
| logger.debug(f'<<< PN Command: {pn}') |
| |
| # Check with the multiplexer if there's an acceptor for this channel |
| if pn.dlci & 1: |
| # Not expected, this is an initiator-side number |
| # TODO: error out |
| logger.warning(f'invalid DLCI: {pn.dlci}') |
| else: |
| if self.acceptor: |
| channel_number = pn.dlci >> 1 |
| if dlc_params := self.acceptor(channel_number): |
| # Create a new DLC |
| dlc = DLC( |
| self, |
| dlci=pn.dlci, |
| tx_max_frame_size=pn.max_frame_size, |
| tx_initial_credits=pn.initial_credits, |
| rx_max_frame_size=dlc_params[0], |
| rx_initial_credits=dlc_params[1], |
| ) |
| self.dlcs[pn.dlci] = dlc |
| |
| # Re-emit the handshake completion event |
| dlc.on(dlc.EVENT_OPEN, lambda: self.emit(self.EVENT_DLC, dlc)) |
| |
| # Respond to complete the handshake |
| dlc.accept() |
| else: |
| # No acceptor, we're in Disconnected Mode |
| self.send_frame(RFCOMM_Frame.dm(c_r=1, dlci=pn.dlci)) |
| else: |
| # No acceptor?? shouldn't happen |
| logger.warning(color('!!! no acceptor registered', 'red')) |
| else: |
| # Response |
| logger.debug(f'>>> PN Response: {pn}') |
| if self.state == Multiplexer.State.OPENING: |
| assert self.open_pn |
| dlc = DLC( |
| self, |
| dlci=pn.dlci, |
| tx_max_frame_size=pn.max_frame_size, |
| tx_initial_credits=pn.initial_credits, |
| rx_max_frame_size=self.open_pn.max_frame_size, |
| rx_initial_credits=self.open_pn.initial_credits, |
| ) |
| self.dlcs[pn.dlci] = dlc |
| self.open_pn = None |
| dlc.connect() |
| else: |
| logger.warning('ignoring PN response') |
| |
| def on_mcc_msc(self, c_r: bool, msc: RFCOMM_MCC_MSC) -> None: |
| dlc = self.dlcs.get(msc.dlci) |
| if dlc is None: |
| logger.warning(f'no dlc for DLCI {msc.dlci}') |
| return |
| dlc.on_mcc_msc(c_r, msc) |
| |
| async def connect(self) -> None: |
| if self.state != Multiplexer.State.INIT: |
| raise InvalidStateError('invalid state') |
| |
| self.change_state(Multiplexer.State.CONNECTING) |
| self.connection_result = asyncio.get_running_loop().create_future() |
| self.send_frame(RFCOMM_Frame.sabm(c_r=1, dlci=0)) |
| return await self.connection_result |
| |
| async def disconnect(self) -> None: |
| if self.state != Multiplexer.State.CONNECTED: |
| return |
| |
| self.disconnection_result = asyncio.get_running_loop().create_future() |
| self.change_state(Multiplexer.State.DISCONNECTING) |
| self.send_frame( |
| RFCOMM_Frame.disc( |
| c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=0 |
| ) |
| ) |
| await self.disconnection_result |
| |
| async def open_dlc( |
| self, |
| channel: int, |
| max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, |
| initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, |
| ) -> DLC: |
| if self.state != Multiplexer.State.CONNECTED: |
| if self.state == Multiplexer.State.OPENING: |
| raise InvalidStateError('open already in progress') |
| |
| raise InvalidStateError('not connected') |
| |
| self.open_pn = RFCOMM_MCC_PN( |
| dlci=channel << 1, |
| cl=0xF0, |
| priority=7, |
| ack_timer=0, |
| max_frame_size=max_frame_size, |
| max_retransmissions=0, |
| initial_credits=initial_credits, |
| ) |
| mcc = RFCOMM_Frame.make_mcc( |
| mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn) |
| ) |
| logger.debug(f'>>> Sending MCC: {self.open_pn}') |
| self.open_result = asyncio.get_running_loop().create_future() |
| self.change_state(Multiplexer.State.OPENING) |
| self.send_frame( |
| RFCOMM_Frame.uih( |
| c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, |
| dlci=0, |
| information=mcc, |
| ) |
| ) |
| return await self.open_result |
| |
| def on_dlc_open_complete(self, dlc: DLC) -> None: |
| logger.debug(f'DLC [{dlc.dlci}] open complete') |
| |
| self.change_state(Multiplexer.State.CONNECTED) |
| |
| if self.open_result: |
| self.open_result.set_result(dlc) |
| self.open_result = None |
| |
| def on_dlc_disconnection(self, dlc: DLC) -> None: |
| logger.debug(f'DLC [{dlc.dlci}] disconnection') |
| self.dlcs.pop(dlc.dlci, None) |
| |
| def on_l2cap_channel_close(self) -> None: |
| logger.debug('L2CAP channel closed, cleaning up') |
| if self.open_result: |
| self.open_result.cancel() |
| self.open_result = None |
| if self.disconnection_result: |
| self.disconnection_result.cancel() |
| self.disconnection_result = None |
| for dlc in self.dlcs.values(): |
| dlc.abort() |
| |
| def __str__(self) -> str: |
| return f'Multiplexer(state={self.state.name})' |
| |
| |
| # ----------------------------------------------------------------------------- |
| class Client: |
| multiplexer: Optional[Multiplexer] |
| l2cap_channel: Optional[l2cap.ClassicChannel] |
| |
| def __init__( |
| self, connection: Connection, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU |
| ) -> None: |
| self.connection = connection |
| self.l2cap_mtu = l2cap_mtu |
| self.l2cap_channel = None |
| self.multiplexer = None |
| |
| async def start(self) -> Multiplexer: |
| # Create a new L2CAP connection |
| try: |
| self.l2cap_channel = await self.connection.create_l2cap_channel( |
| spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=self.l2cap_mtu) |
| ) |
| except ProtocolError as error: |
| logger.warning(f'L2CAP connection failed: {error}') |
| raise |
| |
| assert self.l2cap_channel is not None |
| # Create a multiplexer to manage DLCs with the server |
| self.multiplexer = Multiplexer(self.l2cap_channel, Multiplexer.Role.INITIATOR) |
| |
| # Connect the multiplexer |
| await self.multiplexer.connect() |
| |
| return self.multiplexer |
| |
| async def shutdown(self) -> None: |
| if self.multiplexer is None: |
| return |
| # Disconnect the multiplexer |
| await self.multiplexer.disconnect() |
| self.multiplexer = None |
| |
| # Close the L2CAP channel |
| if self.l2cap_channel: |
| await self.l2cap_channel.disconnect() |
| self.l2cap_channel = None |
| |
| async def __aenter__(self) -> Multiplexer: |
| return await self.start() |
| |
| async def __aexit__(self, *args) -> None: |
| await self.shutdown() |
| |
| |
| # ----------------------------------------------------------------------------- |
| class Server(utils.EventEmitter): |
| EVENT_START = "start" |
| |
| def __init__( |
| self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU |
| ) -> None: |
| super().__init__() |
| self.device = device |
| self.acceptors: dict[int, Callable[[DLC], None]] = {} |
| self.dlc_configs: dict[int, tuple[int, int]] = {} |
| |
| # Register ourselves with the L2CAP channel manager |
| self.l2cap_server = device.create_l2cap_server( |
| spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=l2cap_mtu), |
| handler=self.on_connection, |
| ) |
| |
| def listen( |
| self, |
| acceptor: Callable[[DLC], None], |
| channel: int = 0, |
| max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE, |
| initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS, |
| ) -> int: |
| if channel: |
| if channel in self.acceptors: |
| # Busy |
| return 0 |
| else: |
| # Find a free channel number |
| for candidate in range( |
| RFCOMM_DYNAMIC_CHANNEL_NUMBER_START, |
| RFCOMM_DYNAMIC_CHANNEL_NUMBER_END + 1, |
| ): |
| if candidate not in self.acceptors: |
| channel = candidate |
| break |
| |
| if channel == 0: |
| # All channels used... |
| return 0 |
| |
| self.acceptors[channel] = acceptor |
| self.dlc_configs[channel] = (max_frame_size, initial_credits) |
| |
| return channel |
| |
| def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: |
| logger.debug(f'+++ new L2CAP connection: {l2cap_channel}') |
| l2cap_channel.on( |
| l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel) |
| ) |
| |
| def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: |
| logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}') |
| |
| # Create a new multiplexer for the channel |
| multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER) |
| multiplexer.acceptor = self.accept_dlc |
| multiplexer.on(multiplexer.EVENT_DLC, self.on_dlc) |
| |
| # Notify |
| self.emit(self.EVENT_START, multiplexer) |
| |
| def accept_dlc(self, channel_number: int) -> Optional[tuple[int, int]]: |
| return self.dlc_configs.get(channel_number) |
| |
| def on_dlc(self, dlc: DLC) -> None: |
| logger.debug(f'@@@ new DLC connected: {dlc}') |
| |
| # Let the acceptor know |
| if acceptor := self.acceptors.get(dlc.dlci >> 1): |
| acceptor(dlc) |
| |
| def __enter__(self) -> Self: |
| return self |
| |
| def __exit__(self, *args) -> None: |
| self.l2cap_server.close() |