blob: 6c41df129b9f46eaac5f8f8c8423b1b9db9156b8 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
#
# This script extracts LE Audio audio data from btsnoop.
# Generates a audio dump file where each frame consists of a two-byte frame
# length information and the coded frame
#
# Audio File Name Format:
# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_
# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin
#
#
# Usage:
# ./dump_le_audio.py BTSNOOP.cfa
#
# -v, --verbose to enable the verbose log
# --header, Add the header for LC3 Conformance Interoperability Test Software V.1.0.3 from LC3 test specification.
#
# NOTE:
# Please make sure you HCI Snoop data file includes the following frames:
# GATT service discovery for "ASE Control Point" chracteristic
# GATT config codec via ASE Control Point
# HCI create CIS to point out the "Start stream", and the data frames.
# HCI remove ISO data path would trigger dump audio data once
# After all hci packet parse finished, would dump all remain audio data as well
#
# Correspondsing Spec.
# ASCS_1.0
# PACS_1.0
# BAP_1.0
# LC3.TS V1.0.3
#
from collections import defaultdict
from os import X_OK
import argparse
import struct
import sys
import time
BTSNOOP_FILE_NAME = ""
BTSNOOP_HEADER = b'btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea'
COMMADN_PACKET = 1
ACL_PACKET = 2
SCO_PACKET = 3
EVENT_PACKET = 4
ISO_PACKET = 5
SENT = 0
RECEIVED = 1
L2CAP_ATT_CID = 4
# opcode for att protocol
OPCODE_ATT_READ_BY_TYPE_RSP = 0x09
OPCODE_ATT_WRITE_CMD = 0x52
UUID_ASE_CONTROL_POINT = 0x2BC6
# opcode for ase control
OPCODE_CONFIG_CODEC = 0x01
OPCODE_ENABLE = 0x03
OPCODE_RELEASE = 0x08
# opcode for hci command
OPCODE_HCI_CREATE_CIS = 0x2064
OPCODE_REMOVE_ISO_DATA_PATH = 0x206F
TYPE_STREAMING_AUDIO_CONTEXTS = 0x02
TYPE_SAMPLING_FREQUENCIES = 0x01
TYPE_FRAME_DURATION = 0x02
TYPE_CHANNEL_ALLOCATION = 0x03
TYPE_OCTETS_PER_FRAME = 0x04
CONTEXT_TYPE_CONVERSATIONAL = 0x0002
CONTEXT_TYPE_MEDIA = 0x0004
CONTEXT_TYPE_RINGTONE = 0x0200
# sample frequency
SAMPLE_FREQUENCY_8000 = 0x01
SAMPLE_FREQUENCY_11025 = 0x02
SAMPLE_FREQUENCY_16000 = 0x03
SAMPLE_FREQUENCY_22050 = 0x04
SAMPLE_FREQUENCY_24000 = 0x05
SAMPLE_FREQUENCY_32000 = 0x06
SAMPLE_FREQUENCY_44100 = 0x07
SAMPLE_FREQUENCY_48000 = 0x08
SAMPLE_FREQUENCY_88200 = 0x09
SAMPLE_FREQUENCY_96000 = 0x0a
SAMPLE_FREQUENCY_176400 = 0x0b
SAMPLE_FREQUENCY_192000 = 0x0c
SAMPLE_FREQUENCY_384000 = 0x0d
FRAME_DURATION_7_5 = 0x00
FRAME_DURATION_10 = 0x01
AUDIO_LOCATION_MONO = 0x00
AUDIO_LOCATION_LEFT = 0x01
AUDIO_LOCATION_RIGHT = 0x02
AUDIO_LOCATION_CENTER = 0x04
packet_number = 0
debug_enable = False
add_header = False
class Connection:
def __init__(self):
self.ase_handle = 0xFFFF
self.number_of_ases = 0
self.ase = defaultdict(AseStream)
self.context = 0xFFFF
self.cis_handle = 0xFFFF
self.input_dump = []
self.output_dump = []
self.start_time = 0xFFFFFFFF
def dump(self):
print("start_time: " + str(self.start_time))
print("ase_handle: " + str(self.ase_handle))
print("context type: " + str(self.context))
print("number_of_ases: " + str(self.number_of_ases))
print("cis_handle: " + str(self.cis_handle))
for id, ase_stream in self.ase.items():
print("ase id: " + str(id))
ase_stream.dump()
class AseStream:
def __init__(self):
self.sampling_frequencies = 0xFF
self.frame_duration = 0xFF
self.channel_allocation = 0xFFFFFFFF
self.octets_per_frame = 0xFFFF
def dump(self):
print("sampling_frequencies: " + str(self.sampling_frequencies))
print("frame_duration: " + str(self.frame_duration))
print("channel_allocation: " + str(self.channel_allocation))
print("octets_per_frame: " + str(self.octets_per_frame))
connection_map = defaultdict(Connection)
cis_acl_map = defaultdict(int)
def generate_header(file, connection):
header = bytearray.fromhex('1ccc1200')
for ase in connection.ase.values():
sf_case = {
SAMPLE_FREQUENCY_8000: 80,
SAMPLE_FREQUENCY_11025: 110,
SAMPLE_FREQUENCY_16000: 160,
SAMPLE_FREQUENCY_22050: 220,
SAMPLE_FREQUENCY_24000: 240,
SAMPLE_FREQUENCY_32000: 320,
SAMPLE_FREQUENCY_44100: 441,
SAMPLE_FREQUENCY_48000: 480,
SAMPLE_FREQUENCY_88200: 882,
SAMPLE_FREQUENCY_96000: 960,
SAMPLE_FREQUENCY_176400: 1764,
SAMPLE_FREQUENCY_192000: 1920,
SAMPLE_FREQUENCY_384000: 2840,
}
header = header + struct.pack("<H", sf_case[ase.sampling_frequencies])
fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10}
header = header + struct.pack("<H", ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])
al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2}
header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 0,
48000000)
break
file.write(header)
def parse_codec_information(connection_handle, ase_id, packet):
length, packet = unpack_data(packet, 1, False)
if len(packet) < length:
debug_print("Invalid codec configuration length")
return packet
ase = connection_map[connection_handle].ase[ase_id]
while length > 0:
config_length, packet = unpack_data(packet, 1, False)
config_type, packet = unpack_data(packet, 1, False)
value, packet = unpack_data(packet, config_length - 1, False)
if config_type == TYPE_SAMPLING_FREQUENCIES:
ase.sampling_frequencies = value
elif config_type == TYPE_FRAME_DURATION:
ase.frame_duration = value
elif config_type == TYPE_CHANNEL_ALLOCATION:
ase.channel_allocation = value
elif TYPE_OCTETS_PER_FRAME:
ase.octets_per_frame = value
length -= (config_length + 1)
return packet
def parse_att_read_by_type_rsp(packet, connection_handle):
length, packet = unpack_data(packet, 1, False)
if length > len(packet):
debug_print("Invalid att packet length")
return
attribute_handle, packet = unpack_data(packet, 2, False)
if debug_enable:
debug_print("attribute_handle - " + str(attribute_handle))
packet = unpack_data(packet, 1, True)
value_handle, packet = unpack_data(packet, 2, False)
characteristic_uuid, packet = unpack_data(packet, 2, False)
if characteristic_uuid == UUID_ASE_CONTROL_POINT:
debug_print("ASE Control point found!")
connection_map[connection_handle].ase_handle = value_handle
def parse_att_write_cmd(packet, connection_handle, timestamp):
attribute_handle, packet = unpack_data(packet, 2, False)
if connection_map[connection_handle].ase_handle == attribute_handle:
if debug_enable:
debug_print("Action with ASE Control point")
opcode, packet = unpack_data(packet, 1, False)
if opcode == OPCODE_CONFIG_CODEC:
debug_print("config_codec")
(connection_map[connection_handle].number_of_ases, packet) = unpack_data(packet, 1, False)
for i in range(connection_map[connection_handle].number_of_ases):
ase_id, packet = unpack_data(packet, 1, False)
# ignore target_latency, target_phy, codec_id
packet = unpack_data(packet, 7, True)
packet = parse_codec_information(connection_handle, ase_id, packet)
elif opcode == OPCODE_ENABLE:
debug_print("enable")
numbers_of_ases, packet = unpack_data(packet, 1, False)
for i in range(numbers_of_ases):
ase_id, packet = unpack_data(packet, 1, False)
metadata_length, packet = unpack_data(packet, 1, False)
if metadata_length > len(packet):
debug_print("Invalid metadata length")
return
length, packet = unpack_data(packet, 1, False)
if length > len(packet):
debug_print("Invalid metadata value length")
return
metadata_type, packet = unpack_data(packet, 1, False)
if metadata_type == TYPE_STREAMING_AUDIO_CONTEXTS:
(connection_map[connection_handle].context, packet) = unpack_data(packet, 2, False)
break
connection_map[connection_handle].start_time = timestamp
if debug_enable:
connection_map[connection_handle].dump()
def parse_att_packet(packet, connection_handle, flags, timestamp):
opcode, packet = unpack_data(packet, 1, False)
packet_handle = {
(OPCODE_ATT_READ_BY_TYPE_RSP, RECEIVED): (lambda x, y, z: parse_att_read_by_type_rsp(x, y)),
(OPCODE_ATT_WRITE_CMD, SENT): (lambda x, y, z: parse_att_write_cmd(x, y, z))
}
packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp)
def debug_print(log):
global packet_number
print("#" + str(packet_number) + ": " + log)
def unpack_data(data, byte, ignore):
if ignore:
return data[byte:]
value = 0
if byte == 1:
value = struct.unpack("<B", data[:byte])[0]
elif byte == 2:
value = struct.unpack("<H", data[:byte])[0]
elif byte == 4:
value = struct.unpack("<I", data[:byte])[0]
return value, data[byte:]
def parse_command_packet(packet):
opcode, packet = unpack_data(packet, 2, False)
if opcode == OPCODE_HCI_CREATE_CIS:
debug_print("OPCODE_HCI_CREATE_CIS")
length, packet = unpack_data(packet, 1, False)
if length != len(packet):
debug_print("Invalid cmd length")
return
cis_count, packet = unpack_data(packet, 1, False)
for i in range(cis_count):
cis_handle, packet = unpack_data(packet, 2, False)
cis_handle &= 0x0EFF
acl_handle, packet = unpack_data(packet, 2, False)
connection_map[acl_handle].cis_handle = cis_handle
cis_acl_map[cis_handle] = acl_handle
if debug_enable:
connection_map[acl_handle].dump()
elif opcode == OPCODE_REMOVE_ISO_DATA_PATH:
debug_print("OPCODE_REMOVE_ISO_DATA_PATH")
length, packet = unpack_data(packet, 1, False)
if length != len(packet):
debug_print("Invalid cmd length")
return
cis_handle, packet = unpack_data(packet, 2, False)
acl_handle = cis_acl_map[cis_handle]
dump_audio_data_to_file(acl_handle)
def convert_time_str(timestamp):
"""This function converts time to string format."""
timestamp_sec = float(timestamp) / 1000000
local_timestamp = time.localtime(timestamp_sec)
ms = timestamp_sec - long(timestamp_sec)
ms_str = "{0:06}".format(int(round(ms * 1000000)))
str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
full_str_format = str_format + "_" + ms_str
return full_str_format
def dump_audio_data_to_file(acl_handle):
if debug_enable:
connection_map[acl_handle].dump()
file_name = ""
context_case = {
CONTEXT_TYPE_CONVERSATIONAL: "Conversational",
CONTEXT_TYPE_MEDIA: "Media",
CONTEXT_TYPE_RINGTONE: "Ringtone"
}
file_name += context_case.get(connection_map[acl_handle].context, "Unknown")
for ase in connection_map[acl_handle].ase.values():
sf_case = {
SAMPLE_FREQUENCY_8000: "8000",
SAMPLE_FREQUENCY_11025: "11025",
SAMPLE_FREQUENCY_16000: "16000",
SAMPLE_FREQUENCY_22050: "22050",
SAMPLE_FREQUENCY_24000: "24000",
SAMPLE_FREQUENCY_32000: "32000",
SAMPLE_FREQUENCY_44100: "44100",
SAMPLE_FREQUENCY_48000: "48000",
SAMPLE_FREQUENCY_88200: "88200",
SAMPLE_FREQUENCY_96000: "96000",
SAMPLE_FREQUENCY_176400: "176400",
SAMPLE_FREQUENCY_192000: "192000",
SAMPLE_FREQUENCY_384000: "284000"
}
file_name += ("_sf" + sf_case[ase.sampling_frequencies])
fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
file_name += ("_fd" + fd_case[ase.frame_duration])
al_case = {
AUDIO_LOCATION_MONO: "mono",
AUDIO_LOCATION_LEFT: "left",
AUDIO_LOCATION_RIGHT: "right",
AUDIO_LOCATION_CENTER: "center"
}
file_name += ("_" + al_case[ase.channel_allocation])
file_name += ("_frame" + str(ase.octets_per_frame))
file_name += ("_" + convert_time_str(connection_map[acl_handle].start_time))
break
if connection_map[acl_handle].input_dump != []:
debug_print("Dump input...")
f = open(file_name + "_input.bin", 'wb')
if add_header == True:
generate_header(f, connection_map[acl_handle])
arr = bytearray(connection_map[acl_handle].input_dump)
f.write(arr)
f.close()
connection_map[acl_handle].input_dump = []
if connection_map[acl_handle].output_dump != []:
debug_print("Dump output...")
f = open(file_name + "_output.bin", 'wb')
if add_header == True:
generate_header(f, connection_map[acl_handle])
arr = bytearray(connection_map[acl_handle].output_dump)
f.write(arr)
f.close()
connection_map[acl_handle].output_dump = []
return
def parse_acl_packet(packet, flags, timestamp):
# Check the minimum acl length, HCI leader (4 bytes)
# + L2CAP header (4 bytes)
if len(packet) < 8:
debug_print("Invalid acl data length.")
return
connection_handle, packet = unpack_data(packet, 2, False)
connection_handle = connection_handle & 0x0FFF
if connection_handle > 0x0EFF:
debug_print("Invalid packet handle, skip")
return
total_length, packet = unpack_data(packet, 2, False)
if total_length != len(packet):
debug_print("Invalid total length, skip")
return
pdu_length, packet = unpack_data(packet, 2, False)
channel_id, packet = unpack_data(packet, 2, False)
if pdu_length != len(packet):
debug_print("Invalid pdu length, skip")
return
if debug_enable:
debug_print("ACL connection_handle - " + str(connection_handle))
# Parse ATT protocol
if channel_id == L2CAP_ATT_CID:
parse_att_packet(packet, connection_handle, flags, timestamp)
def parse_iso_packet(packet, flags):
cis_handle, packet = unpack_data(packet, 2, False)
cis_handle &= 0x0EFF
iso_data_load_length, packet = unpack_data(packet, 2, False)
if iso_data_load_length != len(packet):
debug_print("Invalid iso data load length")
return
# Ignore timestamp, sequence number
packet = unpack_data(packet, 6, True)
iso_sdu_length, packet = unpack_data(packet, 2, False)
if iso_sdu_length != len(packet):
debug_print("Invalid iso sdu length")
return
acl_handle = cis_acl_map[cis_handle]
if flags == SENT:
connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet)))
connection_map[acl_handle].output_dump.extend(list(packet))
elif flags == RECEIVED:
connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet)))
connection_map[acl_handle].input_dump.extend(list(packet))
def parse_next_packet(btsnoop_file):
global packet_number
packet_number += 1
packet_header = btsnoop_file.read(25)
if len(packet_header) != 25:
return False
(length_original, length_captured, flags, dropped_packets, timestamp, type) = struct.unpack(
">IIIIqB", packet_header)
if length_original != length_captured:
debug_print("Filtered btnsoop, can not be parsed")
return False
packet = btsnoop_file.read(length_captured - 1)
if len(packet) != length_original - 1:
debug_print("Invalid packet length!")
return False
if dropped_packets:
debug_print("Invalid droped value")
return False
packet_handle = {
COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x)),
ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)),
SCO_PACKET: (lambda x, y, z: None),
EVENT_PACKET: (lambda x, y, z: None),
ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y))
}
packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp)
return True
def main():
parser = argparse.ArgumentParser()
parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure")
parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true")
parser.add_argument(
"--header",
help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.",
action="store_true")
argv = parser.parse_args()
BTSNOOP_FILE_NAME = argv.btsnoop_file
global debug_enable
global add_header
if argv.verbose:
debug_enable = True
if argv.header:
add_header = True
with open(BTSNOOP_FILE_NAME, "rb") as btsnoop_file:
if btsnoop_file.read(16) != BTSNOOP_HEADER:
print("Invalid btsnoop header")
exit(1)
while True:
if not parse_next_packet(btsnoop_file):
break
for handle in connection_map.keys():
dump_audio_data_to_file(handle)
if __name__ == "__main__":
main()