blob: c50df15981047ad558e606fc1e538bb7cd5f9176 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
A script to extract raw SCO RX packets from btsnoop.
Use 'btmon -S' to dump SCO traffic from btsnoop file.
Trim the btsnoop output to just the SCO traffic period.
Then execute 'python parse-sco.py <btsnoop-output>'
"""
import atexit
import binascii
import os
import re
import sys
class SCOParser:
"""
Parser for grepping SCO packets
"""
def __init__(self):
# On old releases, +CIEV: 4,1 indicates the start point of call session
# c 31 0d 0a 9a ..+CIEV: 4,1..
self.call_start_re = re.compile(r'.*?\+CIEV:\s4,(\d).*?')
# > SCO Data RX: Handle 257 flags 0x00 dlen 60 #13826 [hci0] 650.388305
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
# 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
# 00 00 00 00 00 00 00 00 00 00 00 00
self.sco_rx_re = re.compile(r'.*?SCO\sData\sRX.*?flags\s0x(\d+).*?')
self.sco_f = None
self.output_idx = 0
self.pk_count = 0
self.pl_count = 0
atexit.register(self._cleanup)
def _cleanup(self):
if self.sco_f is not None:
print(
"Current file contains %d packets (%d with erroneous status flag)" %
(self.pk_count, self.pl_count))
self.pk_count = 0
self.pl_count = 0
self.sco_f.close()
def _new_session(self):
if self.sco_f is not None:
close(self.sco_f)
new_file = "sco_file_%d" % self.output_idx
print("Record to %s" % new_file)
self.sco_f = open(new_file, 'wb')
self.output_idx += 1
return self.sco_f
def parse(self, filename):
if not os.path.exists(filename):
print("%s doesn't exist" % filename)
return
print("Start parsing %s" % filename)
parse_rx_data = 0
with open(filename, "r") as f:
for line in f.readlines():
if parse_rx_data > 0:
self.sco_f.write(binascii.unhexlify(''.join(line[:56].split())))
parse_rx_data = (parse_rx_data + 1) % 5
# Start a new session and output following SCO data to a new file
match = self.call_start_re.search(line)
if match and (1 == int(match.group(1))):
self._new_session()
continue
match = self.sco_rx_re.search(line)
if match:
if self.sco_f is None:
self._new_session()
self.pk_count += 1
status_flag = int(match.group(1))
hdr = ['01', str(status_flag) + '1', '3c']
if status_flag != 0:
self.pl_count += 1
self.sco_f.write(binascii.unhexlify(''.join(hdr)))
parse_rx_data = 1
def main(argv):
if len(argv) < 1:
print("parse_sco.py [btsnoop.txt]")
return
p = SCOParser()
p.parse(argv[0])
if __name__ == "__main__":
main(sys.argv[1:])