Add Metadata LTV serializer and adapt Unicast
diff --git a/bumble/profiles/bap.py b/bumble/profiles/bap.py
index 188ca27..117e95e 100644
--- a/bumble/profiles/bap.py
+++ b/bumble/profiles/bap.py
@@ -685,10 +685,11 @@
@dataclasses.dataclass
class PacRecord:
+ '''Published Audio Capabilities Service, Table 3.2/3.4.'''
+
coding_format: hci.CodingFormat
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
- # TODO: Parse Metadata
- metadata: bytes = b''
+ metadata: le_audio.Metadata = dataclasses.field(default_factory=le_audio.Metadata)
@classmethod
def from_bytes(cls, data: bytes) -> PacRecord:
@@ -701,7 +702,8 @@
]
offset += codec_specific_capabilities_size
metadata_size = data[offset]
- metadata = data[offset : offset + metadata_size]
+ offset += 1
+ metadata = le_audio.Metadata.from_bytes(data[offset : offset + metadata_size])
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
@@ -719,12 +721,13 @@
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
+ metadata_bytes = bytes(self.metadata)
return (
bytes(self.coding_format)
+ bytes([len(capabilities_bytes)])
+ capabilities_bytes
- + bytes([len(self.metadata)])
- + self.metadata
+ + bytes([len(metadata_bytes)])
+ + metadata_bytes
)
@@ -940,8 +943,7 @@
presentation_delay = 0
# Additional parameters in ENABLING, STREAMING, DISABLING State
- # TODO: Parse this
- metadata = b''
+ metadata = le_audio.Metadata()
def __init__(
self,
@@ -1088,7 +1090,7 @@
AseReasonCode.NONE,
)
- self.metadata = metadata
+ self.metadata = le_audio.Metadata.from_bytes(metadata)
self.state = self.State.ENABLING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
@@ -1140,7 +1142,7 @@
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
- self.metadata = metadata
+ self.metadata = le_audio.Metadata.from_bytes(metadata)
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]:
@@ -1217,8 +1219,9 @@
self.State.STREAMING,
self.State.DISABLING,
):
+ metadata_bytes = bytes(self.metadata)
additional_parameters = (
- bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata
+ bytes([self.cig_id, self.cis_id, len(metadata_bytes)]) + metadata_bytes
)
else:
additional_parameters = b''
diff --git a/bumble/profiles/le_audio.py b/bumble/profiles/le_audio.py
index 8d1e629..b152fd9 100644
--- a/bumble/profiles/le_audio.py
+++ b/bumble/profiles/le_audio.py
@@ -17,33 +17,67 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
-from typing import List
+import struct
+from typing import List, Type
from typing_extensions import Self
+from bumble import utils
+
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class Metadata:
+ '''Bluetooth Assigned Numbers, Section 6.12.6 - Metadata LTV structures.
+
+ As Metadata fields may extend, and Spec doesn't forbid duplication, we don't parse
+ Metadata into a key-value style dataclass here. Rather, we encourage users to parse
+ again outside the lib.
+ '''
+
+ class Tag(utils.OpenIntEnum):
+ # fmt: off
+ PREFERRED_AUDIO_CONTEXTS = 0x01
+ STREAMING_AUDIO_CONTEXTS = 0x02
+ PROGRAM_INFO = 0x03
+ LANGUAGE = 0x04
+ CCID_LIST = 0x05
+ PARENTAL_RATING = 0x06
+ PROGRAM_INFO_URI = 0x07
+ AUDIO_ACTIVE_STATE = 0x08
+ BROADCAST_AUDIO_IMMEDIATE_RENDERING_FLAG = 0x09
+ ASSISTED_LISTENING_STREAM = 0x0A
+ BROADCAST_NAME = 0x0B
+ EXTENDED_METADATA = 0xFE
+ VENDOR_SPECIFIC = 0xFF
+
@dataclasses.dataclass
class Entry:
- tag: int
+ tag: Metadata.Tag
data: bytes
- entries: List[Entry]
+ @classmethod
+ def from_bytes(cls: Type[Self], data: bytes) -> Self:
+ return cls(tag=Metadata.Tag(data[0]), data=data[1:])
+
+ def __bytes__(self) -> bytes:
+ return bytes([len(self.data) + 1, self.tag]) + self.data
+
+ entries: List[Entry] = dataclasses.field(default_factory=list)
@classmethod
- def from_bytes(cls, data: bytes) -> Self:
+ def from_bytes(cls: Type[Self], data: bytes) -> Self:
entries = []
offset = 0
length = len(data)
- while length >= 2:
+ while offset < length:
entry_length = data[offset]
- entry_tag = data[offset + 1]
- entry_data = data[offset + 2 : offset + 2 + entry_length - 1]
- entries.append(cls.Entry(entry_tag, entry_data))
- length -= entry_length
+ offset += 1
+ entries.append(cls.Entry.from_bytes(data[offset : offset + entry_length]))
offset += entry_length
return cls(entries)
+
+ def __bytes__(self) -> bytes:
+ return b''.join([bytes(entry) for entry in self.entries])
diff --git a/tests/bap_test.py b/tests/bap_test.py
index 0b6db1a..e276790 100644
--- a/tests/bap_test.py
+++ b/tests/bap_test.py
@@ -48,6 +48,7 @@
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
+from bumble.profiles.le_audio import Metadata
from tests.test_utils import TwoDevices
@@ -97,7 +98,7 @@
pac_record = PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=cap,
- metadata=b'',
+ metadata=Metadata([Metadata.Entry(tag=Metadata.Tag.VENDOR_SPECIFIC, data=b'')]),
)
assert PacRecord.from_bytes(bytes(pac_record)) == pac_record
@@ -142,7 +143,7 @@
def test_ASE_Enable() -> None:
operation = ASE_Enable(
ase_id=[1, 2],
- metadata=[b'foo', b'bar'],
+ metadata=[b'', b''],
)
basic_check(operation)
@@ -151,7 +152,7 @@
def test_ASE_Update_Metadata() -> None:
operation = ASE_Update_Metadata(
ase_id=[1, 2],
- metadata=[b'foo', b'bar'],
+ metadata=[b'', b''],
)
basic_check(operation)
diff --git a/tests/le_audio_test.py b/tests/le_audio_test.py
new file mode 100644
index 0000000..264a96d
--- /dev/null
+++ b/tests/le_audio_test.py
@@ -0,0 +1,39 @@
+# Copyright 2021-2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+from bumble.profiles import le_audio
+
+
+def test_parse_metadata():
+ metadata = le_audio.Metadata(
+ entries=[
+ le_audio.Metadata.Entry(
+ tag=le_audio.Metadata.Tag.PROGRAM_INFO,
+ data=b'',
+ ),
+ le_audio.Metadata.Entry(
+ tag=le_audio.Metadata.Tag.STREAMING_AUDIO_CONTEXTS,
+ data=bytes([0, 0]),
+ ),
+ le_audio.Metadata.Entry(
+ tag=le_audio.Metadata.Tag.PREFERRED_AUDIO_CONTEXTS,
+ data=bytes([1, 2]),
+ ),
+ ]
+ )
+
+ assert le_audio.Metadata.from_bytes(bytes(metadata)) == metadata