resolve merge conflicts
diff --git a/apps/pair.py b/apps/pair.py
index 1f303a0..284aec1 100644
--- a/apps/pair.py
+++ b/apps/pair.py
@@ -18,9 +18,12 @@
import asyncio
import os
import logging
+import struct
+
import click
from prompt_toolkit.shortcuts import PromptSession
+from bumble.a2dp import make_audio_sink_service_sdp_records
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
@@ -30,8 +33,10 @@
from bumble.keys import JsonKeyStore
from bumble.core import (
AdvertisingData,
+ Appearance,
ProtocolError,
PhysicalTransport,
+ UUID,
)
from bumble.gatt import (
GATT_DEVICE_NAME_CHARACTERISTIC,
@@ -40,8 +45,8 @@
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Service,
Characteristic,
- CharacteristicValue,
)
+from bumble.hci import OwnAddressType
from bumble.att import (
ATT_Error,
ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
@@ -195,7 +200,7 @@
# -----------------------------------------------------------------------------
async def get_peer_name(peer, mode):
- if mode == 'classic':
+ if peer.connection.transport == PhysicalTransport.BR_EDR:
return await peer.request_name()
# Try to get the peer name from GATT
@@ -228,6 +233,16 @@
# -----------------------------------------------------------------------------
+def sdp_records():
+ service_record_handle = 0x00010001
+ return {
+ service_record_handle: make_audio_sink_service_sdp_records(
+ service_record_handle
+ )
+ }
+
+
+# -----------------------------------------------------------------------------
def on_connection(connection, request):
print(color(f'<<< Connection: {connection}', 'green'))
@@ -298,6 +313,7 @@
mitm,
bond,
ctkd,
+ advertising_address,
identity_address,
linger,
io,
@@ -306,6 +322,8 @@
request,
print_keys,
keystore_file,
+ advertise_service_uuids,
+ advertise_appearance,
device_config,
hci_transport,
address_or_name,
@@ -321,8 +339,7 @@
# Expose a GATT characteristic that can be used to trigger pairing by
# responding with an authentication error when read
- if mode == 'le':
- device.le_enabled = True
+ if mode in ('le', 'dual'):
device.add_service(
Service(
GATT_HEART_RATE_SERVICE,
@@ -337,10 +354,18 @@
)
)
- # Select LE or Classic
- if mode == 'classic':
+ # LE and Classic support
+ if mode in ('classic', 'dual'):
device.classic_enabled = True
device.classic_smp_enabled = ctkd
+ if mode in ('le', 'dual'):
+ device.le_enabled = True
+ if mode == 'dual':
+ device.le_simultaneous_enabled = True
+
+ # Setup SDP
+ if mode in ('classic', 'dual'):
+ device.sdp_service_records = sdp_records()
# Get things going
await device.power_on()
@@ -426,33 +451,109 @@
print(color(f'Pairing failed: {error}', 'red'))
else:
- if mode == 'le':
+ if mode in ('le', 'dual'):
# Advertise so that peers can find us and connect.
# Include the heart rate service UUID in the advertisement data
# so that devices like iPhones can show this device in their
# Bluetooth selector.
- device.advertising_data = bytes(
- AdvertisingData(
- [
- (
- AdvertisingData.FLAGS,
- bytes(
- [AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]
- ),
- ),
- (AdvertisingData.COMPLETE_LOCAL_NAME, 'Bumble'.encode()),
- (
- AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
- bytes(GATT_HEART_RATE_SERVICE),
- ),
- ]
+ service_uuids_16 = []
+ service_uuids_32 = []
+ service_uuids_128 = []
+ if advertise_service_uuids:
+ for uuid in advertise_service_uuids:
+ uuid = uuid.replace("-", "")
+ if len(uuid) == 4:
+ service_uuids_16.append(UUID(uuid))
+ elif len(uuid) == 8:
+ service_uuids_32.append(UUID(uuid))
+ elif len(uuid) == 32:
+ service_uuids_128.append(UUID(uuid))
+ else:
+ print(color('Invalid UUID format', 'red'))
+ return
+ else:
+ service_uuids_16.append(GATT_HEART_RATE_SERVICE)
+
+ flags = AdvertisingData.Flags.LE_LIMITED_DISCOVERABLE_MODE
+ if mode == 'le':
+ flags |= AdvertisingData.Flags.BR_EDR_NOT_SUPPORTED
+ if mode == 'dual':
+ flags |= AdvertisingData.Flags.SIMULTANEOUS_LE_BR_EDR_CAPABLE
+
+ ad_structs = [
+ (
+ AdvertisingData.FLAGS,
+ bytes([flags]),
+ ),
+ (AdvertisingData.COMPLETE_LOCAL_NAME, 'Bumble'.encode()),
+ ]
+ if service_uuids_16:
+ ad_structs.append(
+ (
+ AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
+ b"".join(bytes(uuid) for uuid in service_uuids_16),
+ )
)
+ if service_uuids_32:
+ ad_structs.append(
+ (
+ AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS,
+ b"".join(bytes(uuid) for uuid in service_uuids_32),
+ )
+ )
+ if service_uuids_128:
+ ad_structs.append(
+ (
+ AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
+ b"".join(bytes(uuid) for uuid in service_uuids_128),
+ )
+ )
+
+ if advertise_appearance:
+ advertise_appearance = advertise_appearance.upper()
+ try:
+ advertise_appearance_int = int(advertise_appearance)
+ except ValueError:
+ category, subcategory = advertise_appearance.split('/')
+ try:
+ category_enum = Appearance.Category[category]
+ except ValueError:
+ print(
+ color(f'Invalid appearance category {category}', 'red')
+ )
+ return
+ subcategory_class = Appearance.SUBCATEGORY_CLASSES[
+ category_enum
+ ]
+ try:
+ subcategory_enum = subcategory_class[subcategory]
+ except ValueError:
+ print(color(f'Invalid subcategory {subcategory}', 'red'))
+ return
+ advertise_appearance_int = int(
+ Appearance(category_enum, subcategory_enum)
+ )
+ ad_structs.append(
+ (
+ AdvertisingData.APPEARANCE,
+ struct.pack('<H', advertise_appearance_int),
+ )
+ )
+ device.advertising_data = bytes(AdvertisingData(ad_structs))
+ await device.start_advertising(
+ auto_restart=True,
+ own_address_type=(
+ OwnAddressType.PUBLIC
+ if advertising_address == 'public'
+ else OwnAddressType.RANDOM
+ ),
)
- await device.start_advertising(auto_restart=True)
- else:
+
+ if mode in ('classic', 'dual'):
# Become discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
+ print(color('Ready for connections on', 'blue'), device.public_address)
# Run until the user asks to exit
await Waiter.instance.wait_until_terminated()
@@ -472,7 +573,10 @@
# -----------------------------------------------------------------------------
@click.command()
@click.option(
- '--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True
+ '--mode',
+ type=click.Choice(['le', 'classic', 'dual']),
+ default='le',
+ show_default=True,
)
@click.option(
'--sc',
@@ -495,6 +599,10 @@
show_default=True,
)
@click.option(
+ '--advertising-address',
+ type=click.Choice(['random', 'public']),
+)
+@click.option(
'--identity-address',
type=click.Choice(['random', 'public']),
)
@@ -522,9 +630,20 @@
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
@click.option(
'--keystore-file',
- metavar='<filename>',
+ metavar='FILENAME',
help='File in which to store the pairing keys',
)
+@click.option(
+ '--advertise-service-uuid',
+ metavar="UUID",
+ multiple=True,
+ help="Advertise a GATT service UUID (may be specified more than once)",
+)
+@click.option(
+ '--advertise-appearance',
+ metavar='APPEARANCE',
+ help='Advertise an Appearance ID (int value or string)',
+)
@click.argument('device-config')
@click.argument('hci_transport')
@click.argument('address-or-name', required=False)
@@ -534,6 +653,7 @@
mitm,
bond,
ctkd,
+ advertising_address,
identity_address,
linger,
io,
@@ -542,6 +662,8 @@
request,
print_keys,
keystore_file,
+ advertise_service_uuid,
+ advertise_appearance,
device_config,
hci_transport,
address_or_name,
@@ -560,6 +682,7 @@
mitm,
bond,
ctkd,
+ advertising_address,
identity_address,
linger,
io,
@@ -568,6 +691,8 @@
request,
print_keys,
keystore_file,
+ advertise_service_uuid,
+ advertise_appearance,
device_config,
hci_transport,
address_or_name,
diff --git a/bumble/core.py b/bumble/core.py
index f10b9a6..eee7944 100644
--- a/bumble/core.py
+++ b/bumble/core.py
@@ -809,7 +809,7 @@
STICK_PC = 0x0F
class WatchSubcategory(utils.OpenIntEnum):
- GENENERIC_WATCH = 0x00
+ GENERIC_WATCH = 0x00
SPORTS_WATCH = 0x01
SMARTWATCH = 0x02
@@ -1127,7 +1127,7 @@
TURNTABLE = 0x05
CD_PLAYER = 0x06
DVD_PLAYER = 0x07
- BLUERAY_PLAYER = 0x08
+ BLURAY_PLAYER = 0x08
OPTICAL_DISC_PLAYER = 0x09
SET_TOP_BOX = 0x0A
@@ -1351,6 +1351,12 @@
THREE_D_INFORMATION_DATA = 0x3D
MANUFACTURER_SPECIFIC_DATA = 0xFF
+ class Flags(enum.IntFlag):
+ LE_LIMITED_DISCOVERABLE_MODE = 1 << 0
+ LE_GENERAL_DISCOVERABLE_MODE = 1 << 1
+ BR_EDR_NOT_SUPPORTED = 1 << 2
+ SIMULTANEOUS_LE_BR_EDR_CAPABLE = 1 << 3
+
# For backward-compatibility
FLAGS = Type.FLAGS
INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS = Type.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS
diff --git a/bumble/device.py b/bumble/device.py
index f9c032b..8c1d477 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -1586,6 +1586,7 @@
peer_le_features: Optional[hci.LeFeatureMask]
role: hci.Role
encryption: int
+ encryption_key_size: int
authenticated: bool
sc: bool
link_key_type: Optional[int]
@@ -1688,6 +1689,7 @@
self.role = role
self.parameters = parameters
self.encryption = 0
+ self.encryption_key_size = 0
self.authenticated = False
self.sc = False
self.link_key_type = None
@@ -5057,6 +5059,15 @@
# [Classic only]
@host_event_handler
def on_link_key(self, bd_addr, link_key, key_type):
+ authenticated = key_type in (
+ hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
+ hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
+ )
+ pairing_keys = PairingKeys()
+ pairing_keys.link_key = PairingKeys.Key(
+ value=link_key, authenticated=authenticated
+ )
+
# Store the keys in the key store
if self.keystore:
authenticated = key_type in (
@@ -5076,6 +5087,7 @@
bd_addr, transport=PhysicalTransport.BR_EDR
):
connection.link_key_type = key_type
+ connection.emit('pairing', pairing_keys)
def add_service(self, service):
self.gatt_server.add_service(service)
@@ -5813,8 +5825,13 @@
pairing_config = self.pairing_config_factory(connection)
# Show the passkey to the user
+<<<<<<< HEAD
utils.cancel_on_event(
connection, 'disconnection', pairing_config.delegate.display_number(passkey)
+=======
+ connection.abort_on(
+ 'disconnection', pairing_config.delegate.display_number(passkey, digits=6)
+>>>>>>> fdf90c6 (add LE advertisement and HR service)
)
# [Classic only]
@@ -5950,13 +5967,17 @@
@host_event_handler
@with_connection_from_handle
- def on_connection_encryption_change(self, connection, encryption):
+ def on_connection_encryption_change(
+ self, connection, encryption, encryption_key_size
+ ):
logger.debug(
f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
- f'encryption={encryption}'
+ f'encryption={encryption}, '
+ f'key_size={encryption_key_size}'
)
connection.encryption = encryption
+ connection.encryption_key_size = encryption_key_size
if (
not connection.authenticated
and connection.transport == PhysicalTransport.BR_EDR
diff --git a/bumble/hci.py b/bumble/hci.py
index 2ab46e4..6943c1c 100644
--- a/bumble/hci.py
+++ b/bumble/hci.py
@@ -225,6 +225,7 @@
HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56
HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57
HCI_SAM_STATUS_CHANGE_EVENT = 0X58
+HCI_ENCRYPTION_CHANGE_V2_EVENT = 0x59
HCI_VENDOR_EVENT = 0xFF
@@ -3364,6 +3365,20 @@
See Bluetooth spec @ 7.3.69 Set Event Mask Page 2 Command
'''
+ @staticmethod
+ def mask(event_codes: Iterable[int]) -> bytes:
+ '''
+ Compute the event mask value for a list of events.
+ '''
+ # NOTE: this implementation takes advantage of the fact that as of version 6.0
+ # of the core specification, the bit number for each event code is equal to 64
+ # less than the event code.
+ # If future versions of the specification deviate from that, a different
+ # implementation would be needed.
+ return sum((1 << event_code - 64) for event_code in event_codes).to_bytes(
+ 8, 'little'
+ )
+
# -----------------------------------------------------------------------------
@HCI_Command.command(
@@ -6979,6 +6994,30 @@
# -----------------------------------------------------------------------------
@HCI_Event.event(
+ [
+ ('status', STATUS_SPEC),
+ ('connection_handle', 2),
+ (
+ 'encryption_enabled',
+ {
+ 'size': 1,
+ # pylint: disable-next=unnecessary-lambda
+ 'mapper': lambda x: HCI_Encryption_Change_Event.encryption_enabled_name(
+ x
+ ),
+ },
+ ),
+ ('encryption_key_size', 1),
+ ]
+)
+class HCI_Encryption_Change_V2_Event(HCI_Event):
+ '''
+ See Bluetooth spec @ 7.7.8 Encryption Change Event
+ '''
+
+
+# -----------------------------------------------------------------------------
+@HCI_Event.event(
[('status', STATUS_SPEC), ('connection_handle', 2), ('lmp_features', 8)]
)
class HCI_Read_Remote_Supported_Features_Complete_Event(HCI_Event):
diff --git a/bumble/host.py b/bumble/host.py
index 183c5a7..755732a 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -435,6 +435,14 @@
)
)
)
+ if self.supports_command(hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND):
+ await self.send_command(
+ hci.HCI_Set_Event_Mask_Page_2_Command(
+ event_mask_page_2=hci.HCI_Set_Event_Mask_Page_2_Command.mask(
+ [hci.HCI_ENCRYPTION_CHANGE_V2_EVENT]
+ )
+ )
+ )
if (
self.local_version is not None
@@ -1384,6 +1392,21 @@
'connection_encryption_change',
event.connection_handle,
event.encryption_enabled,
+ 0,
+ )
+ else:
+ self.emit(
+ 'connection_encryption_failure', event.connection_handle, event.status
+ )
+
+ def on_hci_encryption_change_v2_event(self, event):
+ # Notify the client
+ if event.status == hci.HCI_SUCCESS:
+ self.emit(
+ 'connection_encryption_change',
+ event.connection_handle,
+ event.encryption_enabled,
+ event.encryption_key_size,
)
else:
self.emit(
diff --git a/examples/run_a2dp_sink.py b/examples/run_a2dp_sink.py
index a3bcb29..f5d337c 100644
--- a/examples/run_a2dp_sink.py
+++ b/examples/run_a2dp_sink.py
@@ -33,12 +33,6 @@
from bumble.a2dp import (
make_audio_sink_service_sdp_records,
A2DP_SBC_CODEC_TYPE,
- SBC_MONO_CHANNEL_MODE,
- SBC_DUAL_CHANNEL_MODE,
- SBC_SNR_ALLOCATION_METHOD,
- SBC_LOUDNESS_ALLOCATION_METHOD,
- SBC_STEREO_CHANNEL_MODE,
- SBC_JOINT_STEREO_CHANNEL_MODE,
SbcMediaCodecInformation,
)