add support for ACL and ISO HCI packet queues
diff --git a/apps/auracast.py b/apps/auracast.py
index ede4ead..42c1cab 100644
--- a/apps/auracast.py
+++ b/apps/auracast.py
@@ -825,10 +825,24 @@
),
)
print('Setup ISO Data Path')
+
+ def on_drain(packet_queue):
+ print(
+ f'\rPACKETS: pending={packet_queue.pending}, '
+ f'queued={packet_queue.queued}, completed={packet_queue.completed}',
+ end='',
+ )
+
+ packet_queue = None
for bis_link in big.bis_links:
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
+ if packet_queue is None:
+ packet_queue = bis_link.data_packet_queue
+
+ if packet_queue:
+ packet_queue.on('drain', lambda: on_drain(packet_queue))
for frame in itertools.cycle(frames):
mid = len(frame) // 2
diff --git a/apps/controller_info.py b/apps/controller_info.py
index 89c830c..2c8d9aa 100644
--- a/apps/controller_info.py
+++ b/apps/controller_info.py
@@ -37,6 +37,8 @@
HCI_Command_Status_Event,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_Read_Buffer_Size_Command,
+ HCI_LE_READ_BUFFER_SIZE_V2_COMMAND,
+ HCI_LE_Read_Buffer_Size_V2_Command,
HCI_READ_BD_ADDR_COMMAND,
HCI_Read_BD_ADDR_Command,
HCI_READ_LOCAL_NAME_COMMAND,
@@ -147,7 +149,7 @@
# -----------------------------------------------------------------------------
-async def get_acl_flow_control_info(host: Host) -> None:
+async def get_flow_control_info(host: Host) -> None:
print()
if host.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
@@ -160,14 +162,28 @@
f'packets of size {response.return_parameters.hc_acl_data_packet_length}',
)
- if host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
+ if host.supports_command(HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
+ response = await host.send_command(
+ HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
+ )
+ print(
+ color('LE ACL Flow Control:', 'yellow'),
+ f'{response.return_parameters.total_num_le_acl_data_packets} '
+ f'packets of size {response.return_parameters.le_acl_data_packet_length}',
+ )
+ print(
+ color('LE ISO Flow Control:', 'yellow'),
+ f'{response.return_parameters.total_num_iso_data_packets} '
+ f'packets of size {response.return_parameters.iso_data_packet_length}',
+ )
+ elif host.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await host.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
print(
color('LE ACL Flow Control:', 'yellow'),
- f'{response.return_parameters.hc_total_num_le_acl_data_packets} '
- f'packets of size {response.return_parameters.hc_le_acl_data_packet_length}',
+ f'{response.return_parameters.total_num_le_acl_data_packets} '
+ f'packets of size {response.return_parameters.le_acl_data_packet_length}',
)
@@ -274,8 +290,8 @@
# Get the LE info
await get_le_info(host)
- # Print the ACL flow control info
- await get_acl_flow_control_info(host)
+ # Print the flow control info
+ await get_flow_control_info(host)
# Get codec info
await get_codecs_info(host)
diff --git a/bumble/controller.py b/bumble/controller.py
index 03d3c14..9366c1d 100644
--- a/bumble/controller.py
+++ b/bumble/controller.py
@@ -154,15 +154,17 @@
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
self.manufacturer_name = 0xFFFF
- self.hc_data_packet_length = 27
- self.hc_total_num_data_packets = 64
- self.hc_le_data_packet_length = 27
- self.hc_total_num_le_data_packets = 64
+ self.acl_data_packet_length = 27
+ self.total_num_acl_data_packets = 64
+ self.le_acl_data_packet_length = 27
+ self.total_num_le_acl_data_packets = 64
+ self.iso_data_packet_length = 960
+ self.total_num_iso_data_packets = 64
self.event_mask = 0
self.event_mask_page_2 = 0
self.supported_commands = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
- '30f0f9ff01008004000000000000000000000000000000000000000000000000'
+ '30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
self.le_event_mask = 0
self.advertising_parameters = None
@@ -1181,9 +1183,9 @@
return struct.pack(
'<BHBHH',
HCI_SUCCESS,
- self.hc_data_packet_length,
+ self.acl_data_packet_length,
0,
- self.hc_total_num_data_packets,
+ self.total_num_acl_data_packets,
0,
)
@@ -1212,8 +1214,21 @@
return struct.pack(
'<BHB',
HCI_SUCCESS,
- self.hc_le_data_packet_length,
- self.hc_total_num_le_data_packets,
+ self.le_acl_data_packet_length,
+ self.total_num_le_acl_data_packets,
+ )
+
+ def on_hci_le_read_buffer_size_v2_command(self, _command):
+ '''
+ See Bluetooth spec Vol 4, Part E - 7.8.2 LE Read Buffer Size Command
+ '''
+ return struct.pack(
+ '<BHBHB',
+ HCI_SUCCESS,
+ self.le_acl_data_packet_length,
+ self.total_num_le_acl_data_packets,
+ self.iso_data_packet_length,
+ self.total_num_iso_data_packets,
)
def on_hci_le_read_local_supported_features_command(self, _command):
diff --git a/bumble/device.py b/bumble/device.py
index 1a6d754..e58d942 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -52,7 +52,7 @@
from .colors import color
from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU
from .gatt import Characteristic, Descriptor, Service
-from .host import Host
+from .host import DataPacketQueue, Host
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -1329,7 +1329,6 @@
class _IsoLink:
handle: int
device: Device
- packet_sequence_number: int
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
class Direction(IntEnum):
@@ -1391,22 +1390,12 @@
return response.return_parameters.status
def write(self, sdu: bytes) -> None:
- """Write an ISO SDU.
+ """Write an ISO SDU."""
+ self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu)
- This will automatically increase the packet sequence number.
- """
- self.device.host.send_hci_packet(
- hci.HCI_IsoDataPacket(
- connection_handle=self.handle,
- data_total_length=len(sdu) + 4,
- packet_sequence_number=self.packet_sequence_number,
- pb_flag=0b10,
- packet_status_flag=0,
- iso_sdu_length=len(sdu),
- iso_sdu_fragment=sdu,
- )
- )
- self.packet_sequence_number = (self.packet_sequence_number + 1) % 0x10000
+ @property
+ def data_packet_queue(self) -> DataPacketQueue | None:
+ return self.device.host.get_data_packet_queue(self.handle)
# -----------------------------------------------------------------------------
@@ -1426,7 +1415,6 @@
def __post_init__(self) -> None:
super().__init__()
- self.packet_sequence_number = 0
async def disconnect(
self, reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
@@ -1443,7 +1431,6 @@
def __post_init__(self) -> None:
self.device = self.big.device
- self.packet_sequence_number = 0
# -----------------------------------------------------------------------------
@@ -1691,6 +1678,10 @@
self.peer_le_features = await self.device.get_remote_le_features(self)
return self.peer_le_features
+ @property
+ def data_packet_queue(self) -> DataPacketQueue | None:
+ return self.device.host.get_data_packet_queue(self.handle)
+
async def __aenter__(self):
return self
diff --git a/bumble/hci.py b/bumble/hci.py
index 77a95e7..2a1eb85 100644
--- a/bumble/hci.py
+++ b/bumble/hci.py
@@ -3585,8 +3585,8 @@
@HCI_Command.command(
return_parameters_fields=[
('status', STATUS_SPEC),
- ('hc_le_acl_data_packet_length', 2),
- ('hc_total_num_le_acl_data_packets', 1),
+ ('le_acl_data_packet_length', 2),
+ ('total_num_le_acl_data_packets', 1),
]
)
class HCI_LE_Read_Buffer_Size_Command(HCI_Command):
@@ -3597,6 +3597,22 @@
# -----------------------------------------------------------------------------
@HCI_Command.command(
+ return_parameters_fields=[
+ ('status', STATUS_SPEC),
+ ('le_acl_data_packet_length', 2),
+ ('total_num_le_acl_data_packets', 1),
+ ('iso_data_packet_length', 2),
+ ('total_num_iso_data_packets', 1),
+ ]
+)
+class HCI_LE_Read_Buffer_Size_V2_Command(HCI_Command):
+ '''
+ See Bluetooth spec @ 7.8.2 LE Read Buffer Size V2 Command
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Command.command(
return_parameters_fields=[('status', STATUS_SPEC), ('le_features', 8)]
)
class HCI_LE_Read_Local_Supported_Features_Command(HCI_Command):
@@ -7555,7 +7571,7 @@
if should_include_sdu_info:
packet_sequence_number, sdu_info = struct.unpack_from('<HH', packet, pos)
iso_sdu_length = sdu_info & 0xFFF
- packet_status_flag = sdu_info >> 14
+ packet_status_flag = (sdu_info >> 15) & 1
pos += 4
iso_sdu_fragment = packet[pos:]
@@ -7589,7 +7605,7 @@
fmt += 'HH'
args += [
self.packet_sequence_number,
- self.iso_sdu_length | self.packet_status_flag << 14,
+ self.iso_sdu_length | self.packet_status_flag << 15,
]
return struct.pack(fmt, *args) + self.iso_sdu_fragment
@@ -7597,9 +7613,10 @@
return (
f'{color("ISO", "blue")}: '
f'handle=0x{self.connection_handle:04x}, '
+ f'pb={self.pb_flag}, '
f'ps={self.packet_status_flag}, '
f'data_total_length={self.data_total_length}, '
- f'sdu={self.iso_sdu_fragment.hex()}'
+ f'sdu_fragment={self.iso_sdu_fragment.hex()}'
)
diff --git a/bumble/host.py b/bumble/host.py
index 1ce4263..48c03e0 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -21,7 +21,6 @@
import dataclasses
import logging
import struct
-import itertools
from typing import (
Any,
@@ -35,6 +34,8 @@
TYPE_CHECKING,
)
+import pyee
+
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
@@ -60,7 +61,19 @@
# -----------------------------------------------------------------------------
-class AclPacketQueue:
+class DataPacketQueue(pyee.EventEmitter):
+ """
+ Flow-control queue for host->controller data packets (ACL, ISO).
+
+ The queue holds packets associated with a connection handle. The packets
+ are sent to the controller, up to a maximum total number of packets in flight.
+ A packet is considered to be "in flight" when it has been sent to the controller
+ but not completed yet. Packets are no longer "in flight" when the controller
+ declares them as completed.
+
+ The queue emits a 'drain' event whenever one or more packets are completed.
+ """
+
max_packet_size: int
def __init__(
@@ -69,40 +82,105 @@
max_in_flight: int,
send: Callable[[hci.HCI_Packet], None],
) -> None:
+ super().__init__()
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
- self.in_flight = 0
- self.send = send
- self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
+ self._in_flight = 0 # Total number of packets in flight across all connections
+ self._in_flight_per_connection: dict[int, int] = collections.defaultdict(
+ int
+ ) # Number of packets in flight per connection
+ self._send = send
+ self._packets: Deque[tuple[hci.HCI_Packet, int]] = collections.deque()
+ self._queued = 0
+ self._completed = 0
- def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
- self.packets.appendleft(packet)
- self.check_queue()
+ @property
+ def queued(self) -> int:
+ """Total number of packets queued since creation."""
+ return self._queued
- if self.packets:
+ @property
+ def completed(self) -> int:
+ """Total number of packets completed since creation."""
+ return self._completed
+
+ @property
+ def pending(self) -> int:
+ """Number of packets that have been queued but not completed."""
+ return self._queued - self._completed
+
+ def enqueue(self, packet: hci.HCI_Packet, connection_handle: int) -> None:
+ """Enqueue a packet associated with a connection"""
+ self._packets.appendleft((packet, connection_handle))
+ self._queued += 1
+ self._check_queue()
+
+ if self._packets:
logger.debug(
- f'{self.in_flight} ACL packets in flight, '
- f'{len(self.packets)} in queue'
+ f'{self._in_flight} packets in flight, '
+ f'{len(self._packets)} in queue'
)
- def check_queue(self) -> None:
- while self.packets and self.in_flight < self.max_in_flight:
- packet = self.packets.pop()
- self.send(packet)
- self.in_flight += 1
+ def flush(self, connection_handle: int) -> None:
+ """
+ Remove all packets associated with a connection.
- def on_packets_completed(self, packet_count: int) -> None:
- if packet_count > self.in_flight:
+ All packets associated with the connection that are in flight are implicitly
+ marked as completed, but no 'drain' event is emitted.
+ """
+
+ packets_to_keep = [
+ (packet, handle)
+ for (packet, handle) in self._packets
+ if handle != connection_handle
+ ]
+ if flushed_count := len(self._packets) - len(packets_to_keep):
+ self._completed += flushed_count
+ self._packets = collections.deque(packets_to_keep)
+
+ if connection_handle in self._in_flight_per_connection:
+ in_flight = self._in_flight_per_connection[connection_handle]
+ self._completed += in_flight
+ self._in_flight -= in_flight
+ del self._in_flight_per_connection[connection_handle]
+
+ def _check_queue(self) -> None:
+ while self._packets and self._in_flight < self.max_in_flight:
+ packet, connection_handle = self._packets.pop()
+ self._send(packet)
+ self._in_flight += 1
+ self._in_flight_per_connection[connection_handle] += 1
+
+ def on_packets_completed(self, packet_count: int, connection_handle: int) -> None:
+ """Mark one or more packets associated with a connection as completed."""
+ if connection_handle not in self._in_flight_per_connection:
logger.warning(
- color(
- '!!! {packet_count} completed but only '
- f'{self.in_flight} in flight'
- )
+ f'received completion for unknown connection {connection_handle}'
)
- packet_count = self.in_flight
+ return
- self.in_flight -= packet_count
- self.check_queue()
+ in_flight_for_connection = self._in_flight_per_connection[connection_handle]
+ if packet_count <= in_flight_for_connection:
+ self._in_flight_per_connection[connection_handle] -= packet_count
+ else:
+ logger.warning(
+ f'{packet_count} completed for {connection_handle} '
+ f'but only {in_flight_for_connection} in flight'
+ )
+ self._in_flight_per_connection[connection_handle] = 0
+
+ if packet_count <= self._in_flight:
+ self._in_flight -= packet_count
+ self._completed += packet_count
+ else:
+ logger.warning(
+ f'{packet_count} completed but only {self._in_flight} in flight'
+ )
+ self._in_flight = 0
+ self._completed = self._queued
+
+ self._check_queue()
+ self.emit('drain')
# -----------------------------------------------------------------------------
@@ -115,7 +193,7 @@
self.peer_address = peer_address
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
- acl_packet_queue: Optional[AclPacketQueue] = (
+ acl_packet_queue: Optional[DataPacketQueue] = (
host.le_acl_packet_queue
if transport == BT_LE_TRANSPORT
else host.acl_packet_queue
@@ -130,29 +208,37 @@
l2cap_pdu = L2CAP_PDU.from_bytes(pdu)
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
+ def __str__(self) -> str:
+ return (
+ f'Connection(transport={self.transport}, peer_address={self.peer_address})'
+ )
+
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
- handle: int
+ connection_handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
-class CisLink:
- peer_address: hci.Address
+class IsoLink:
handle: int
+ packet_queue: DataPacketQueue = dataclasses.field(repr=False)
+ packet_sequence_number: int = 0
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
- cis_links: Dict[int, CisLink]
+ cis_links: Dict[int, IsoLink]
+ bis_links: Dict[int, IsoLink]
sco_links: Dict[int, ScoLink]
bigs: dict[int, set[int]] = {} # BIG Handle to BIS Handles
- acl_packet_queue: Optional[AclPacketQueue] = None
- le_acl_packet_queue: Optional[AclPacketQueue] = None
+ acl_packet_queue: Optional[DataPacketQueue] = None
+ le_acl_packet_queue: Optional[DataPacketQueue] = None
+ iso_packet_queue: Optional[DataPacketQueue] = None
hci_sink: Optional[TransportSink] = None
hci_metadata: Dict[str, Any]
long_term_key_provider: Optional[
@@ -171,6 +257,7 @@
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
+ self.bis_links = {} # BIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response: Optional[asyncio.Future[Any]] = None
@@ -413,39 +500,70 @@
f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}'
)
- self.acl_packet_queue = AclPacketQueue(
+ self.acl_packet_queue = DataPacketQueue(
max_packet_size=hc_acl_data_packet_length,
max_in_flight=hc_total_num_acl_data_packets,
send=self.send_hci_packet,
)
- hc_le_acl_data_packet_length = 0
- hc_total_num_le_acl_data_packets = 0
- if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
+ le_acl_data_packet_length = 0
+ total_num_le_acl_data_packets = 0
+ iso_data_packet_length = 0
+ total_num_iso_data_packets = 0
+ if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_V2_COMMAND):
+ response = await self.send_command(
+ hci.HCI_LE_Read_Buffer_Size_V2_Command(), check_result=True
+ )
+ le_acl_data_packet_length = (
+ response.return_parameters.le_acl_data_packet_length
+ )
+ total_num_le_acl_data_packets = (
+ response.return_parameters.total_num_le_acl_data_packets
+ )
+ iso_data_packet_length = response.return_parameters.iso_data_packet_length
+ total_num_iso_data_packets = (
+ response.return_parameters.total_num_iso_data_packets
+ )
+
+ logger.debug(
+ 'HCI LE flow control: '
+ f'le_acl_data_packet_length={le_acl_data_packet_length},'
+ f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
+ f'iso_data_packet_length={iso_data_packet_length},'
+ f'total_num_iso_data_packets={total_num_iso_data_packets}'
+ )
+ elif self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
- hc_le_acl_data_packet_length = (
- response.return_parameters.hc_le_acl_data_packet_length
+ le_acl_data_packet_length = (
+ response.return_parameters.le_acl_data_packet_length
)
- hc_total_num_le_acl_data_packets = (
- response.return_parameters.hc_total_num_le_acl_data_packets
+ total_num_le_acl_data_packets = (
+ response.return_parameters.total_num_le_acl_data_packets
)
logger.debug(
'HCI LE ACL flow control: '
- f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},'
- f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}'
+ f'le_acl_data_packet_length={le_acl_data_packet_length},'
+ f'total_num_le_acl_data_packets={total_num_le_acl_data_packets}'
)
- if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0:
+ if le_acl_data_packet_length == 0 or total_num_le_acl_data_packets == 0:
# LE and Classic share the same queue
self.le_acl_packet_queue = self.acl_packet_queue
else:
# Create a separate queue for LE
- self.le_acl_packet_queue = AclPacketQueue(
- max_packet_size=hc_le_acl_data_packet_length,
- max_in_flight=hc_total_num_le_acl_data_packets,
+ self.le_acl_packet_queue = DataPacketQueue(
+ max_packet_size=le_acl_data_packet_length,
+ max_in_flight=total_num_le_acl_data_packets,
+ send=self.send_hci_packet,
+ )
+
+ if iso_data_packet_length and total_num_iso_data_packets:
+ self.iso_packet_queue = DataPacketQueue(
+ max_packet_size=iso_data_packet_length,
+ max_in_flight=total_num_iso_data_packets,
send=self.send_hci_packet,
)
@@ -597,11 +715,78 @@
data=l2cap_pdu[offset : offset + data_total_length],
)
logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}')
- packet_queue.enqueue(acl_packet)
+ packet_queue.enqueue(acl_packet, connection_handle)
pb_flag = 1
offset += data_total_length
bytes_remaining -= data_total_length
+ def get_data_packet_queue(self, connection_handle: int) -> DataPacketQueue | None:
+ if connection := self.connections.get(connection_handle):
+ return connection.acl_packet_queue
+
+ if iso_link := self.cis_links.get(connection_handle) or self.bis_links.get(
+ connection_handle
+ ):
+ return iso_link.packet_queue
+
+ return None
+
+ def send_iso_sdu(self, connection_handle: int, sdu: bytes) -> None:
+ if not (
+ iso_link := self.cis_links.get(connection_handle)
+ or self.bis_links.get(connection_handle)
+ ):
+ logger.warning(f"no ISO link for connection handle {connection_handle}")
+ return
+
+ if iso_link.packet_queue is None:
+ logger.warning("ISO link has no data packet queue")
+ return
+
+ bytes_remaining = len(sdu)
+ offset = 0
+ while bytes_remaining:
+ is_first_fragment = offset == 0
+ header_length = 4 if is_first_fragment else 0
+ assert iso_link.packet_queue.max_packet_size > header_length
+ fragment_length = min(
+ bytes_remaining, iso_link.packet_queue.max_packet_size - header_length
+ )
+ is_last_fragment = bytes_remaining == fragment_length
+ iso_sdu_fragment = sdu[offset : offset + fragment_length]
+ iso_link.packet_queue.enqueue(
+ (
+ hci.HCI_IsoDataPacket(
+ connection_handle=connection_handle,
+ data_total_length=header_length + fragment_length,
+ packet_sequence_number=iso_link.packet_sequence_number,
+ pb_flag=0b10 if is_last_fragment else 0b00,
+ packet_status_flag=0,
+ iso_sdu_length=len(sdu),
+ iso_sdu_fragment=iso_sdu_fragment,
+ )
+ if is_first_fragment
+ else hci.HCI_IsoDataPacket(
+ connection_handle=connection_handle,
+ data_total_length=fragment_length,
+ pb_flag=0b11 if is_last_fragment else 0b01,
+ iso_sdu_fragment=iso_sdu_fragment,
+ )
+ ),
+ connection_handle,
+ )
+
+ offset += fragment_length
+ bytes_remaining -= fragment_length
+
+ iso_link.packet_sequence_number = (iso_link.packet_sequence_number + 1) & 0xFFFF
+
+ def remove_big(self, big_handle: int) -> None:
+ if big := self.bigs.pop(big_handle, None):
+ for connection_handle in big:
+ if bis_link := self.bis_links.pop(connection_handle, None):
+ bis_link.packet_queue.flush(bis_link.handle)
+
def supports_command(self, op_code: int) -> bool:
return (
self.local_supported_commands
@@ -729,17 +914,31 @@
def on_hci_command_status_event(self, event):
return self.on_command_processed(event)
- def on_hci_number_of_completed_packets_event(self, event):
+ def on_hci_number_of_completed_packets_event(
+ self, event: hci.HCI_Number_Of_Completed_Packets_Event
+ ) -> None:
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if connection := self.connections.get(connection_handle):
- connection.acl_packet_queue.on_packets_completed(num_completed_packets)
- elif connection_handle not in itertools.chain(
- self.cis_links.keys(),
- self.sco_links.keys(),
- itertools.chain.from_iterable(self.bigs.values()),
- ):
+ connection.acl_packet_queue.on_packets_completed(
+ num_completed_packets, connection_handle
+ )
+ return
+
+ if cis_link := self.cis_links.get(connection_handle):
+ cis_link.packet_queue.on_packets_completed(
+ num_completed_packets, connection_handle
+ )
+ return
+
+ if bis_link := self.bis_links.get(connection_handle):
+ bis_link.packet_queue.on_packets_completed(
+ num_completed_packets, connection_handle
+ )
+ return
+
+ if connection_handle not in self.sco_links:
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
@@ -857,11 +1056,7 @@
return
if event.status == hci.HCI_SUCCESS:
- logger.debug(
- f'### DISCONNECTION: [0x{handle:04X}] '
- f'{connection.peer_address} '
- f'reason={event.reason}'
- )
+ logger.debug(f'### DISCONNECTION: {connection}, reason={event.reason}')
# Notify the listeners
self.emit('disconnection', handle, event.reason)
@@ -872,6 +1067,12 @@
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
+
+ # Flush the data queues
+ self.acl_packet_queue.flush(handle)
+ self.le_acl_packet_queue.flush(handle)
+ if self.iso_packet_queue:
+ self.iso_packet_queue.flush(handle)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
@@ -958,6 +1159,14 @@
def on_hci_le_create_big_complete_event(self, event):
self.bigs[event.big_handle] = set(event.connection_handle)
+ if self.iso_packet_queue is None:
+ logger.warning("BIS established but ISO packets not supported")
+
+ for connection_handle in event.connection_handle:
+ self.bis_links[connection_handle] = IsoLink(
+ connection_handle, self.iso_packet_queue
+ )
+
self.emit(
'big_establishment',
event.status,
@@ -975,6 +1184,12 @@
)
def on_hci_le_big_sync_established_event(self, event):
+ self.bigs[event.big_handle] = set(event.connection_handle)
+ for connection_handle in event.connection_handle:
+ self.bis_links[connection_handle] = IsoLink(
+ connection_handle, self.iso_packet_queue
+ )
+
self.emit(
'big_sync_establishment',
event.status,
@@ -990,22 +1205,20 @@
)
def on_hci_le_big_sync_lost_event(self, event):
- self.emit(
- 'big_sync_lost',
- event.big_handle,
- event.reason,
- )
+ self.remove_big(event.big_handle)
+ self.emit('big_sync_lost', event.big_handle, event.reason)
def on_hci_le_terminate_big_complete_event(self, event):
- self.bigs.pop(event.big_handle)
+ self.remove_big(event.big_handle)
self.emit('big_termination', event.reason, event.big_handle)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == hci.HCI_SUCCESS:
- self.cis_links[event.connection_handle] = CisLink(
- handle=event.connection_handle,
- peer_address=hci.Address.ANY,
+ if self.iso_packet_queue is None:
+ logger.warning("CIS established but ISO packets not supported")
+ self.cis_links[event.connection_handle] = IsoLink(
+ handle=event.connection_handle, packet_queue=self.iso_packet_queue
)
self.emit('cis_establishment', event.connection_handle)
else:
@@ -1075,7 +1288,7 @@
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
- handle=event.connection_handle,
+ connection_handle=event.connection_handle,
)
# Notify the client
diff --git a/tests/device_test.py b/tests/device_test.py
index 1f6175a..350c0a4 100644
--- a/tests/device_test.py
+++ b/tests/device_test.py
@@ -34,7 +34,7 @@
Device,
PeriodicAdvertisingParameters,
)
-from bumble.host import AclPacketQueue, Host
+from bumble.host import DataPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
HCI_COMMAND_STATUS_PENDING,
@@ -90,9 +90,9 @@
def _send(packet):
pass
- d0.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
- d1.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
- d2.host.acl_packet_queue = AclPacketQueue(0, 0, _send)
+ d0.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
+ d1.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
+ d2.host.acl_packet_queue = DataPacketQueue(0, 0, _send)
# enable classic
d0.classic_enabled = True
diff --git a/tests/hci_test.py b/tests/hci_test.py
index ee4ef8a..eac641e 100644
--- a/tests/hci_test.py
+++ b/tests/hci_test.py
@@ -170,8 +170,8 @@
command_opcode=HCI_LE_READ_BUFFER_SIZE_COMMAND,
return_parameters=HCI_LE_Read_Buffer_Size_Command.create_return_parameters(
status=0,
- hc_le_acl_data_packet_length=1234,
- hc_total_num_le_acl_data_packets=56,
+ le_acl_data_packet_length=1234,
+ total_num_le_acl_data_packets=56,
),
)
basic_check(event)
diff --git a/tests/host_test.py b/tests/host_test.py
index 5170497..5789b5e 100644
--- a/tests/host_test.py
+++ b/tests/host_test.py
@@ -16,11 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import logging
+import unittest.mock
import pytest
+import unittest
from bumble.controller import Controller
-from bumble.host import Host
+from bumble.host import Host, DataPacketQueue
from bumble.transport import AsyncPipeSink
+from bumble.hci import HCI_AclDataPacket
# -----------------------------------------------------------------------------
# Logging
@@ -60,3 +63,90 @@
assert host.local_lmp_features == int.from_bytes(
bytes.fromhex(lmp_features), 'little'
)
+
+
+# -----------------------------------------------------------------------------
+def test_data_packet_queue():
+ controller = unittest.mock.Mock()
+ queue = DataPacketQueue(10, 2, controller.send)
+ assert queue.queued == 0
+ assert queue.completed == 0
+ packet = HCI_AclDataPacket(
+ connection_handle=123, pb_flag=0, bc_flag=0, data_total_length=0, data=b''
+ )
+
+ queue.enqueue(packet, packet.connection_handle)
+ assert queue.queued == 1
+ assert queue.completed == 0
+ assert controller.send.call_count == 1
+
+ queue.enqueue(packet, packet.connection_handle)
+ assert queue.queued == 2
+ assert queue.completed == 0
+ assert controller.send.call_count == 2
+
+ queue.enqueue(packet, packet.connection_handle)
+ assert queue.queued == 3
+ assert queue.completed == 0
+ assert controller.send.call_count == 2
+
+ queue.on_packets_completed(1, 8000)
+ assert queue.queued == 3
+ assert queue.completed == 0
+ assert controller.send.call_count == 2
+
+ queue.on_packets_completed(1, 123)
+ assert queue.queued == 3
+ assert queue.completed == 1
+ assert controller.send.call_count == 3
+
+ queue.enqueue(packet, packet.connection_handle)
+ assert queue.queued == 4
+ assert queue.completed == 1
+ assert controller.send.call_count == 3
+
+ queue.on_packets_completed(2, 123)
+ assert queue.queued == 4
+ assert queue.completed == 3
+ assert controller.send.call_count == 4
+
+ queue.on_packets_completed(1, 123)
+ assert queue.queued == 4
+ assert queue.completed == 4
+ assert controller.send.call_count == 4
+
+ queue.enqueue(packet, 123)
+ queue.enqueue(packet, 123)
+ queue.enqueue(packet, 123)
+ queue.enqueue(packet, 124)
+ queue.enqueue(packet, 124)
+ queue.enqueue(packet, 124)
+ queue.on_packets_completed(1, 123)
+ assert queue.queued == 10
+ assert queue.completed == 5
+ queue.flush(123)
+ queue.flush(124)
+ assert queue.queued == 10
+ assert queue.completed == 10
+
+ queue.enqueue(packet, 123)
+ queue.on_packets_completed(1, 124)
+ assert queue.queued == 11
+ assert queue.completed == 10
+ queue.on_packets_completed(1000, 123)
+ assert queue.queued == 11
+ assert queue.completed == 11
+
+ drain_listener = unittest.mock.Mock()
+ queue.on('drain', drain_listener.on_drain)
+ queue.enqueue(packet, 123)
+ assert drain_listener.on_drain.call_count == 0
+ queue.on_packets_completed(1, 123)
+ assert drain_listener.on_drain.call_count == 1
+ queue.enqueue(packet, 123)
+ queue.enqueue(packet, 123)
+ queue.enqueue(packet, 123)
+ queue.flush(123)
+ assert drain_listener.on_drain.call_count == 1
+ assert queue.queued == 15
+ assert queue.completed == 15