blob: c7e6b9ab2c7f755fa3109df2f41957e1ead1884a [file] [log] [blame]
"""
Copied from tools/rootcanal/scripts/test_channel.py
"""
import socket
from time import sleep
class Connection:
def __init__(self, port):
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect(("localhost", port))
def close(self):
self._socket.close()
def send(self, data):
self._socket.sendall(data.encode())
def receive(self, size):
return self._socket.recv(size)
class TestChannel:
def __init__(self, port):
self._connection = Connection(port)
self._closed = False
def close(self):
self._connection.close()
self._closed = True
def send_command(self, name, args):
args = [str(arg) for arg in args]
name_size = len(name)
args_size = len(args)
self.lint_command(name, args, name_size, args_size)
encoded_name = chr(name_size) + name
encoded_args = chr(args_size) + "".join(chr(len(arg)) + arg for arg in args)
command = encoded_name + encoded_args
if self._closed:
return
self._connection.send(command)
if name != "CLOSE_TEST_CHANNEL":
return self.receive_response().decode()
def receive_response(self):
if self._closed:
return b"Closed"
size_chars = self._connection.receive(4)
if not size_chars:
return b"No response, assuming that the connection is broken"
response_size = 0
for i in range(0, len(size_chars) - 1):
response_size |= size_chars[i] << (8 * i)
response = self._connection.receive(response_size)
return response
def lint_command(self, name, args, name_size, args_size):
assert name_size == len(name) and args_size == len(args)
try:
name.encode()
for arg in args:
arg.encode()
except UnicodeError:
print("Unrecognized characters.")
raise
if name_size > 255 or args_size > 255:
raise ValueError # Size must be encodable in one octet.
for arg in args:
if len(arg) > 255:
raise ValueError # Size must be encodable in one octet.
class RootCanal:
def __init__(self, port):
self.channel = TestChannel(port)
self.disconnected_dev_phys = None
# discard initialization messages
self.channel.receive_response()
def close(self):
self.channel.close()
@staticmethod
def _parse_device_list(raw):
# time for some cursed parsing!
categories = {}
curr_category = None
for line in raw.split("\n"):
line = line.strip()
if not line:
continue
if line[0].isdigit():
# list entry
if curr_category is None or ":" not in line:
raise Exception("Failed to parse rootcanal device list output")
curr_category.append(line.split(":", 1)[1])
else:
if line.endswith(":"):
line = line[:-1]
curr_category = []
categories[line] = curr_category
return categories
@staticmethod
def _parse_phy(raw):
transport, idxs = raw.split(":")
idxs = [int(x) for x in idxs.split(",") if x.strip()]
return transport, idxs
def reconnect_phone(self):
raw_devices = None
try:
raw_devices = self.channel.send_command("list", [])
devices = self._parse_device_list(raw_devices)
for dev_i, name in enumerate(devices["Devices"]):
# the default transports are always 0 and 1
classic_phy = 0
le_phy = 1
if "beacon" in name:
target_phys = [le_phy]
elif "hci_device" in name:
target_phys = [classic_phy, le_phy]
else:
target_phys = []
for phy in target_phys:
if dev_i not in self._parse_phy(devices["Phys"][phy])[1]:
self.channel.send_command("add_device_to_phy", [dev_i, phy])
except Exception as e:
print(raw_devices, e)
def disconnect_phy(self):
# first, list all devices
devices = self.channel.send_command("list", [])
devices = self._parse_device_list(devices)
dev_phys = []
for phy_i, phy in enumerate(devices["Phys"]):
_, idxs = self._parse_phy(phy)
for dev_i in idxs:
dev_phys.append((dev_i, phy_i))
# now, disconnect all pairs
for dev_i, phy_i in dev_phys:
self.channel.send_command("del_device_from_phy", [dev_i, phy_i])
devices = self.channel.send_command("list", [])
devices = self._parse_device_list(devices)
self.disconnected_dev_phys = dev_phys
def reconnect_phy_if_needed(self):
if self.disconnected_dev_phys is not None:
for dev_i, phy_i in self.disconnected_dev_phys:
self.channel.send_command("add_device_to_phy", [dev_i, phy_i])
self.disconnected_dev_phys = None
def reconnect_phy(self):
if self.disconnected_dev_phys is None:
raise Exception("cannot reconnect_phy before disconnect_phy")
self.reconnect_phy_if_needed()