Declare emitted events as constants
diff --git a/bumble/att.py b/bumble/att.py
index 235d907..6bac58d 100644
--- a/bumble/att.py
+++ b/bumble/att.py
@@ -836,6 +836,9 @@
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
+ EVENT_READ = "read"
+ EVENT_WRITE = "write"
+
value: Union[AttributeValue[_T], _T, None]
def __init__(
@@ -906,7 +909,7 @@
else:
value = self.value
- self.emit('read', connection, b'' if value is None else value)
+ self.emit(self.EVENT_READ, connection, b'' if value is None else value)
return b'' if value is None else self.encode_value(value)
@@ -947,7 +950,7 @@
else:
self.value = decoded_value
- self.emit('write', connection, decoded_value)
+ self.emit(self.EVENT_WRITE, connection, decoded_value)
def __repr__(self):
if isinstance(self.value, bytes):
diff --git a/bumble/avctp.py b/bumble/avctp.py
index 6d70256..c59d258 100644
--- a/bumble/avctp.py
+++ b/bumble/avctp.py
@@ -166,8 +166,8 @@
# Register to receive PDUs from the channel
l2cap_channel.sink = self.on_pdu
- l2cap_channel.on("open", self.on_l2cap_channel_open)
- l2cap_channel.on("close", self.on_l2cap_channel_close)
+ l2cap_channel.on(l2cap_channel.EVENT_OPEN, self.on_l2cap_channel_open)
+ l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close)
def on_l2cap_channel_open(self):
logger.debug(color("<<< AVCTP channel open", "magenta"))
diff --git a/bumble/avdtp.py b/bumble/avdtp.py
index b4bb5fd..2593366 100644
--- a/bumble/avdtp.py
+++ b/bumble/avdtp.py
@@ -896,7 +896,7 @@
self.service_category = self.payload[0]
self.error_code = self.payload[1]
- def __init__(self, service_category, error_code):
+ def __init__(self, error_code: int, service_category: int = 0) -> None:
super().__init__(payload=bytes([service_category, error_code]))
self.service_category = service_category
self.error_code = error_code
@@ -1132,6 +1132,14 @@
See Bluetooth AVDTP spec - 8.17.1 Security Control Command
'''
+ def init_from_payload(self):
+ # pylint: disable=attribute-defined-outside-init
+ self.acp_seid = self.payload[0] >> 2
+ self.data = self.payload[1:]
+
+ def __str__(self) -> str:
+ return self.to_string([f'ACP_SEID: {self.acp_seid}', f'data: {self.data}'])
+
# -----------------------------------------------------------------------------
@Message.subclass
@@ -1200,6 +1208,9 @@
transaction_results: List[Optional[asyncio.Future[Message]]]
channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]]
+ EVENT_OPEN = "open"
+ EVENT_CLOSE = "close"
+
class PacketType(enum.IntEnum):
SINGLE_PACKET = 0
START_PACKET = 1
@@ -1239,8 +1250,8 @@
# Register to receive PDUs from the channel
l2cap_channel.sink = self.on_pdu
- l2cap_channel.on('open', self.on_l2cap_channel_open)
- l2cap_channel.on('close', self.on_l2cap_channel_close)
+ l2cap_channel.on(l2cap_channel.EVENT_OPEN, self.on_l2cap_channel_open)
+ l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close)
def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]:
if 0 < seid <= len(self.local_endpoints):
@@ -1410,20 +1421,20 @@
self.transaction_results[transaction_label] = None
self.transaction_semaphore.release()
- def on_l2cap_connection(self, channel):
+ def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
# Forward the channel to the endpoint that's expecting it
if self.channel_acceptor is None:
logger.warning(color('!!! l2cap connection with no acceptor', 'red'))
return
self.channel_acceptor.on_l2cap_connection(channel)
- def on_l2cap_channel_open(self):
+ def on_l2cap_channel_open(self) -> None:
logger.debug(color('<<< L2CAP channel open', 'magenta'))
- self.emit('open')
+ self.emit(self.EVENT_OPEN)
- def on_l2cap_channel_close(self):
+ def on_l2cap_channel_close(self) -> None:
logger.debug(color('<<< L2CAP channel close', 'magenta'))
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
def send_message(self, transaction_label: int, message: Message) -> None:
logger.debug(
@@ -1541,28 +1552,34 @@
async def abort(self, seid: int) -> Abort_Response:
return await self.send_command(Abort_Command(seid))
- def on_discover_command(self, _command):
+ def on_discover_command(self, command: Discover_Command) -> Optional[Message]:
endpoint_infos = [
EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep)
for endpoint in self.local_endpoints
]
return Discover_Response(endpoint_infos)
- def on_get_capabilities_command(self, command):
+ def on_get_capabilities_command(
+ self, command: Get_Capabilities_Command
+ ) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
return Get_Capabilities_Response(endpoint.capabilities)
- def on_get_all_capabilities_command(self, command):
+ def on_get_all_capabilities_command(
+ self, command: Get_All_Capabilities_Command
+ ) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
return Get_All_Capabilities_Response(endpoint.capabilities)
- def on_set_configuration_command(self, command):
+ def on_set_configuration_command(
+ self, command: Set_Configuration_Command
+ ) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
@@ -1578,7 +1595,9 @@
result = stream.on_set_configuration_command(command.capabilities)
return result or Set_Configuration_Response()
- def on_get_configuration_command(self, command):
+ def on_get_configuration_command(
+ self, command: Get_Configuration_Command
+ ) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
@@ -1587,7 +1606,7 @@
return endpoint.stream.on_get_configuration_command()
- def on_reconfigure_command(self, command):
+ def on_reconfigure_command(self, command: Reconfigure_Command) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)
@@ -1597,7 +1616,7 @@
result = endpoint.stream.on_reconfigure_command(command.capabilities)
return result or Reconfigure_Response()
- def on_open_command(self, command):
+ def on_open_command(self, command: Open_Command) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR)
@@ -1607,25 +1626,26 @@
result = endpoint.stream.on_open_command()
return result or Open_Response()
- def on_start_command(self, command):
+ def on_start_command(self, command: Start_Command) -> Optional[Message]:
for seid in command.acp_seids:
endpoint = self.get_local_endpoint_by_seid(seid)
if endpoint is None:
return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
if endpoint.stream is None:
- return Start_Reject(AVDTP_BAD_STATE_ERROR)
+ return Start_Reject(seid, AVDTP_BAD_STATE_ERROR)
# Start all streams
# TODO: deal with partial failures
for seid in command.acp_seids:
endpoint = self.get_local_endpoint_by_seid(seid)
- result = endpoint.stream.on_start_command()
- if result is not None:
+ if not endpoint or not endpoint.stream:
+ raise InvalidStateError("Should already be checked!")
+ if (result := endpoint.stream.on_start_command()) is not None:
return result
return Start_Response()
- def on_suspend_command(self, command):
+ def on_suspend_command(self, command: Suspend_Command) -> Optional[Message]:
for seid in command.acp_seids:
endpoint = self.get_local_endpoint_by_seid(seid)
if endpoint is None:
@@ -1637,13 +1657,14 @@
# TODO: deal with partial failures
for seid in command.acp_seids:
endpoint = self.get_local_endpoint_by_seid(seid)
- result = endpoint.stream.on_suspend_command()
- if result is not None:
+ if not endpoint or not endpoint.stream:
+ raise InvalidStateError("Should already be checked!")
+ if (result := endpoint.stream.on_suspend_command()) is not None:
return result
return Suspend_Response()
- def on_close_command(self, command):
+ def on_close_command(self, command: Close_Command) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)
@@ -1653,7 +1674,7 @@
result = endpoint.stream.on_close_command()
return result or Close_Response()
- def on_abort_command(self, command):
+ def on_abort_command(self, command: Abort_Command) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None or endpoint.stream is None:
return Abort_Response()
@@ -1661,15 +1682,17 @@
endpoint.stream.on_abort_command()
return Abort_Response()
- def on_security_control_command(self, command):
+ def on_security_control_command(
+ self, command: Security_Control_Command
+ ) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR)
- result = endpoint.on_security_control_command(command.payload)
+ result = endpoint.on_security_control_command(command.data)
return result or Security_Control_Response()
- def on_delayreport_command(self, command):
+ def on_delayreport_command(self, command: DelayReport_Command) -> Optional[Message]:
endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
if endpoint is None:
return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR)
@@ -1682,6 +1705,8 @@
class Listener(utils.EventEmitter):
servers: Dict[int, Protocol]
+ EVENT_CONNECTION = "connection"
+
@staticmethod
def create_registrar(device: device.Device):
warnings.warn("Please use Listener.for_device()", DeprecationWarning)
@@ -1716,7 +1741,7 @@
l2cap_server = device.create_l2cap_server(
spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
)
- l2cap_server.on('connection', listener.on_l2cap_connection)
+ l2cap_server.on(l2cap_server.EVENT_CONNECTION, listener.on_l2cap_connection)
return listener
def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
@@ -1732,14 +1757,14 @@
logger.debug('setting up new Protocol for the connection')
server = Protocol(channel, self.version)
self.set_server(channel.connection, server)
- self.emit('connection', server)
+ self.emit(self.EVENT_CONNECTION, server)
def on_channel_close():
logger.debug('removing Protocol for the connection')
self.remove_server(channel.connection)
- channel.on('open', on_channel_open)
- channel.on('close', on_channel_close)
+ channel.on(channel.EVENT_OPEN, on_channel_open)
+ channel.on(channel.EVENT_CLOSE, on_channel_close)
# -----------------------------------------------------------------------------
@@ -1788,6 +1813,7 @@
)
async def start(self) -> None:
+ """[Source] Start streaming."""
# Auto-open if needed
if self.state == AVDTP_CONFIGURED_STATE:
await self.open()
@@ -1804,6 +1830,7 @@
self.change_state(AVDTP_STREAMING_STATE)
async def stop(self) -> None:
+ """[Source] Stop streaming and transit to OPEN state."""
if self.state != AVDTP_STREAMING_STATE:
raise InvalidStateError('current state is not STREAMING')
@@ -1816,6 +1843,7 @@
self.change_state(AVDTP_OPEN_STATE)
async def close(self) -> None:
+ """[Source] Close channel and transit to IDLE state."""
if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
raise InvalidStateError('current state is not OPEN or STREAMING')
@@ -1847,7 +1875,7 @@
self.change_state(AVDTP_CONFIGURED_STATE)
return None
- def on_get_configuration_command(self, configuration):
+ def on_get_configuration_command(self):
if self.state not in (
AVDTP_CONFIGURED_STATE,
AVDTP_OPEN_STATE,
@@ -1855,7 +1883,7 @@
):
return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
- return self.local_endpoint.on_get_configuration_command(configuration)
+ return self.local_endpoint.on_get_configuration_command()
def on_reconfigure_command(self, configuration):
if self.state != AVDTP_OPEN_STATE:
@@ -1935,20 +1963,20 @@
# Wait for the RTP channel to be closed
self.change_state(AVDTP_ABORTING_STATE)
- def on_l2cap_connection(self, channel):
+ def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
logger.debug(color('<<< stream channel connected', 'magenta'))
self.rtp_channel = channel
- channel.on('open', self.on_l2cap_channel_open)
- channel.on('close', self.on_l2cap_channel_close)
+ channel.on(channel.EVENT_OPEN, self.on_l2cap_channel_open)
+ channel.on(channel.EVENT_CLOSE, self.on_l2cap_channel_close)
# We don't need more channels
self.protocol.channel_acceptor = None
- def on_l2cap_channel_open(self):
+ def on_l2cap_channel_open(self) -> None:
logger.debug(color('<<< stream channel open', 'magenta'))
self.local_endpoint.on_rtp_channel_open()
- def on_l2cap_channel_close(self):
+ def on_l2cap_channel_close(self) -> None:
logger.debug(color('<<< stream channel closed', 'magenta'))
self.local_endpoint.on_rtp_channel_close()
self.local_endpoint.in_use = 0
@@ -2065,6 +2093,19 @@
class LocalStreamEndPoint(StreamEndPoint, utils.EventEmitter):
stream: Optional[Stream]
+ EVENT_CONFIGURATION = "configuration"
+ EVENT_OPEN = "open"
+ EVENT_START = "start"
+ EVENT_STOP = "stop"
+ EVENT_RTP_PACKET = "rtp_packet"
+ EVENT_SUSPEND = "suspend"
+ EVENT_CLOSE = "close"
+ EVENT_ABORT = "abort"
+ EVENT_DELAY_REPORT = "delay_report"
+ EVENT_SECURITY_CONTROL = "security_control"
+ EVENT_RTP_CHANNEL_OPEN = "rtp_channel_open"
+ EVENT_RTP_CHANNEL_CLOSE = "rtp_channel_close"
+
def __init__(
self,
protocol: Protocol,
@@ -2080,52 +2121,65 @@
self.configuration = configuration if configuration is not None else []
self.stream = None
- async def start(self):
- pass
+ async def start(self) -> None:
+ """[Source Only] Handles when receiving start command."""
- async def stop(self):
- pass
+ async def stop(self) -> None:
+ """[Source Only] Handles when receiving stop command."""
- async def close(self):
- pass
+ async def close(self) -> None:
+ """[Source Only] Handles when receiving close command."""
- def on_reconfigure_command(self, command):
- pass
+ def on_reconfigure_command(self, command) -> Optional[Message]:
+ return None
- def on_set_configuration_command(self, configuration):
+ def on_set_configuration_command(self, configuration) -> Optional[Message]:
logger.debug(
'<<< received configuration: '
f'{",".join([str(capability) for capability in configuration])}'
)
self.configuration = configuration
- self.emit('configuration')
+ self.emit(self.EVENT_CONFIGURATION)
+ return None
- def on_get_configuration_command(self):
+ def on_get_configuration_command(self) -> Optional[Message]:
return Get_Configuration_Response(self.configuration)
- def on_open_command(self):
- self.emit('open')
+ def on_open_command(self) -> Optional[Message]:
+ self.emit(self.EVENT_OPEN)
+ return None
- def on_start_command(self):
- self.emit('start')
+ def on_start_command(self) -> Optional[Message]:
+ self.emit(self.EVENT_START)
+ return None
- def on_suspend_command(self):
- self.emit('suspend')
+ def on_suspend_command(self) -> Optional[Message]:
+ self.emit(self.EVENT_SUSPEND)
+ return None
- def on_close_command(self):
- self.emit('close')
+ def on_close_command(self) -> Optional[Message]:
+ self.emit(self.EVENT_CLOSE)
+ return None
- def on_abort_command(self):
- self.emit('abort')
+ def on_abort_command(self) -> Optional[Message]:
+ self.emit(self.EVENT_ABORT)
+ return None
- def on_delayreport_command(self, delay: int):
- self.emit('delay_report', delay)
+ def on_delayreport_command(self, delay: int) -> Optional[Message]:
+ self.emit(self.EVENT_DELAY_REPORT, delay)
+ return None
- def on_rtp_channel_open(self):
- self.emit('rtp_channel_open')
+ def on_security_control_command(self, data: bytes) -> Optional[Message]:
+ self.emit(self.EVENT_SECURITY_CONTROL, data)
+ return None
- def on_rtp_channel_close(self):
- self.emit('rtp_channel_close')
+ def on_rtp_channel_open(self) -> None:
+ self.emit(self.EVENT_RTP_CHANNEL_OPEN)
+ return None
+
+ def on_rtp_channel_close(self) -> None:
+ self.emit(self.EVENT_RTP_CHANNEL_CLOSE)
+ return None
# -----------------------------------------------------------------------------
@@ -2156,13 +2210,13 @@
if self.packet_pump and self.stream and self.stream.rtp_channel:
return await self.packet_pump.start(self.stream.rtp_channel)
- self.emit('start')
+ self.emit(self.EVENT_START)
async def stop(self) -> None:
if self.packet_pump:
return await self.packet_pump.stop()
- self.emit('stop')
+ self.emit(self.EVENT_STOP)
def on_start_command(self):
asyncio.create_task(self.start())
@@ -2203,4 +2257,4 @@
f'{color("<<< RTP Packet:", "green")} '
f'{rtp_packet} {rtp_packet.payload[:16].hex()}'
)
- self.emit('rtp_packet', rtp_packet)
+ self.emit(self.EVENT_RTP_PACKET, rtp_packet)
diff --git a/bumble/avrcp.py b/bumble/avrcp.py
index b56e604..919e23e 100644
--- a/bumble/avrcp.py
+++ b/bumble/avrcp.py
@@ -996,6 +996,10 @@
class Protocol(utils.EventEmitter):
"""AVRCP Controller and Target protocol."""
+ EVENT_CONNECTION = "connection"
+ EVENT_START = "start"
+ EVENT_STOP = "stop"
+
class PacketType(enum.IntEnum):
SINGLE = 0b00
START = 0b01
@@ -1456,9 +1460,11 @@
def _on_avctp_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug("AVCTP connection established")
- l2cap_channel.on("open", lambda: self._on_avctp_channel_open(l2cap_channel))
+ l2cap_channel.on(
+ l2cap_channel.EVENT_OPEN, lambda: self._on_avctp_channel_open(l2cap_channel)
+ )
- self.emit("connection")
+ self.emit(self.EVENT_CONNECTION)
def _on_avctp_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug("AVCTP channel open")
@@ -1473,15 +1479,15 @@
self.avctp_protocol.register_response_handler(
AVRCP_PID, self._on_avctp_response
)
- l2cap_channel.on("close", self._on_avctp_channel_close)
+ l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self._on_avctp_channel_close)
- self.emit("start")
+ self.emit(self.EVENT_START)
def _on_avctp_channel_close(self) -> None:
logger.debug("AVCTP channel closed")
self.avctp_protocol = None
- self.emit("stop")
+ self.emit(self.EVENT_STOP)
def _on_avctp_command(
self, transaction_label: int, command: avc.CommandFrame
diff --git a/bumble/device.py b/bumble/device.py
index 97a32f3..7d1d91d 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -581,6 +581,12 @@
enabled: bool = False
periodic_enabled: bool = False
+ EVENT_START = "start"
+ EVENT_STOP = "stop"
+ EVENT_START_PERIODIC = "start_periodic"
+ EVENT_STOP_PERIODIC = "stop_periodic"
+ EVENT_TERMINATION = "termination"
+
def __post_init__(self) -> None:
super().__init__()
@@ -731,7 +737,7 @@
)
self.enabled = True
- self.emit('start')
+ self.emit(self.EVENT_START)
async def stop(self) -> None:
await self.device.send_command(
@@ -745,7 +751,7 @@
)
self.enabled = False
- self.emit('stop')
+ self.emit(self.EVENT_STOP)
async def start_periodic(self, include_adi: bool = False) -> None:
if self.periodic_enabled:
@@ -759,7 +765,7 @@
)
self.periodic_enabled = True
- self.emit('start_periodic')
+ self.emit(self.EVENT_START_PERIODIC)
async def stop_periodic(self) -> None:
if not self.periodic_enabled:
@@ -773,7 +779,7 @@
)
self.periodic_enabled = False
- self.emit('stop_periodic')
+ self.emit(self.EVENT_STOP_PERIODIC)
async def remove(self) -> None:
await self.device.send_command(
@@ -797,7 +803,7 @@
def on_termination(self, status: int) -> None:
self.enabled = False
- self.emit('termination', status)
+ self.emit(self.EVENT_TERMINATION, status)
# -----------------------------------------------------------------------------
@@ -823,6 +829,14 @@
periodic_advertising_interval: int
advertiser_clock_accuracy: int
+ EVENT_STATE_CHANGE = "state_change"
+ EVENT_ESTABLISHMENT = "establishment"
+ EVENT_CANCELLATION = "cancellation"
+ EVENT_ERROR = "error"
+ EVENT_LOSS = "loss"
+ EVENT_PERIODIC_ADVERTISEMENT = "periodic_advertisement"
+ EVENT_BIGINFO_ADVERTISEMENT = "biginfo_advertisement"
+
def __init__(
self,
device: Device,
@@ -855,7 +869,7 @@
def state(self, state: State) -> None:
logger.debug(f'{self} -> {state.name}')
self._state = state
- self.emit('state_change')
+ self.emit(self.EVENT_STATE_CHANGE)
async def establish(self) -> None:
if self.state != self.State.INIT:
@@ -939,7 +953,7 @@
self.periodic_advertising_interval = periodic_advertising_interval
self.advertiser_clock_accuracy = advertiser_clock_accuracy
self.state = self.State.ESTABLISHED
- self.emit('establishment')
+ self.emit(self.EVENT_ESTABLISHMENT)
return
# We don't need to keep a reference anymore
@@ -948,15 +962,15 @@
if status == hci.HCI_OPERATION_CANCELLED_BY_HOST_ERROR:
self.state = self.State.CANCELLED
- self.emit('cancellation')
+ self.emit(self.EVENT_CANCELLATION)
return
self.state = self.State.ERROR
- self.emit('error')
+ self.emit(self.EVENT_ERROR)
def on_loss(self):
self.state = self.State.LOST
- self.emit('loss')
+ self.emit(self.EVENT_LOSS)
def on_periodic_advertising_report(self, report) -> None:
self.data_accumulator += report.data
@@ -967,7 +981,7 @@
return
self.emit(
- 'periodic_advertisement',
+ self.EVENT_PERIODIC_ADVERTISEMENT,
PeriodicAdvertisement(
self.advertiser_address,
self.sid,
@@ -984,7 +998,7 @@
def on_biginfo_advertising_report(self, report) -> None:
self.emit(
- 'biginfo_advertisement',
+ self.EVENT_BIGINFO_ADVERTISEMENT,
BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report),
)
@@ -1222,7 +1236,7 @@
async def request_mtu(self, mtu: int) -> int:
mtu = await self.gatt_client.request_mtu(mtu)
- self.connection.emit('connection_att_mtu_update')
+ self.connection.emit(self.connection.EVENT_CONNECTION_ATT_MTU_UPDATE)
return mtu
async def discover_service(
@@ -1390,6 +1404,9 @@
link_type: int
sink: Optional[Callable[[hci.HCI_SynchronousDataPacket], Any]] = None
+ EVENT_DISCONNECTION: ClassVar[str] = "disconnection"
+ EVENT_DISCONNECTION_FAILURE: ClassVar[str] = "disconnection_failure"
+
def __post_init__(self) -> None:
super().__init__()
@@ -1487,6 +1504,11 @@
state: State = State.PENDING
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
+ EVENT_DISCONNECTION: ClassVar[str] = "disconnection"
+ EVENT_DISCONNECTION_FAILURE: ClassVar[str] = "disconnection_failure"
+ EVENT_ESTABLISHMENT: ClassVar[str] = "establishment"
+ EVENT_ESTABLISHMENT_FAILURE: ClassVar[str] = "establishment_failure"
+
def __post_init__(self) -> None:
super().__init__()
@@ -1571,6 +1593,40 @@
cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures
+ EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update"
+ EVENT_DISCONNECTION = "disconnection"
+ EVENT_DISCONNECTION_FAILURE = "disconnection_failure"
+ EVENT_CONNECTION_AUTHENTICATION = "connection_authentication"
+ EVENT_CONNECTION_AUTHENTICATION_FAILURE = "connection_authentication_failure"
+ EVENT_REMOTE_NAME = "remote_name"
+ EVENT_REMOTE_NAME_FAILURE = "remote_name_failure"
+ EVENT_CONNECTION_ENCRYPTION_CHANGE = "connection_encryption_change"
+ EVENT_CONNECTION_ENCRYPTION_FAILURE = "connection_encryption_failure"
+ EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH = "connection_encryption_key_refresh"
+ EVENT_CONNECTION_PARAMETERS_UPDATE = "connection_parameters_update"
+ EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE = "connection_parameters_update_failure"
+ EVENT_CONNECTION_PHY_UPDATE = "connection_phy_update"
+ EVENT_CONNECTION_PHY_UPDATE_FAILURE = "connection_phy_update_failure"
+ EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update"
+ EVENT_CONNECTION_DATA_LENGTH_CHANGE = "connection_data_length_change"
+ EVENT_CHANNEL_SOUNDING_CAPABILITIES_FAILURE = (
+ "channel_sounding_capabilities_failure"
+ )
+ EVENT_CHANNEL_SOUNDING_CAPABILITIES = "channel_sounding_capabilities"
+ EVENT_CHANNEL_SOUNDING_CONFIG_FAILURE = "channel_sounding_config_failure"
+ EVENT_CHANNEL_SOUNDING_CONFIG = "channel_sounding_config"
+ EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED = "channel_sounding_config_removed"
+ EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE = "channel_sounding_procedure_failure"
+ EVENT_CHANNEL_SOUNDING_PROCEDURE = "channel_sounding_procedure"
+ EVENT_ROLE_CHANGE = "role_change"
+ EVENT_ROLE_CHANGE_FAILURE = "role_change_failure"
+ EVENT_CLASSIC_PAIRING = "classic_pairing"
+ EVENT_CLASSIC_PAIRING_FAILURE = "classic_pairing_failure"
+ EVENT_PAIRING_START = "pairing_start"
+ EVENT_PAIRING = "pairing"
+ EVENT_PAIRING_FAILURE = "pairing_failure"
+ EVENT_SECURITY_REQUEST = "security_request"
+
@utils.composite_listener
class Listener:
def on_disconnection(self, reason):
@@ -1740,16 +1796,16 @@
"""Idles the current task waiting for a disconnect or timeout"""
abort = asyncio.get_running_loop().create_future()
- self.on('disconnection', abort.set_result)
- self.on('disconnection_failure', abort.set_exception)
+ self.on(self.EVENT_DISCONNECTION, abort.set_result)
+ self.on(self.EVENT_DISCONNECTION_FAILURE, abort.set_exception)
try:
await asyncio.wait_for(
utils.cancel_on_event(self.device, 'flush', abort), timeout
)
finally:
- self.remove_listener('disconnection', abort.set_result)
- self.remove_listener('disconnection_failure', abort.set_exception)
+ self.remove_listener(self.EVENT_DISCONNECTION, abort.set_result)
+ self.remove_listener(self.EVENT_DISCONNECTION_FAILURE, abort.set_exception)
async def set_data_length(self, tx_octets, tx_time) -> None:
return await self.device.set_data_length(self, tx_octets, tx_time)
@@ -2062,6 +2118,26 @@
_pending_cis: Dict[int, tuple[int, int]]
gatt_service: gatt_service.GenericAttributeProfileService | None = None
+ EVENT_ADVERTISEMENT = "advertisement"
+ EVENT_PERIODIC_ADVERTISING_SYNC_TRANSFER = "periodic_advertising_sync_transfer"
+ EVENT_KEY_STORE_UPDATE = "key_store_update"
+ EVENT_FLUSH = "flush"
+ EVENT_CONNECTION = "connection"
+ EVENT_CONNECTION_FAILURE = "connection_failure"
+ EVENT_SCO_REQUEST = "sco_request"
+ EVENT_INQUIRY_COMPLETE = "inquiry_complete"
+ EVENT_REMOTE_NAME = "remote_name"
+ EVENT_REMOTE_NAME_FAILURE = "remote_name_failure"
+ EVENT_SCO_CONNECTION = "sco_connection"
+ EVENT_SCO_CONNECTION_FAILURE = "sco_connection_failure"
+ EVENT_CIS_REQUEST = "cis_request"
+ EVENT_CIS_ESTABLISHMENT = "cis_establishment"
+ EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure"
+ EVENT_ROLE_CHANGE_FAILURE = "role_change_failure"
+ EVENT_INQUIRY_RESULT = "inquiry_result"
+ EVENT_REMOTE_NAME = "remote_name"
+ EVENT_REMOTE_NAME_FAILURE = "remote_name_failure"
+
@utils.composite_listener
class Listener:
def on_advertisement(self, advertisement):
@@ -3149,7 +3225,7 @@
accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive)
self.advertisement_accumulators[report.address] = accumulator
if advertisement := accumulator.update(report):
- self.emit('advertisement', advertisement)
+ self.emit(self.EVENT_ADVERTISEMENT, advertisement)
async def create_periodic_advertising_sync(
self,
@@ -3273,7 +3349,7 @@
periodic_advertising_interval=periodic_advertising_interval,
advertiser_clock_accuracy=advertiser_clock_accuracy,
)
- self.emit('periodic_advertising_sync_transfer', pa_sync, connection)
+ self.emit(self.EVENT_PERIODIC_ADVERTISING_SYNC_TRANSFER, pa_sync, connection)
@host_event_handler
@with_periodic_advertising_sync_from_handle
@@ -3331,7 +3407,7 @@
@host_event_handler
def on_inquiry_result(self, address, class_of_device, data, rssi):
self.emit(
- 'inquiry_result',
+ self.EVENT_INQUIRY_RESULT,
address,
class_of_device,
AdvertisingData.from_bytes(data),
@@ -3508,8 +3584,8 @@
# Create a future so that we can wait for the connection's result
pending_connection = asyncio.get_running_loop().create_future()
- self.on('connection', on_connection)
- self.on('connection_failure', on_connection_failure)
+ self.on(self.EVENT_CONNECTION, on_connection)
+ self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
try:
# Tell the controller to connect
@@ -3685,8 +3761,8 @@
except core.ConnectionError as error:
raise core.TimeoutError() from error
finally:
- self.remove_listener('connection', on_connection)
- self.remove_listener('connection_failure', on_connection_failure)
+ self.remove_listener(self.EVENT_CONNECTION, on_connection)
+ self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
if transport == PhysicalTransport.LE:
self.le_connecting = False
self.connect_own_address_type = None
@@ -3779,8 +3855,8 @@
):
pending_connection.set_exception(error)
- self.on('connection', on_connection)
- self.on('connection_failure', on_connection_failure)
+ self.on(self.EVENT_CONNECTION, on_connection)
+ self.on(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
# Save pending connection, with the Peripheral hci.role.
# Even if we requested a role switch in the hci.HCI_Accept_Connection_Request
@@ -3802,8 +3878,8 @@
return await utils.cancel_on_event(self, 'flush', pending_connection)
finally:
- self.remove_listener('connection', on_connection)
- self.remove_listener('connection_failure', on_connection_failure)
+ self.remove_listener(self.EVENT_CONNECTION, on_connection)
+ self.remove_listener(self.EVENT_CONNECTION_FAILURE, on_connection_failure)
self.pending_connections.pop(peer_address, None)
@asynccontextmanager
@@ -3857,8 +3933,10 @@
) -> None:
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
- connection.on('disconnection', pending_disconnection.set_result)
- connection.on('disconnection_failure', pending_disconnection.set_exception)
+ connection.on(connection.EVENT_DISCONNECTION, pending_disconnection.set_result)
+ connection.on(
+ connection.EVENT_DISCONNECTION_FAILURE, pending_disconnection.set_exception
+ )
# Request a disconnection
result = await self.send_command(
@@ -3876,10 +3954,11 @@
return await utils.cancel_on_event(self, 'flush', pending_disconnection)
finally:
connection.remove_listener(
- 'disconnection', pending_disconnection.set_result
+ connection.EVENT_DISCONNECTION, pending_disconnection.set_result
)
connection.remove_listener(
- 'disconnection_failure', pending_disconnection.set_exception
+ connection.EVENT_DISCONNECTION_FAILURE,
+ pending_disconnection.set_exception,
)
self.disconnecting = False
@@ -4198,7 +4277,7 @@
return keys.link_key.value
# [Classic only]
- async def authenticate(self, connection):
+ async def authenticate(self, connection: Connection) -> None:
# Set up event handlers
pending_authentication = asyncio.get_running_loop().create_future()
@@ -4208,8 +4287,11 @@
def on_authentication_failure(error_code):
pending_authentication.set_exception(hci.HCI_Error(error_code))
- connection.on('connection_authentication', on_authentication)
- connection.on('connection_authentication_failure', on_authentication_failure)
+ connection.on(connection.EVENT_CONNECTION_AUTHENTICATION, on_authentication)
+ connection.on(
+ connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE,
+ on_authentication_failure,
+ )
# Request the authentication
try:
@@ -4230,9 +4312,12 @@
connection, 'disconnection', pending_authentication
)
finally:
- connection.remove_listener('connection_authentication', on_authentication)
connection.remove_listener(
- 'connection_authentication_failure', on_authentication_failure
+ connection.EVENT_CONNECTION_AUTHENTICATION, on_authentication
+ )
+ connection.remove_listener(
+ connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE,
+ on_authentication_failure,
)
async def encrypt(self, connection, enable=True):
@@ -4248,8 +4333,12 @@
def on_encryption_failure(error_code):
pending_encryption.set_exception(hci.HCI_Error(error_code))
- connection.on('connection_encryption_change', on_encryption_change)
- connection.on('connection_encryption_failure', on_encryption_failure)
+ connection.on(
+ connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, on_encryption_change
+ )
+ connection.on(
+ connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, on_encryption_failure
+ )
# Request the encryption
try:
@@ -4311,10 +4400,10 @@
await utils.cancel_on_event(connection, 'disconnection', pending_encryption)
finally:
connection.remove_listener(
- 'connection_encryption_change', on_encryption_change
+ connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, on_encryption_change
)
connection.remove_listener(
- 'connection_encryption_failure', on_encryption_failure
+ connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, on_encryption_failure
)
async def update_keys(self, address: str, keys: PairingKeys) -> None:
@@ -4327,7 +4416,7 @@
except Exception as error:
logger.warning(f'!!! error while storing keys: {error}')
else:
- self.emit('key_store_update')
+ self.emit(self.EVENT_KEY_STORE_UPDATE)
# [Classic only]
async def switch_role(self, connection: Connection, role: hci.Role):
@@ -4339,8 +4428,8 @@
def on_role_change_failure(error_code):
pending_role_change.set_exception(hci.HCI_Error(error_code))
- connection.on('role_change', on_role_change)
- connection.on('role_change_failure', on_role_change_failure)
+ connection.on(connection.EVENT_ROLE_CHANGE, on_role_change)
+ connection.on(connection.EVENT_ROLE_CHANGE_FAILURE, on_role_change_failure)
try:
result = await self.send_command(
@@ -4356,8 +4445,10 @@
connection, 'disconnection', pending_role_change
)
finally:
- connection.remove_listener('role_change', on_role_change)
- connection.remove_listener('role_change_failure', on_role_change_failure)
+ connection.remove_listener(connection.EVENT_ROLE_CHANGE, on_role_change)
+ connection.remove_listener(
+ connection.EVENT_ROLE_CHANGE_FAILURE, on_role_change_failure
+ )
# [Classic only]
async def request_remote_name(self, remote: Union[hci.Address, Connection]) -> str:
@@ -4369,7 +4460,7 @@
)
handler = self.on(
- 'remote_name',
+ self.EVENT_REMOTE_NAME,
lambda address, remote_name: (
pending_name.set_result(remote_name)
if address == peer_address
@@ -4377,7 +4468,7 @@
),
)
failure_handler = self.on(
- 'remote_name_failure',
+ self.EVENT_REMOTE_NAME_FAILURE,
lambda address, error_code: (
pending_name.set_exception(hci.HCI_Error(error_code))
if address == peer_address
@@ -4405,8 +4496,8 @@
# Wait for the result
return await utils.cancel_on_event(self, 'flush', pending_name)
finally:
- self.remove_listener('remote_name', handler)
- self.remove_listener('remote_name_failure', failure_handler)
+ self.remove_listener(self.EVENT_REMOTE_NAME, handler)
+ self.remove_listener(self.EVENT_REMOTE_NAME_FAILURE, failure_handler)
# [LE only]
@utils.experimental('Only for testing.')
@@ -4497,8 +4588,10 @@
if pending_future := pending_cis_establishments.get(cis_handle):
pending_future.set_exception(hci.HCI_Error(status))
- watcher.on(self, 'cis_establishment', on_cis_establishment)
- watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure)
+ watcher.on(self, self.EVENT_CIS_ESTABLISHMENT, on_cis_establishment)
+ watcher.on(
+ self, self.EVENT_CIS_ESTABLISHMENT_FAILURE, on_cis_establishment_failure
+ )
await self.send_command(
hci.HCI_LE_Create_CIS_Command(
cis_connection_handle=[p[0] for p in cis_acl_pairs],
@@ -4541,8 +4634,12 @@
def on_establishment_failure(status: int) -> None:
pending_establishment.set_exception(hci.HCI_Error(status))
- watcher.on(cis_link, 'establishment', on_establishment)
- watcher.on(cis_link, 'establishment_failure', on_establishment_failure)
+ watcher.on(cis_link, cis_link.EVENT_ESTABLISHMENT, on_establishment)
+ watcher.on(
+ cis_link,
+ cis_link.EVENT_ESTABLISHMENT_FAILURE,
+ on_establishment_failure,
+ )
await self.send_command(
hci.HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
@@ -4910,9 +5007,9 @@
@host_event_handler
def on_flush(self):
- self.emit('flush')
+ self.emit(self.EVENT_FLUSH)
for _, connection in self.connections.items():
- connection.emit('disconnection', 0)
+ connection.emit(connection.EVENT_DISCONNECTION, 0)
self.connections = {}
# [Classic only]
@@ -5203,7 +5300,7 @@
lambda _: utils.cancel_on_event(self, 'flush', advertising_set.start()),
)
- self.emit('connection', connection)
+ self.emit(self.EVENT_CONNECTION, connection)
@host_event_handler
def on_connection(
@@ -5241,7 +5338,7 @@
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
- self.emit('connection', connection)
+ self.emit(self.EVENT_CONNECTION, connection)
return
@@ -5316,7 +5413,7 @@
if role == hci.Role.CENTRAL or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
- self.emit('connection', connection)
+ self.emit(self.EVENT_CONNECTION, connection)
return
if role == hci.Role.PERIPHERAL and self.supports_le_extended_advertising:
@@ -5350,7 +5447,7 @@
'hci',
hci.HCI_Constant.error_name(error_code),
)
- self.emit('connection_failure', error)
+ self.emit(self.EVENT_CONNECTION_FAILURE, error)
# FIXME: Explore a delegate-model for BR/EDR wait connection #56.
@host_event_handler
@@ -5365,7 +5462,7 @@
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=PhysicalTransport.BR_EDR
):
- self.emit('sco_request', connection, link_type)
+ self.emit(self.EVENT_SCO_REQUEST, connection, link_type)
else:
logger.error(f'SCO request from a non-connected device {bd_addr}')
return
@@ -5409,14 +5506,14 @@
f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}'
)
- connection.emit('disconnection', reason)
+ connection.emit(connection.EVENT_DISCONNECTION, reason)
# Cleanup subsystems that maintain per-connection state
self.gatt_server.on_disconnection(connection)
elif sco_link := self.sco_links.pop(connection_handle, None):
- sco_link.emit('disconnection', reason)
+ sco_link.emit(sco_link.EVENT_DISCONNECTION, reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
- cis_link.emit('disconnection', reason)
+ cis_link.emit(cis_link.EVENT_DISCONNECTION, reason)
else:
logger.error(
f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
@@ -5424,7 +5521,7 @@
@host_event_handler
@with_connection_from_handle
- def on_disconnection_failure(self, connection, error_code):
+ def on_disconnection_failure(self, connection: Connection, error_code: int):
logger.debug(f'*** Disconnection failed: {error_code}')
error = core.ConnectionError(
error_code,
@@ -5433,7 +5530,7 @@
'hci',
hci.HCI_Constant.error_name(error_code),
)
- connection.emit('disconnection_failure', error)
+ connection.emit(connection.EVENT_DISCONNECTION_FAILURE, error)
@host_event_handler
@utils.AsyncRunner.run_in_task()
@@ -5444,7 +5541,7 @@
else:
self.auto_restart_inquiry = True
self.discovering = False
- self.emit('inquiry_complete')
+ self.emit(self.EVENT_INQUIRY_COMPLETE)
@host_event_handler
@with_connection_from_handle
@@ -5454,7 +5551,7 @@
f'{connection.peer_address} as {connection.role_name}'
)
connection.authenticated = True
- connection.emit('connection_authentication')
+ connection.emit(connection.EVENT_CONNECTION_AUTHENTICATION)
@host_event_handler
@with_connection_from_handle
@@ -5463,7 +5560,7 @@
f'*** Connection Authentication Failure: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, error={error}'
)
- connection.emit('connection_authentication_failure', error)
+ connection.emit(connection.EVENT_CONNECTION_AUTHENTICATION_FAILURE, error)
# [Classic only]
@host_event_handler
@@ -5681,22 +5778,22 @@
remote_name = remote_name.decode('utf-8')
if connection:
connection.peer_name = remote_name
- connection.emit('remote_name')
- self.emit('remote_name', address, remote_name)
+ connection.emit(connection.EVENT_REMOTE_NAME)
+ self.emit(self.EVENT_REMOTE_NAME, address, remote_name)
except UnicodeDecodeError as error:
logger.warning('peer name is not valid UTF-8')
if connection:
- connection.emit('remote_name_failure', error)
+ connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error)
else:
- self.emit('remote_name_failure', address, error)
+ self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error)
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_remote_name_failure(self, connection: Connection, address, error):
if connection:
- connection.emit('remote_name_failure', error)
- self.emit('remote_name_failure', address, error)
+ connection.emit(connection.EVENT_REMOTE_NAME_FAILURE, error)
+ self.emit(self.EVENT_REMOTE_NAME_FAILURE, address, error)
# [Classic only]
@host_event_handler
@@ -5716,7 +5813,7 @@
handle=sco_handle,
link_type=link_type,
)
- self.emit('sco_connection', sco_link)
+ self.emit(self.EVENT_SCO_CONNECTION, sco_link)
# [Classic only]
@host_event_handler
@@ -5726,7 +5823,7 @@
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
- self.emit('sco_connection_failure')
+ self.emit(self.EVENT_SCO_CONNECTION_FAILURE)
# [Classic only]
@host_event_handler
@@ -5763,7 +5860,7 @@
cig_id=cig_id,
cis_id=cis_id,
)
- self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
+ self.emit(self.EVENT_CIS_REQUEST, acl_connection, cis_handle, cig_id, cis_id)
# [LE only]
@host_event_handler
@@ -5782,8 +5879,8 @@
f'cis_id=[0x{cis_link.cis_id:02X}] ***'
)
- cis_link.emit('establishment')
- self.emit('cis_establishment', cis_link)
+ cis_link.emit(cis_link.EVENT_ESTABLISHMENT)
+ self.emit(self.EVENT_CIS_ESTABLISHMENT, cis_link)
# [LE only]
@host_event_handler
@@ -5791,8 +5888,8 @@
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle):
- cis_link.emit('establishment_failure', status)
- self.emit('cis_establishment_failure', cis_handle, status)
+ cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status)
+ self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_handle, status)
# [LE only]
@host_event_handler
@@ -5826,7 +5923,7 @@
):
connection.authenticated = True
connection.sc = True
- connection.emit('connection_encryption_change')
+ connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_CHANGE)
@host_event_handler
@with_connection_from_handle
@@ -5836,7 +5933,7 @@
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
- connection.emit('connection_encryption_failure', error)
+ connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_FAILURE, error)
@host_event_handler
@with_connection_from_handle
@@ -5845,7 +5942,7 @@
f'*** Connection Key Refresh: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}'
)
- connection.emit('connection_encryption_key_refresh')
+ connection.emit(connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH)
@host_event_handler
@with_connection_from_handle
@@ -5856,7 +5953,7 @@
f'{connection_parameters}'
)
connection.parameters = connection_parameters
- connection.emit('connection_parameters_update')
+ connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
@with_connection_from_handle
@@ -5866,7 +5963,7 @@
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
- connection.emit('connection_parameters_update_failure', error)
+ connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE_FAILURE, error)
@host_event_handler
@with_connection_from_handle
@@ -5876,7 +5973,7 @@
f'{connection.peer_address} as {connection.role_name}, '
f'{phy}'
)
- connection.emit('connection_phy_update', phy)
+ connection.emit(connection.EVENT_CONNECTION_PHY_UPDATE, phy)
@host_event_handler
@with_connection_from_handle
@@ -5886,7 +5983,7 @@
f'{connection.peer_address} as {connection.role_name}, '
f'error={error}'
)
- connection.emit('connection_phy_update_failure', error)
+ connection.emit(connection.EVENT_CONNECTION_PHY_UPDATE_FAILURE, error)
@host_event_handler
@with_connection_from_handle
@@ -5897,7 +5994,7 @@
f'{att_mtu}'
)
connection.att_mtu = att_mtu
- connection.emit('connection_att_mtu_update')
+ connection.emit(connection.EVENT_CONNECTION_ATT_MTU_UPDATE)
@host_event_handler
@with_connection_from_handle
@@ -5914,7 +6011,7 @@
max_rx_octets,
max_rx_time,
)
- connection.emit('connection_data_length_change')
+ connection.emit(connection.EVENT_CONNECTION_DATA_LENGTH_CHANGE)
@host_event_handler
def on_cs_remote_supported_capabilities(
@@ -5924,7 +6021,9 @@
return
if event.status != hci.HCI_SUCCESS:
- connection.emit('channel_sounding_capabilities_failure', event.status)
+ connection.emit(
+ connection.EVENT_CHANNEL_SOUNDING_CAPABILITIES_FAILURE, event.status
+ )
return
capabilities = ChannelSoundingCapabilities(
@@ -5949,7 +6048,7 @@
t_sw_time_supported=event.t_sw_time_supported,
tx_snr_capability=event.tx_snr_capability,
)
- connection.emit('channel_sounding_capabilities', capabilities)
+ connection.emit(connection.EVENT_CHANNEL_SOUNDING_CAPABILITIES, capabilities)
@host_event_handler
def on_cs_config(self, event: hci.HCI_LE_CS_Config_Complete_Event):
@@ -5957,7 +6056,9 @@
return
if event.status != hci.HCI_SUCCESS:
- connection.emit('channel_sounding_config_failure', event.status)
+ connection.emit(
+ connection.EVENT_CHANNEL_SOUNDING_CONFIG_FAILURE, event.status
+ )
return
if event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.CREATED:
config = ChannelSoundingConfig(
@@ -5983,11 +6084,13 @@
t_pm_time=event.t_pm_time,
)
connection.cs_configs[event.config_id] = config
- connection.emit('channel_sounding_config', config)
+ connection.emit(connection.EVENT_CHANNEL_SOUNDING_CONFIG, config)
elif event.action == hci.HCI_LE_CS_Config_Complete_Event.Action.REMOVED:
try:
config = connection.cs_configs.pop(event.config_id)
- connection.emit('channel_sounding_config_removed', config.config_id)
+ connection.emit(
+ connection.EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED, config.config_id
+ )
except KeyError:
logger.error('Removing unknown config %d', event.config_id)
@@ -5997,7 +6100,9 @@
return
if event.status != hci.HCI_SUCCESS:
- connection.emit('channel_sounding_procedure_failure', event.status)
+ connection.emit(
+ connection.EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE, event.status
+ )
return
procedure = ChannelSoundingProcedure(
@@ -6014,37 +6119,37 @@
max_procedure_len=event.max_procedure_len,
)
connection.cs_procedures[procedure.config_id] = procedure
- connection.emit('channel_sounding_procedure', procedure)
+ connection.emit(connection.EVENT_CHANNEL_SOUNDING_PROCEDURE, procedure)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_role_change(self, connection, new_role):
connection.role = new_role
- connection.emit('role_change', new_role)
+ connection.emit(connection.EVENT_ROLE_CHANGE, new_role)
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_role_change_failure(self, connection, address, error):
if connection:
- connection.emit('role_change_failure', error)
- self.emit('role_change_failure', address, error)
+ connection.emit(connection.EVENT_ROLE_CHANGE_FAILURE, error)
+ self.emit(self.EVENT_ROLE_CHANGE_FAILURE, address, error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_classic_pairing(self, connection: Connection) -> None:
- connection.emit('classic_pairing')
+ connection.emit(connection.EVENT_CLASSIC_PAIRING)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_classic_pairing_failure(self, connection: Connection, status) -> None:
- connection.emit('classic_pairing_failure', status)
+ connection.emit(connection.EVENT_CLASSIC_PAIRING_FAILURE, status)
def on_pairing_start(self, connection: Connection) -> None:
- connection.emit('pairing_start')
+ connection.emit(connection.EVENT_PAIRING_START)
def on_pairing(
self,
@@ -6058,10 +6163,10 @@
connection.peer_address = identity_address
connection.sc = sc
connection.authenticated = True
- connection.emit('pairing', keys)
+ connection.emit(connection.EVENT_PAIRING, keys)
def on_pairing_failure(self, connection: Connection, reason: int) -> None:
- connection.emit('pairing_failure', reason)
+ connection.emit(connection.EVENT_PAIRING_FAILURE, reason)
@with_connection_from_handle
def on_gatt_pdu(self, connection, pdu):
diff --git a/bumble/gatt.py b/bumble/gatt.py
index 3dacbf7..be75f45 100644
--- a/bumble/gatt.py
+++ b/bumble/gatt.py
@@ -448,6 +448,8 @@
uuid: UUID
properties: Characteristic.Properties
+ EVENT_SUBSCRIPTION = "subscription"
+
class Properties(enum.IntFlag):
"""Property flags"""
diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py
index fbe4e77..c2f23e3 100644
--- a/bumble/gatt_client.py
+++ b/bumble/gatt_client.py
@@ -202,6 +202,8 @@
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable[[_T], Any]]
+ EVENT_UPDATE = "update"
+
def __init__(
self,
client: Client,
@@ -308,7 +310,7 @@
self.services = []
self.cached_values = {}
- connection.on('disconnection', self.on_disconnection)
+ connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection)
def send_gatt_pdu(self, pdu: bytes) -> None:
self.connection.send_l2cap_pdu(ATT_CID, pdu)
@@ -1142,7 +1144,7 @@
if callable(subscriber):
subscriber(notification.attribute_value)
else:
- subscriber.emit('update', notification.attribute_value)
+ subscriber.emit(subscriber.EVENT_UPDATE, notification.attribute_value)
def on_att_handle_value_indication(self, indication):
# Call all subscribers
@@ -1157,7 +1159,7 @@
if callable(subscriber):
subscriber(indication.attribute_value)
else:
- subscriber.emit('update', indication.attribute_value)
+ subscriber.emit(subscriber.EVENT_UPDATE, indication.attribute_value)
# Confirm that we received the indication
self.send_confirmation(ATT_Handle_Value_Confirmation())
diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py
index 9cb2ba6..ad281da 100644
--- a/bumble/gatt_server.py
+++ b/bumble/gatt_server.py
@@ -110,6 +110,8 @@
indication_semaphores: defaultdict[int, asyncio.Semaphore]
pending_confirmations: defaultdict[int, Optional[asyncio.futures.Future]]
+ EVENT_CHARACTERISTIC_SUBSCRIPTION = "characteristic_subscription"
+
def __init__(self, device: Device) -> None:
super().__init__()
self.device = device
@@ -347,10 +349,13 @@
notify_enabled = value[0] & 0x01 != 0
indicate_enabled = value[0] & 0x02 != 0
characteristic.emit(
- 'subscription', connection, notify_enabled, indicate_enabled
+ characteristic.EVENT_SUBSCRIPTION,
+ connection,
+ notify_enabled,
+ indicate_enabled,
)
self.emit(
- 'characteristic_subscription',
+ self.EVENT_CHARACTERISTIC_SUBSCRIPTION,
connection,
characteristic,
notify_enabled,
diff --git a/bumble/hfp.py b/bumble/hfp.py
index 454279e..76d622f 100644
--- a/bumble/hfp.py
+++ b/bumble/hfp.py
@@ -720,6 +720,14 @@
vrec: VoiceRecognitionState
"""
+ EVENT_CODEC_NEGOTIATION = "codec_negotiation"
+ EVENT_AG_INDICATOR = "ag_indicator"
+ EVENT_SPEAKER_VOLUME = "speaker_volume"
+ EVENT_MICROPHONE_VOLUME = "microphone_volume"
+ EVENT_RING = "ring"
+ EVENT_CLI_NOTIFICATION = "cli_notification"
+ EVENT_VOICE_RECOGNITION = "voice_recognition"
+
class HfLoopTermination(HfpProtocolError):
"""Termination signal for run() loop."""
@@ -777,7 +785,8 @@
self.dlc.sink = self._read_at
# Stop the run() loop when L2CAP is closed.
self.dlc.multiplexer.l2cap_channel.on(
- 'close', lambda: self.unsolicited_queue.put_nowait(None)
+ self.dlc.multiplexer.l2cap_channel.EVENT_CLOSE,
+ lambda: self.unsolicited_queue.put_nowait(None),
)
def supports_hf_feature(self, feature: HfFeature) -> bool:
@@ -1034,7 +1043,7 @@
# ID. The HF shall be ready to accept the synchronous connection
# establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>.
self.active_codec = AudioCodec(codec_id)
- self.emit('codec_negotiation', self.active_codec)
+ self.emit(self.EVENT_CODEC_NEGOTIATION, self.active_codec)
logger.info("codec connection setup completed")
@@ -1095,7 +1104,7 @@
# CIEV is in 1-index, while ag_indicators is in 0-index.
ag_indicator = self.ag_indicators[index - 1]
ag_indicator.current_status = value
- self.emit('ag_indicator', ag_indicator)
+ self.emit(self.EVENT_AG_INDICATOR, ag_indicator)
logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}")
async def handle_unsolicited(self):
@@ -1110,19 +1119,21 @@
int(result.parameters[0]), int(result.parameters[1])
)
elif result.code == "+VGS":
- self.emit('speaker_volume', int(result.parameters[0]))
+ self.emit(self.EVENT_SPEAKER_VOLUME, int(result.parameters[0]))
elif result.code == "+VGM":
- self.emit('microphone_volume', int(result.parameters[0]))
+ self.emit(self.EVENT_MICROPHONE_VOLUME, int(result.parameters[0]))
elif result.code == "RING":
- self.emit('ring')
+ self.emit(self.EVENT_RING)
elif result.code == "+CLIP":
self.emit(
- 'cli_notification', CallLineIdentification.parse_from(result.parameters)
+ self.EVENT_CLI_NOTIFICATION,
+ CallLineIdentification.parse_from(result.parameters),
)
elif result.code == "+BVRA":
# TODO: Support Enhanced Voice Recognition.
self.emit(
- 'voice_recognition', VoiceRecognitionState(int(result.parameters[0]))
+ self.EVENT_VOICE_RECOGNITION,
+ VoiceRecognitionState(int(result.parameters[0])),
)
else:
logging.info(f"unhandled unsolicited response {result.code}")
@@ -1179,6 +1190,19 @@
volume: Int
"""
+ EVENT_SLC_COMPLETE = "slc_complete"
+ EVENT_SUPPORTED_AUDIO_CODECS = "supported_audio_codecs"
+ EVENT_CODEC_NEGOTIATION = "codec_negotiation"
+ EVENT_VOICE_RECOGNITION = "voice_recognition"
+ EVENT_CALL_HOLD = "call_hold"
+ EVENT_HF_INDICATOR = "hf_indicator"
+ EVENT_CODEC_CONNECTION_REQUEST = "codec_connection_request"
+ EVENT_ANSWER = "answer"
+ EVENT_DIAL = "dial"
+ EVENT_HANG_UP = "hang_up"
+ EVENT_SPEAKER_VOLUME = "speaker_volume"
+ EVENT_MICROPHONE_VOLUME = "microphone_volume"
+
supported_hf_features: int
supported_hf_indicators: Set[HfIndicator]
supported_audio_codecs: List[AudioCodec]
@@ -1371,7 +1395,7 @@
def _check_remained_slc_commands(self) -> None:
if not self._remained_slc_setup_features:
- self.emit('slc_complete')
+ self.emit(self.EVENT_SLC_COMPLETE)
def _on_brsf(self, hf_features: bytes) -> None:
self.supported_hf_features = int(hf_features)
@@ -1390,17 +1414,17 @@
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
- self.emit('supported_audio_codecs', self.supported_audio_codecs)
+ self.emit(self.EVENT_SUPPORTED_AUDIO_CODECS, self.supported_audio_codecs)
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
self.active_codec = AudioCodec(int(codec))
self.send_ok()
- self.emit('codec_negotiation', self.active_codec)
+ self.emit(self.EVENT_CODEC_NEGOTIATION, self.active_codec)
def _on_bvra(self, vrec: bytes) -> None:
self.send_ok()
- self.emit('voice_recognition', VoiceRecognitionState(int(vrec)))
+ self.emit(self.EVENT_VOICE_RECOGNITION, VoiceRecognitionState(int(vrec)))
def _on_chld(self, operation_code: bytes) -> None:
call_index: Optional[int] = None
@@ -1427,7 +1451,7 @@
# Real three-way calls have more complicated situations, but this is not a popular issue - let users to handle the remaining :)
self.send_ok()
- self.emit('call_hold', operation, call_index)
+ self.emit(self.EVENT_CALL_HOLD, operation, call_index)
def _on_chld_test(self) -> None:
if not self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
@@ -1553,7 +1577,7 @@
return
self.hf_indicators[index].current_status = int(value_bytes)
- self.emit('hf_indicator', self.hf_indicators[index])
+ self.emit(self.EVENT_HF_INDICATOR, self.hf_indicators[index])
self.send_ok()
def _on_bia(self, *args) -> None:
@@ -1562,21 +1586,21 @@
self.send_ok()
def _on_bcc(self) -> None:
- self.emit('codec_connection_request')
+ self.emit(self.EVENT_CODEC_CONNECTION_REQUEST)
self.send_ok()
def _on_a(self) -> None:
"""ATA handler."""
- self.emit('answer')
+ self.emit(self.EVENT_ANSWER)
self.send_ok()
def _on_d(self, number: bytes) -> None:
"""ATD handler."""
- self.emit('dial', number.decode())
+ self.emit(self.EVENT_DIAL, number.decode())
self.send_ok()
def _on_chup(self) -> None:
- self.emit('hang_up')
+ self.emit(self.EVENT_HANG_UP)
self.send_ok()
def _on_clcc(self) -> None:
@@ -1602,11 +1626,11 @@
self.send_ok()
def _on_vgs(self, level: bytes) -> None:
- self.emit('speaker_volume', int(level))
+ self.emit(self.EVENT_SPEAKER_VOLUME, int(level))
self.send_ok()
def _on_vgm(self, level: bytes) -> None:
- self.emit('microphone_volume', int(level))
+ self.emit(self.EVENT_MICROPHONE_VOLUME, int(level))
self.send_ok()
diff --git a/bumble/hid.py b/bumble/hid.py
index daaeb35..edcfc98 100644
--- a/bumble/hid.py
+++ b/bumble/hid.py
@@ -201,6 +201,13 @@
l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None
connection: Optional[device.Connection] = None
+ EVENT_INTERRUPT_DATA = "interrupt_data"
+ EVENT_CONTROL_DATA = "control_data"
+ EVENT_SUSPEND = "suspend"
+ EVENT_EXIT_SUSPEND = "exit_suspend"
+ EVENT_VIRTUAL_CABLE_UNPLUG = "virtual_cable_unplug"
+ EVENT_HANDSHAKE = "handshake"
+
class Role(enum.IntEnum):
HOST = 0x00
DEVICE = 0x01
@@ -215,7 +222,7 @@
device.register_l2cap_server(HID_CONTROL_PSM, self.on_l2cap_connection)
device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_l2cap_connection)
- device.on('connection', self.on_device_connection)
+ device.on(device.EVENT_CONNECTION, self.on_device_connection)
async def connect_control_channel(self) -> None:
# Create a new L2CAP connection - control channel
@@ -258,15 +265,20 @@
def on_device_connection(self, connection: device.Connection) -> None:
self.connection = connection
self.remote_device_bd_address = connection.peer_address
- connection.on('disconnection', self.on_device_disconnection)
+ connection.on(connection.EVENT_DISCONNECTION, self.on_device_disconnection)
def on_device_disconnection(self, reason: int) -> None:
self.connection = None
def on_l2cap_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug(f'+++ New L2CAP connection: {l2cap_channel}')
- l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
- l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel))
+ l2cap_channel.on(
+ l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel)
+ )
+ l2cap_channel.on(
+ l2cap_channel.EVENT_CLOSE,
+ lambda: self.on_l2cap_channel_close(l2cap_channel),
+ )
def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
if l2cap_channel.psm == HID_CONTROL_PSM:
@@ -290,7 +302,7 @@
def on_intr_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
- self.emit("interrupt_data", pdu)
+ self.emit(self.EVENT_INTERRUPT_DATA, pdu)
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
@@ -363,17 +375,17 @@
self.handle_set_protocol(pdu)
elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA')
- self.emit('control_data', pdu)
+ self.emit(self.EVENT_CONTROL_DATA, pdu)
elif message_type == Message.MessageType.CONTROL:
if param == Message.ControlCommand.SUSPEND:
logger.debug('<<< HID SUSPEND')
- self.emit('suspend')
+ self.emit(self.EVENT_SUSPEND)
elif param == Message.ControlCommand.EXIT_SUSPEND:
logger.debug('<<< HID EXIT SUSPEND')
- self.emit('exit_suspend')
+ self.emit(self.EVENT_EXIT_SUSPEND)
elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
- self.emit('virtual_cable_unplug')
+ self.emit(self.EVENT_VIRTUAL_CABLE_UNPLUG)
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else:
@@ -538,14 +550,14 @@
message_type = pdu[0] >> 4
if message_type == Message.MessageType.HANDSHAKE:
logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}')
- self.emit('handshake', Message.Handshake(param))
+ self.emit(self.EVENT_HANDSHAKE, Message.Handshake(param))
elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA')
- self.emit('control_data', pdu)
+ self.emit(self.EVENT_CONTROL_DATA, pdu)
elif message_type == Message.MessageType.CONTROL:
if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
- self.emit('virtual_cable_unplug')
+ self.emit(self.EVENT_VIRTUAL_CABLE_UNPLUG)
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else:
diff --git a/bumble/l2cap.py b/bumble/l2cap.py
index c8bbabe..08d8edf 100644
--- a/bumble/l2cap.py
+++ b/bumble/l2cap.py
@@ -744,6 +744,9 @@
WAIT_FINAL_RSP = 0x16
WAIT_CONTROL_IND = 0x17
+ EVENT_OPEN = "open"
+ EVENT_CLOSE = "close"
+
connection_result: Optional[asyncio.Future[None]]
disconnection_result: Optional[asyncio.Future[None]]
response: Optional[asyncio.Future[bytes]]
@@ -847,7 +850,7 @@
def abort(self) -> None:
if self.state == self.State.OPEN:
self._change_state(self.State.CLOSED)
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
def send_configure_request(self) -> None:
options = L2CAP_Control_Frame.encode_configuration_options(
@@ -940,7 +943,7 @@
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
- self.emit('open')
+ self.emit(self.EVENT_OPEN)
elif self.state == self.State.WAIT_CONFIG_REQ_RSP:
self._change_state(self.State.WAIT_CONFIG_RSP)
@@ -956,7 +959,7 @@
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
- self.emit('open')
+ self.emit(self.EVENT_OPEN)
else:
logger.warning(color('invalid state', 'red'))
elif (
@@ -991,7 +994,7 @@
)
)
self._change_state(self.State.CLOSED)
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
self.manager.on_channel_closed(self)
else:
logger.warning(color('invalid state', 'red'))
@@ -1012,7 +1015,7 @@
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
self.manager.on_channel_closed(self)
def __str__(self) -> str:
@@ -1047,6 +1050,9 @@
connection: Connection
sink: Optional[Callable[[bytes], Any]]
+ EVENT_OPEN = "open"
+ EVENT_CLOSE = "close"
+
def __init__(
self,
manager: ChannelManager,
@@ -1098,9 +1104,9 @@
self.state = new_state
if new_state == self.State.CONNECTED:
- self.emit('open')
+ self.emit(self.EVENT_OPEN)
elif new_state == self.State.DISCONNECTED:
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
def send_pdu(self, pdu: Union[SupportsBytes, bytes]) -> None:
self.manager.send_pdu(self.connection, self.destination_cid, pdu)
@@ -1381,6 +1387,8 @@
# -----------------------------------------------------------------------------
class ClassicChannelServer(utils.EventEmitter):
+ EVENT_CONNECTION = "connection"
+
def __init__(
self,
manager: ChannelManager,
@@ -1395,7 +1403,7 @@
self.mtu = mtu
def on_connection(self, channel: ClassicChannel) -> None:
- self.emit('connection', channel)
+ self.emit(self.EVENT_CONNECTION, channel)
if self.handler:
self.handler(channel)
@@ -1406,6 +1414,8 @@
# -----------------------------------------------------------------------------
class LeCreditBasedChannelServer(utils.EventEmitter):
+ EVENT_CONNECTION = "connection"
+
def __init__(
self,
manager: ChannelManager,
@@ -1424,7 +1434,7 @@
self.mps = mps
def on_connection(self, channel: LeCreditBasedChannel) -> None:
- self.emit('connection', channel)
+ self.emit(self.EVENT_CONNECTION, channel)
if self.handler:
self.handler(channel)
diff --git a/bumble/pandora/host.py b/bumble/pandora/host.py
index 0390170..515c1d5 100644
--- a/bumble/pandora/host.py
+++ b/bumble/pandora/host.py
@@ -296,12 +296,12 @@
def on_disconnection(_: None) -> None:
disconnection_future.set_result(None)
- connection.on('disconnection', on_disconnection)
+ connection.on(connection.EVENT_DISCONNECTION, on_disconnection)
try:
await disconnection_future
self.log.debug("Disconnected")
finally:
- connection.remove_listener('disconnection', on_disconnection) # type: ignore
+ connection.remove_listener(connection.EVENT_DISCONNECTION, on_disconnection) # type: ignore
return empty_pb2.Empty()
@@ -383,7 +383,7 @@
):
connections.put_nowait(connection)
- self.device.on('connection', on_connection)
+ self.device.on(self.device.EVENT_CONNECTION, on_connection)
try:
# Advertise until RPC is canceled
@@ -501,7 +501,7 @@
):
connections.put_nowait(connection)
- self.device.on('connection', on_connection)
+ self.device.on(self.device.EVENT_CONNECTION, on_connection)
try:
while True:
@@ -531,7 +531,7 @@
await asyncio.sleep(1)
finally:
if request.connectable:
- self.device.remove_listener('connection', on_connection) # type: ignore
+ self.device.remove_listener(self.device.EVENT_CONNECTION, on_connection) # type: ignore
try:
self.log.debug('Stop advertising')
@@ -557,7 +557,7 @@
scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)]
scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
- handler = self.device.on('advertisement', scan_queue.put_nowait)
+ handler = self.device.on(self.device.EVENT_ADVERTISEMENT, scan_queue.put_nowait)
await self.device.start_scanning(
legacy=request.legacy,
active=not request.passive,
@@ -602,7 +602,7 @@
yield sr
finally:
- self.device.remove_listener('advertisement', handler) # type: ignore
+ self.device.remove_listener(self.device.EVENT_ADVERTISEMENT, handler) # type: ignore
try:
self.log.debug('Stop scanning')
await bumble.utils.cancel_on_event(
@@ -621,10 +621,10 @@
Optional[Tuple[Address, int, AdvertisingData, int]]
] = asyncio.Queue()
complete_handler = self.device.on(
- 'inquiry_complete', lambda: inquiry_queue.put_nowait(None)
+ self.device.EVENT_INQUIRY_COMPLETE, lambda: inquiry_queue.put_nowait(None)
)
result_handler = self.device.on( # type: ignore
- 'inquiry_result',
+ self.device.EVENT_INQUIRY_RESULT,
lambda address, class_of_device, eir_data, rssi: inquiry_queue.put_nowait( # type: ignore
(address, class_of_device, eir_data, rssi) # type: ignore
),
@@ -643,8 +643,8 @@
)
finally:
- self.device.remove_listener('inquiry_complete', complete_handler) # type: ignore
- self.device.remove_listener('inquiry_result', result_handler) # type: ignore
+ self.device.remove_listener(self.device.EVENT_INQUIRY_COMPLETE, complete_handler) # type: ignore
+ self.device.remove_listener(self.device.EVENT_INQUIRY_RESULT, result_handler) # type: ignore
try:
self.log.debug('Stop inquiry')
await bumble.utils.cancel_on_event(
diff --git a/bumble/pandora/l2cap.py b/bumble/pandora/l2cap.py
index df52b50..94b042c 100644
--- a/bumble/pandora/l2cap.py
+++ b/bumble/pandora/l2cap.py
@@ -83,7 +83,7 @@
close_future.set_result(None)
l2cap_channel.sink = on_channel_sdu
- l2cap_channel.on('close', on_close)
+ l2cap_channel.on(l2cap_channel.EVENT_CLOSE, on_close)
return ChannelContext(close_future, sdu_queue)
@@ -151,7 +151,7 @@
spec=spec, handler=on_l2cap_channel
)
else:
- l2cap_server.on('connection', on_l2cap_channel)
+ l2cap_server.on(l2cap_server.EVENT_CONNECTION, on_l2cap_channel)
try:
self.log.debug('Waiting for a channel connection.')
diff --git a/bumble/pandora/security.py b/bumble/pandora/security.py
index 947587e..ca3102d 100644
--- a/bumble/pandora/security.py
+++ b/bumble/pandora/security.py
@@ -302,15 +302,15 @@
with contextlib.closing(bumble.utils.EventWatcher()) as watcher:
- @watcher.on(connection, 'pairing')
+ @watcher.on(connection, connection.EVENT_PAIRING)
def on_pairing(*_: Any) -> None:
security_result.set_result('success')
- @watcher.on(connection, 'pairing_failure')
+ @watcher.on(connection, connection.EVENT_PAIRING_FAILURE)
def on_pairing_failure(*_: Any) -> None:
security_result.set_result('pairing_failure')
- @watcher.on(connection, 'disconnection')
+ @watcher.on(connection, connection.EVENT_DISCONNECTION)
def on_disconnection(*_: Any) -> None:
security_result.set_result('connection_died')
diff --git a/bumble/profiles/ancs.py b/bumble/profiles/ancs.py
index 8ea5aaa..6bf93f1 100644
--- a/bumble/profiles/ancs.py
+++ b/bumble/profiles/ancs.py
@@ -250,6 +250,8 @@
_expected_response_tuples: int
_response_accumulator: bytes
+ EVENT_NOTIFICATION = "notification"
+
def __init__(self, ancs_proxy: AncsProxy) -> None:
super().__init__()
self._ancs_proxy = ancs_proxy
@@ -284,7 +286,7 @@
def _on_notification(self, notification: Notification) -> None:
logger.debug(f"ANCS NOTIFICATION: {notification}")
- self.emit("notification", notification)
+ self.emit(self.EVENT_NOTIFICATION, notification)
def _on_data(self, data: bytes) -> None:
logger.debug(f"ANCS DATA: {data.hex()}")
diff --git a/bumble/profiles/ascs.py b/bumble/profiles/ascs.py
index cab70da..2b00160 100644
--- a/bumble/profiles/ascs.py
+++ b/bumble/profiles/ascs.py
@@ -276,6 +276,8 @@
DISABLING = 0x05
RELEASING = 0x06
+ EVENT_STATE_CHANGE = "state_change"
+
cis_link: Optional[device.CisLink] = None
# Additional parameters in CODEC_CONFIGURED State
@@ -329,8 +331,12 @@
value=gatt.CharacteristicValue(read=self.on_read),
)
- self.service.device.on('cis_request', self.on_cis_request)
- self.service.device.on('cis_establishment', self.on_cis_establishment)
+ self.service.device.on(
+ self.service.device.EVENT_CIS_REQUEST, self.on_cis_request
+ )
+ self.service.device.on(
+ self.service.device.EVENT_CIS_ESTABLISHMENT, self.on_cis_establishment
+ )
def on_cis_request(
self,
@@ -356,7 +362,7 @@
and cis_link.cis_id == self.cis_id
and self.state == self.State.ENABLING
):
- cis_link.on('disconnection', self.on_cis_disconnection)
+ cis_link.on(cis_link.EVENT_DISCONNECTION, self.on_cis_disconnection)
async def post_cis_established():
await cis_link.setup_data_path(direction=self.role)
@@ -525,7 +531,7 @@
def state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
self._state = new_state
- self.emit('state_change')
+ self.emit(self.EVENT_STATE_CHANGE)
@property
def value(self):
diff --git a/bumble/profiles/asha.py b/bumble/profiles/asha.py
index 7422d53..9166802 100644
--- a/bumble/profiles/asha.py
+++ b/bumble/profiles/asha.py
@@ -88,6 +88,11 @@
class AshaService(gatt.TemplateService):
UUID = gatt.GATT_ASHA_SERVICE
+ EVENT_STARTED = "started"
+ EVENT_STOPPED = "stopped"
+ EVENT_DISCONNECTED = "disconnected"
+ EVENT_VOLUME_CHANGED = "volume_changed"
+
audio_sink: Optional[Callable[[bytes], Any]]
active_codec: Optional[Codec] = None
audio_type: Optional[AudioType] = None
@@ -211,14 +216,14 @@
f'volume={self.volume}, '
f'other_state={self.other_state}'
)
- self.emit('started')
+ self.emit(self.EVENT_STARTED)
elif opcode == OpCode.STOP:
_logger.debug('### STOP')
self.active_codec = None
self.audio_type = None
self.volume = None
self.other_state = None
- self.emit('stopped')
+ self.emit(self.EVENT_STOPPED)
elif opcode == OpCode.STATUS:
_logger.debug('### STATUS: %s', PeripheralStatus(value[1]).name)
@@ -231,7 +236,7 @@
self.audio_type = None
self.volume = None
self.other_state = None
- self.emit('disconnected')
+ self.emit(self.EVENT_DISCONNECTED)
connection.once('disconnection', on_disconnection)
@@ -245,7 +250,7 @@
def _on_volume_write(self, connection: Optional[Connection], value: bytes) -> None:
_logger.debug(f'--- VOLUME Write:{value[0]}')
self.volume = value[0]
- self.emit('volume_changed')
+ self.emit(self.EVENT_VOLUME_CHANGED)
# Register an L2CAP CoC server
def _on_connection(self, channel: l2cap.LeCreditBasedChannel) -> None:
diff --git a/bumble/profiles/hap.py b/bumble/profiles/hap.py
index 655cc6a..0074818 100644
--- a/bumble/profiles/hap.py
+++ b/bumble/profiles/hap.py
@@ -266,13 +266,13 @@
# associate the lowest index as the current active preset at startup
self.active_preset_index = sorted(self.preset_records.keys())[0]
- @device.on('connection') # type: ignore
+ @device.on(device.EVENT_CONNECTION)
def on_connection(connection: Connection) -> None:
- @connection.on('disconnection') # type: ignore
+ @connection.on(connection.EVENT_DISCONNECTION)
def on_disconnection(_reason) -> None:
self.currently_connected_clients.remove(connection)
- @connection.on('pairing') # type: ignore
+ @connection.on(connection.EVENT_PAIRING)
def on_pairing(*_: Any) -> None:
self.on_incoming_paired_connection(connection)
diff --git a/bumble/profiles/mcp.py b/bumble/profiles/mcp.py
index abd35e2..68cac93 100644
--- a/bumble/profiles/mcp.py
+++ b/bumble/profiles/mcp.py
@@ -338,6 +338,12 @@
'content_control_id': gatt.GATT_CONTENT_CONTROL_ID_CHARACTERISTIC,
}
+ EVENT_MEDIA_STATE = "media_state"
+ EVENT_TRACK_CHANGED = "track_changed"
+ EVENT_TRACK_TITLE = "track_title"
+ EVENT_TRACK_DURATION = "track_duration"
+ EVENT_TRACK_POSITION = "track_position"
+
media_player_name: Optional[gatt_client.CharacteristicProxy[bytes]] = None
media_player_icon_object_id: Optional[gatt_client.CharacteristicProxy[bytes]] = None
media_player_icon_url: Optional[gatt_client.CharacteristicProxy[bytes]] = None
@@ -432,20 +438,20 @@
self.media_control_point_notifications.put_nowait(data)
def _on_media_state(self, data: bytes) -> None:
- self.emit('media_state', MediaState(data[0]))
+ self.emit(self.EVENT_MEDIA_STATE, MediaState(data[0]))
def _on_track_changed(self, data: bytes) -> None:
del data
- self.emit('track_changed')
+ self.emit(self.EVENT_TRACK_CHANGED)
def _on_track_title(self, data: bytes) -> None:
- self.emit('track_title', data.decode("utf-8"))
+ self.emit(self.EVENT_TRACK_TITLE, data.decode("utf-8"))
def _on_track_duration(self, data: bytes) -> None:
- self.emit('track_duration', struct.unpack_from('<i', data)[0])
+ self.emit(self.EVENT_TRACK_DURATION, struct.unpack_from('<i', data)[0])
def _on_track_position(self, data: bytes) -> None:
- self.emit('track_position', struct.unpack_from('<i', data)[0])
+ self.emit(self.EVENT_TRACK_POSITION, struct.unpack_from('<i', data)[0])
class GenericMediaControlServiceProxy(MediaControlServiceProxy):
diff --git a/bumble/profiles/vcs.py b/bumble/profiles/vcs.py
index 30c820e..54d7bbe 100644
--- a/bumble/profiles/vcs.py
+++ b/bumble/profiles/vcs.py
@@ -91,6 +91,8 @@
class VolumeControlService(gatt.TemplateService):
UUID = gatt.GATT_VOLUME_CONTROL_SERVICE
+ EVENT_VOLUME_STATE_CHANGE = "volume_state_change"
+
volume_state: gatt.Characteristic[bytes]
volume_control_point: gatt.Characteristic[bytes]
volume_flags: gatt.Characteristic[bytes]
@@ -166,7 +168,7 @@
'disconnection',
connection.device.notify_subscribers(attribute=self.volume_state),
)
- self.emit('volume_state_change')
+ self.emit(self.EVENT_VOLUME_STATE_CHANGE)
def _on_relative_volume_down(self) -> bool:
old_volume = self.volume_setting
diff --git a/bumble/rfcomm.py b/bumble/rfcomm.py
index 1fbb24e..fe5038b 100644
--- a/bumble/rfcomm.py
+++ b/bumble/rfcomm.py
@@ -442,6 +442,9 @@
# -----------------------------------------------------------------------------
class DLC(utils.EventEmitter):
+ EVENT_OPEN = "open"
+ EVENT_CLOSE = "close"
+
class State(enum.IntEnum):
INIT = 0x00
CONNECTING = 0x01
@@ -529,7 +532,7 @@
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTED)
- self.emit('open')
+ self.emit(self.EVENT_OPEN)
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state == DLC.State.CONNECTING:
@@ -550,7 +553,7 @@
self.disconnection_result.set_result(None)
self.disconnection_result = None
self.multiplexer.on_dlc_disconnection(self)
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
else:
logger.warning(
color(
@@ -733,7 +736,7 @@
self.disconnection_result.cancel()
self.disconnection_result = None
self.change_state(DLC.State.RESET)
- self.emit('close')
+ self.emit(self.EVENT_CLOSE)
def __str__(self) -> str:
return (
@@ -763,6 +766,8 @@
DISCONNECTED = 0x05
RESET = 0x06
+ EVENT_DLC = "dlc"
+
connection_result: Optional[asyncio.Future]
disconnection_result: Optional[asyncio.Future]
open_result: Optional[asyncio.Future]
@@ -785,7 +790,7 @@
# Become a sink for the L2CAP channel
l2cap_channel.sink = self.on_pdu
- l2cap_channel.on('close', self.on_l2cap_channel_close)
+ l2cap_channel.on(l2cap_channel.EVENT_CLOSE, self.on_l2cap_channel_close)
def change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
@@ -901,7 +906,7 @@
self.dlcs[pn.dlci] = dlc
# Re-emit the handshake completion event
- dlc.on('open', lambda: self.emit('dlc', dlc))
+ dlc.on(dlc.EVENT_OPEN, lambda: self.emit(self.EVENT_DLC, dlc))
# Respond to complete the handshake
dlc.accept()
@@ -1076,6 +1081,8 @@
# -----------------------------------------------------------------------------
class Server(utils.EventEmitter):
+ EVENT_START = "start"
+
def __init__(
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
@@ -1122,7 +1129,9 @@
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug(f'+++ new L2CAP connection: {l2cap_channel}')
- l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
+ l2cap_channel.on(
+ l2cap_channel.EVENT_OPEN, lambda: self.on_l2cap_channel_open(l2cap_channel)
+ )
def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
@@ -1130,10 +1139,10 @@
# Create a new multiplexer for the channel
multiplexer = Multiplexer(l2cap_channel, Multiplexer.Role.RESPONDER)
multiplexer.acceptor = self.accept_dlc
- multiplexer.on('dlc', self.on_dlc)
+ multiplexer.on(multiplexer.EVENT_DLC, self.on_dlc)
# Notify
- self.emit('start', multiplexer)
+ self.emit(self.EVENT_START, multiplexer)
def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]:
return self.dlc_configs.get(channel_number)
diff --git a/bumble/smp.py b/bumble/smp.py
index d98565c..79579d3 100644
--- a/bumble/smp.py
+++ b/bumble/smp.py
@@ -724,12 +724,13 @@
self.is_responder = not self.is_initiator
# Listen for connection events
- connection.on('disconnection', self.on_disconnection)
+ connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection)
connection.on(
- 'connection_encryption_change', self.on_connection_encryption_change
+ connection.EVENT_CONNECTION_ENCRYPTION_CHANGE,
+ self.on_connection_encryption_change,
)
connection.on(
- 'connection_encryption_key_refresh',
+ connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH,
self.on_connection_encryption_key_refresh,
)
@@ -1310,12 +1311,15 @@
)
def on_disconnection(self, _: int) -> None:
- self.connection.remove_listener('disconnection', self.on_disconnection)
self.connection.remove_listener(
- 'connection_encryption_change', self.on_connection_encryption_change
+ self.connection.EVENT_DISCONNECTION, self.on_disconnection
)
self.connection.remove_listener(
- 'connection_encryption_key_refresh',
+ self.connection.EVENT_CONNECTION_ENCRYPTION_CHANGE,
+ self.on_connection_encryption_change,
+ )
+ self.connection.remove_listener(
+ self.connection.EVENT_CONNECTION_ENCRYPTION_KEY_REFRESH,
self.on_connection_encryption_key_refresh,
)
self.manager.on_session_end(self)
@@ -1962,7 +1966,7 @@
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command
) -> None:
- connection.emit('security_request', request.auth_req)
+ connection.emit(connection.EVENT_SECURITY_REQUEST, request.auth_req)
def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None:
# Parse the L2CAP payload into an SMP Command object