blob: 425db5394b09d9fe09fa81a30d5c56c5662468d8 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import inspect
import json
import random
import readline
import socket
import sys
import time
import requests
import struct
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pica import Host
from pica.packets import uci
MAX_DATA_PACKET_PAYLOAD_SIZE = 1024
def encode_short_mac_address(mac_address: str) -> bytes:
return int(mac_address).to_bytes(2, byteorder="little")
def encode_mac_address(mac_address: str) -> bytes:
return int(mac_address).to_bytes(8, byteorder="little")
def parse_mac_address(mac_address: str) -> bytes:
bs = mac_address.split(":")
return bytes(int(b, 16) for b in bs)
class Device:
def __init__(self, reader, writer, http_address):
self.host = Host(reader, writer, bytes([0, 1]))
self.http_address = http_address
def pica_get_state(self, **kargs):
"""List the UCI devices and anchors currently existing within the Pica
virtual environment"""
r = requests.get(f"{self.http_address}/get-state")
print(f"{r.status_code}:\n{json.dumps(r.json(), indent=2)}")
def pica_init_uci_device(
self,
mac_address: str = "00:00",
x: str = "0",
y: str = "0",
z: str = "0",
yaw: str = "0",
pitch: str = "0",
roll: str = "0",
**kargs,
):
"""Init Pica device"""
r = requests.post(
f"{self.http_address}/init-uci-device/{mac_address}",
data=json.dumps(
{
"x": int(x),
"y": int(y),
"z": int(z),
"yaw": int(yaw),
"pitch": int(pitch),
"roll": int(roll),
}
),
)
print(f"{r.status_code}: {r.text}")
def pica_create_anchor(
self,
mac_address: str = "00:00",
x: str = "0",
y: str = "0",
z: str = "0",
yaw: str = "0",
pitch: str = "0",
roll: str = "0",
**kargs,
):
"""Create a Pica anchor"""
r = requests.post(
f"{self.http_address}/create-anchor/{mac_address}",
data=json.dumps(
{
"x": int(x),
"y": int(y),
"z": int(z),
"yaw": int(yaw),
"pitch": int(pitch),
"roll": int(roll),
}
),
)
print(f"{r.status_code}: {r.text}")
def pica_destroy_anchor(self, mac_address: str = "00:00", **kargs):
"""Destroy a Pica anchor"""
r = requests.post(f"{self.http_address}/destroy-anchor/{mac_address}")
print(f"{r.status_code}: {r.text}")
def pica_set_position(
self,
mac_address: str = "00:00",
x: str = "0",
y: str = "0",
z: str = "0",
yaw: str = "0",
pitch: str = "0",
roll: str = "0",
**kargs,
):
"""Set Pica UCI device or anchor position"""
r = requests.post(
f"{self.http_address}/set-position/{mac_address}",
data=json.dumps(
{
"x": int(x),
"y": int(y),
"z": int(z),
"yaw": int(yaw),
"pitch": int(pitch),
"roll": int(roll),
}
),
)
print(f"{r.status_code}: {r.text}")
def device_reset(self, **kargs):
"""Reset the UWBS."""
self.host.send_control(
uci.DeviceResetCmd(reset_config=uci.ResetConfig.UWBS_RESET)
)
def get_device_info(self, **kargs):
"""Retrieve the device information like (UCI version and other vendor specific info)."""
self.host.send_control(uci.GetDeviceInfoCmd())
def get_caps_info(self, **kargs):
"""Get the capability of the UWBS."""
self.host.send_control(uci.GetCapsInfoCmd())
def set_config(self, low_power_mode: str = "0", **kargs):
"""Set the configuration parameters on the UWBS."""
self.host.send_control(
uci.SetConfigCmd(
tlvs=[
uci.DeviceConfigTlv(
cfg_id=uci.DeviceConfigId.LOW_POWER_MODE,
v=bytes([int(low_power_mode)]),
),
]
)
)
def get_config(self, **kargs):
"""Retrieve the current configuration parameter(s) of the UWBS."""
self.host.send_control(
uci.GetConfigCmd(
cfg_id=[
uci.DeviceConfigId.LOW_POWER_MODE,
uci.DeviceConfigId.DEVICE_STATE,
]
)
)
def session_init(self, session_id: str = "0", **kargs):
"""Initialize the session"""
self.host.send_control(
uci.SessionInitCmd(
session_id=int(session_id),
session_type=uci.SessionType.FIRA_RANGING_AND_IN_BAND_DATA_SESSION,
)
)
def session_deinit(self, session_id: str = "0", **kargs):
"""Deinitialize the session"""
self.host.send_control(uci.SessionDeinitCmd(session_token=int(session_id)))
def session_set_app_config(
self,
session_id: str = "0",
ranging_interval: str = "200",
dst_mac_addresses: str = "",
**kargs,
):
"""set APP Configuration Parameters for the requested UWB session."""
dst_mac_addresses = [
parse_mac_address(a) for a in dst_mac_addresses.split(",") if a
]
if any(len(a) > 2 for a in dst_mac_addresses):
mac_address_mode = 0x2
mac_address_len = 8
else:
mac_address_mode = 0x0
mac_address_len = 2
encoded_dst_mac_addresses = bytes()
for mac_address in dst_mac_addresses:
encoded_dst_mac_addresses += mac_address
encoded_dst_mac_addresses += b"\0" * (mac_address_len - len(mac_address))
self.host.send_control(
uci.SessionSetAppConfigCmd(
session_token=int(session_id),
tlvs=[
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.MAC_ADDRESS_MODE,
v=bytes([mac_address_mode]),
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.RANGING_DURATION,
v=int(ranging_interval).to_bytes(4, byteorder="little"),
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.NO_OF_CONTROLEE,
v=bytes([len(dst_mac_addresses)]),
),
uci.AppConfigTlv(
cfg_id=uci.AppConfigTlvType.DST_MAC_ADDRESS,
v=encoded_dst_mac_addresses,
),
],
)
)
def session_get_app_config(self, session_id: str = "0", **kargs):
"""retrieve the current APP Configuration Parameters of the requested UWB session."""
self.host.send_control(
uci.SessionGetAppConfigCmd(session_token=int(session_id), app_cfg=[0x9])
)
def session_get_count(self, **kargs):
"""Retrieve number of UWB sessions in the UWBS."""
self.host.send_control(uci.SessionGetCountCmd())
def session_get_state(self, session_id: str = "0", **kargs):
"""Query the current state of the UWB session."""
self.host.send_control(uci.SessionGetStateCmd(session_token=int(session_id)))
def session_update_controller_multicast_list(
self,
session_id: str = "0",
action: str = "add",
mac_address: str = "0",
subsession_id: str = "0",
**kargs,
):
"""Update the controller multicast list."""
if action == "add":
encoded_action = uci.UpdateMulticastListAction.ADD_CONTROLEE
elif action == "remove":
encoded_action = uci.UpdateMulticastListAction.REMOVE_CONTROLEE
else:
print(f"Unexpected action: '{action}', expected add or remove")
return
self.host.send_control(
uci.SessionUpdateControllerMulticastListCmd(
session_token=int(session_id),
action=encoded_action,
payload=uci.SessionUpdateControllerMulticastListCmdPayload(
controlees=[
uci.Controlee(
short_address=encode_short_mac_address(mac_address),
subsession_id=int(subsession_id),
)
],
).serialize(),
)
)
def range_start(self, session_id: str = "0", **kargs):
"""start a UWB session."""
self.host.send_control(uci.SessionStartCmd(session_id=int(session_id)))
def range_stop(self, session_id: str = "0", **kargs):
"""Stop a UWB session."""
self.host.send_control(uci.SessionStopCmd(session_id=int(session_id)))
def get_ranging_count(self, session_id: str = "0", **kargs):
"""Get the number of times ranging has been attempted during the ranging session.."""
self.host.send_control(
uci.SessionGetRangingCountCmd(session_id=int(session_id))
)
def data_transfer(
self,
dst_mac_address,
file_name,
session_id: str = "0",
):
"""Initiates data transfer by sending (possibly segmented) UCI data packet(s)."""
# Does not have flow control, i.e. waiting for data credit notifications in between sending packets
try:
with open(file_name, "rb") as f:
b = f.read()
seq_num = 0
dst_mac_address = parse_mac_address(dst_mac_address)
if len(b) > MAX_DATA_PACKET_PAYLOAD_SIZE:
for i in range(0, len(b), MAX_DATA_PACKET_PAYLOAD_SIZE):
section = b[i : i + MAX_DATA_PACKET_PAYLOAD_SIZE]
if i + MAX_DATA_PACKET_PAYLOAD_SIZE >= len(b):
self.host.send_data(
uci.DataMessageSnd(
session_handle=int(session_id),
destination_address=int.from_bytes(dst_mac_address),
data_sequence_number=seq_num,
application_data=section,
)
)
else:
self.host.send_data(
uci.DataMessageSnd(
session_handle=int(session_id),
pbf=uci.PacketBoundaryFlag.NOT_COMPLETE,
destination_address=int.from_bytes(dst_mac_address),
data_sequence_number=seq_num,
application_data=section,
)
)
seq_num += 1
if seq_num >= 65535:
seq_num = 0
else:
self.host.send_data(
uci.DataMessageSnd(
session_handle=int(session_id),
destination_address=int.from_bytes(dst_mac_address),
data_sequence_number=seq_num,
application_data=b,
)
)
except Exception as e:
print(e)
async def read_responses_and_notifications(self):
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i : i + n]
while True:
packet = await self.host._recv_control()
# Format and print raw response data
txt = "\n ".join(
[
" ".join(["{:02x}".format(b) for b in shard])
for shard in chunks(packet, 16)
]
)
command_buffer = readline.get_line_buffer()
print("\r", end="")
print(f"Received UCI packet [{len(packet)}]:")
print(f" {txt}")
try:
uci_packet = uci.ControlPacket.parse_all(packet)
uci_packet.show()
except Exception as exn:
pass
print(f"--> {command_buffer}", end="", flush=True)
async def ainput(prompt: str = ""):
with ThreadPoolExecutor(1, "ainput") as executor:
return (
await asyncio.get_event_loop().run_in_executor(executor, input, prompt)
).rstrip()
async def get_stream_reader(pipe) -> asyncio.StreamReader:
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, pipe)
return reader
async def command_line(device: Device):
commands = {
"pica_get_state": device.pica_get_state,
"pica_init_uci_device": device.pica_init_uci_device,
"pica_create_anchor": device.pica_create_anchor,
"pica_destroy_anchor": device.pica_destroy_anchor,
"pica_set_position": device.pica_set_position,
"device_reset": device.device_reset,
"get_device_info": device.get_device_info,
"get_config": device.get_config,
"set_config": device.set_config,
"get_caps_info": device.get_caps_info,
"session_init": device.session_init,
"session_deinit": device.session_deinit,
"session_set_app_config": device.session_set_app_config,
"session_get_app_config": device.session_get_app_config,
"session_get_count": device.session_get_count,
"session_get_state": device.session_get_state,
"session_update_controller_multicast_list": device.session_update_controller_multicast_list,
"range_start": device.range_start,
"range_stop": device.range_stop,
"data_transfer": device.data_transfer,
"get_ranging_count": device.get_ranging_count,
}
def usage():
for cmd, func in commands.items():
print(f" {cmd.ljust(32)}{func.__doc__}")
def complete(text, state):
tokens = readline.get_line_buffer().split()
if not tokens or readline.get_line_buffer()[-1] == " ":
tokens.append("")
# Writing a command name, complete to ' '
if len(tokens) == 1:
results = [cmd + " " for cmd in commands.keys() if cmd.startswith(text)]
# Writing a keyword argument, no completion
elif "=" in tokens[-1]:
results = []
# Writing a keyword name, but unknown command, no completion
elif tokens[0] not in commands:
results = []
# Writing a keyword name, complete to '='
else:
sig = inspect.signature(commands[tokens[0]])
names = [
name
for (name, p) in sig.parameters.items()
if (
p.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
or p.kind == inspect.Parameter.KEYWORD_ONLY
)
]
results = [name + "=" for name in names if name.startswith(tokens[-1])]
results += [None]
return results[state]
# Configure readline
readline.parse_and_bind("tab: complete")
readline.set_completer(complete)
while True:
cmd = await ainput("--> ")
[cmd, *params] = cmd.split(" ")
args = []
kargs = dict()
for param in params:
if len(param) == 0:
continue
elif "=" in param:
[key, value] = param.split("=")
kargs[key] = value
else:
args.append(param)
if cmd in ["quit", "q"]:
break
if cmd not in commands:
print(f"Undefined command {cmd}")
usage()
continue
commands[cmd](*args, **kargs)
async def run(address: str, uci_port: int, http_port: int):
try:
# Connect to Pica
reader, writer = await asyncio.open_connection(address, uci_port)
except Exception as exn:
print(
f"Failed to connect to Pica server at address {address}:{uci_port}\n"
+ "Make sure the server is running"
)
exit(1)
# Start input and receive loops
device = Device(reader, writer, f"http://{address}:{http_port}")
loop = asyncio.get_event_loop()
loop.create_task(device.read_responses_and_notifications())
await command_line(device)
def main():
"""Start a Pica interactive console."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--address",
type=str,
default="127.0.0.1",
help="Select the pica server address",
)
parser.add_argument(
"--uci-port", type=int, default=7000, help="Select the pica TCP UCI port"
)
parser.add_argument(
"--http-port", type=int, default=3000, help="Select the pica HTTP port"
)
asyncio.run(run(**vars(parser.parse_args())))
if __name__ == "__main__":
main()