blob: e1ed9b78add68d9489673ff3e4d74854745660fb [file] [log] [blame]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Display;
use std::path::PathBuf;
use thiserror::Error;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc, oneshot};
use num_traits::{FromPrimitive, ToPrimitive};
mod pcapng;
mod position;
pub use position::Position;
mod uci_packets;
use uci_packets::StatusCode as UciStatusCode;
use uci_packets::*;
mod device;
use device::{Device, MAX_DEVICE};
mod session;
use session::MAX_SESSION;
mod mac_address;
pub use mac_address::MacAddress;
// UCI Generic Specification v1.1.0 ยง 4.4
const HEADER_SIZE: usize = 4;
const MAX_PAYLOAD_SIZE: usize = 255;
const MAX_PACKET_SIZE: usize = HEADER_SIZE + MAX_PAYLOAD_SIZE;
struct Connection {
socket: TcpStream,
buffer: BytesMut,
pcapng_file: Option<pcapng::File>,
}
impl Connection {
fn new(socket: TcpStream, pcapng_file: Option<pcapng::File>) -> Self {
Connection {
socket,
buffer: BytesMut::with_capacity(MAX_PACKET_SIZE),
pcapng_file,
}
}
async fn read(&mut self) -> Result<Option<BytesMut>> {
let len = self.socket.read_buf(&mut self.buffer).await?;
if len == 0 {
return Ok(None);
}
if let Some(ref mut pcapng_file) = self.pcapng_file {
pcapng_file
.write(&self.buffer, pcapng::Direction::Tx)
.await?
}
let bytes = self.buffer.split_to(self.buffer.len());
Ok(Some(bytes))
}
async fn write(&mut self, packet: Bytes) -> Result<()> {
if let Some(ref mut pcapng_file) = self.pcapng_file {
pcapng_file.write(&packet, pcapng::Direction::Rx).await?
}
let _ = self.socket.try_write(&packet)?;
Ok(())
}
}
pub type PicaCommandStatus = Result<(), PicaCommandError>;
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum PicaCommandError {
#[error("Device already exists: {0}")]
DeviceAlreadyExists(MacAddress),
#[error("Device not found: {0}")]
DeviceNotFound(MacAddress),
}
#[derive(Debug)]
pub enum PicaCommand {
// Connect a new device.
Connect(TcpStream),
// Disconnect the selected device.
Disconnect(usize),
// Execute ranging command for selected device and session.
Ranging(usize, u32),
// Execute UCI command received for selected device.
Command(usize, UciCommandPacket),
// Init Uci Device
InitUciDevice(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
// Set Position
SetPosition(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
// Create Anchor
CreateAnchor(MacAddress, Position, oneshot::Sender<PicaCommandStatus>),
// Destroy Anchor
DestroyAnchor(MacAddress, oneshot::Sender<PicaCommandStatus>),
// Get State
GetState(oneshot::Sender<Vec<(Category, MacAddress, Position)>>),
}
impl Display for PicaCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let cmd = match self {
PicaCommand::Connect(_) => "Connect",
PicaCommand::Disconnect(_) => "Disconnect",
PicaCommand::Ranging(_, _) => "Ranging",
PicaCommand::Command(_, _) => "Command",
PicaCommand::InitUciDevice(_, _, _) => "InitUciDevice",
PicaCommand::SetPosition(_, _, _) => "SetPosition",
PicaCommand::CreateAnchor(_, _, _) => "CreateAnchor",
PicaCommand::DestroyAnchor(_, _) => "DestroyAnchor",
PicaCommand::GetState(_) => "GetState",
};
write!(f, "{}", cmd)
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum PicaEvent {
// A Device was added
DeviceAdded {
category: Category,
mac_address: MacAddress,
#[serde(flatten)]
position: Position,
},
// A Device was removed
DeviceRemoved {
category: Category,
mac_address: MacAddress,
},
// A Device position has changed
DeviceUpdated {
category: Category,
mac_address: MacAddress,
#[serde(flatten)]
position: Position,
},
NeighborUpdated {
source_category: Category,
source_mac_address: MacAddress,
destination_category: Category,
destination_mac_address: MacAddress,
distance: u16,
azimuth: i16,
elevation: i8,
},
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum Category {
Uci,
Anchor,
}
#[derive(Debug, Clone, Copy)]
struct Anchor {
mac_address: MacAddress,
position: Position,
}
pub struct Pica {
devices: HashMap<usize, Device>,
anchors: HashMap<MacAddress, Anchor>,
counter: usize,
rx: mpsc::Receiver<PicaCommand>,
tx: mpsc::Sender<PicaCommand>,
event_tx: broadcast::Sender<PicaEvent>,
pcapng_dir: Option<PathBuf>,
}
/// Result of UCI packet parsing.
enum UciParseResult {
Ok(UciCommandPacket),
Err(Bytes),
Skip,
}
/// Parse incoming UCI packets.
/// Handle parsing errors by crafting a suitable error response packet.
fn parse_uci_packet(bytes: &[u8]) -> UciParseResult {
match UciPacketPacket::parse(bytes) {
// Parsing error. Determine what error response should be
// returned to the host:
// - response and notifications are ignored, no response
// - if the group id is not known, STATUS_UNKNOWN_GID,
// - otherwise, and to simplify the code, STATUS_UNKNOWN_OID is
// always returned. That means that malformed commands
// get the same status code, instead of
// STATUS_SYNTAX_ERROR.
Err(_) => {
let message_type = (bytes[0] >> 5) & 0x7;
let group_id = bytes[0] & 0xf;
let opcode_id = bytes[1] & 0x3f;
let status = match (
MessageType::from_u8(message_type),
GroupId::from_u8(group_id),
) {
(Some(MessageType::Command), Some(_)) => UciStatusCode::UciStatusUnknownOid,
(Some(MessageType::Command), None) => UciStatusCode::UciStatusUnknownGid,
_ => return UciParseResult::Skip,
};
// The PDL generated code cannot be used to generate
// responses with invalid group identifiers.
let response = vec![
(MessageType::Response.to_u8().unwrap() << 5) | group_id,
opcode_id,
0,
1,
status.to_u8().unwrap(),
];
UciParseResult::Err(response.into())
}
// Parsing success, ignore non command packets.
Ok(packet) => {
if let Ok(cmd) = packet.try_into() {
UciParseResult::Ok(cmd)
} else {
UciParseResult::Skip
}
}
}
}
impl Pica {
pub fn new(event_tx: broadcast::Sender<PicaEvent>, pcapng_dir: Option<PathBuf>) -> Self {
let (tx, rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
Pica {
devices: HashMap::new(),
anchors: HashMap::new(),
counter: 0,
rx,
tx,
event_tx,
pcapng_dir,
}
}
pub fn tx(&self) -> mpsc::Sender<PicaCommand> {
self.tx.clone()
}
fn get_device_mut(&mut self, device_handle: usize) -> Option<&mut Device> {
self.devices.get_mut(&device_handle)
}
fn get_device(&self, device_handle: usize) -> Option<&Device> {
self.devices.get(&device_handle)
}
fn get_category(&self, mac_address: &MacAddress) -> Option<Category> {
if self.anchors.contains_key(mac_address) {
Some(Category::Anchor)
} else if self
.devices
.iter()
.any(|(_, device)| device.mac_address == *mac_address)
{
Some(Category::Uci)
} else {
None
}
}
fn get_device_mut_by_mac(&mut self, mac_address: MacAddress) -> Option<&mut Device> {
self.devices
.values_mut()
.find(|d| d.mac_address == mac_address)
}
fn send_event(&self, event: PicaEvent) {
// An error here means that we have
// no receivers, so ignore it
let _ = self.event_tx.send(event);
}
async fn connect(&mut self, stream: TcpStream) {
let (packet_tx, mut packet_rx) = mpsc::channel(MAX_SESSION);
let device_handle = self.counter;
let pica_tx = self.tx.clone();
let pcapng_dir = self.pcapng_dir.clone();
println!("[{}] Connecting device", device_handle);
self.counter += 1;
let mut device = Device::new(device_handle, packet_tx, self.tx.clone());
device.init();
self.send_event(PicaEvent::DeviceAdded {
category: Category::Uci,
mac_address: device.mac_address,
position: device.position,
});
self.devices.insert(device_handle, device);
// Spawn and detach the connection handling task.
// The task notifies pica when exiting to let it clean
// the state.
tokio::spawn(async move {
let pcapng_file: Option<pcapng::File> = if let Some(dir) = pcapng_dir {
let full_path = dir.join(format!("device-{}.pcapng", device_handle));
println!("Recording pcapng to file {}", full_path.as_path().display());
Some(pcapng::File::create(full_path).await.unwrap())
} else {
None
};
let mut connection = Connection::new(stream, pcapng_file);
'outer: loop {
tokio::select! {
// Read command packet sent from connected UWB host.
// Run associated command.
result = connection.read() =>
match result {
Ok(Some(packet)) =>
match parse_uci_packet(&packet) {
UciParseResult::Ok(cmd) =>
pica_tx.send(PicaCommand::Command(device_handle, cmd)).await.unwrap(),
UciParseResult::Err(response) =>
connection.write(response).await.unwrap(),
UciParseResult::Skip => (),
},
Ok(None) | Err(_) => break 'outer
},
// Send response packets to the connected UWB host.
Some(packet) = packet_rx.recv() =>
if connection.write(packet.to_bytes()).await.is_err() {
break 'outer
}
}
}
pica_tx
.send(PicaCommand::Disconnect(device_handle))
.await
.unwrap()
});
}
fn disconnect(&mut self, device_handle: usize) {
println!("[{}] Disconnecting device", device_handle);
match self
.devices
.get(&device_handle)
.ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into()))
{
Ok(device) => {
self.send_event(PicaEvent::DeviceRemoved {
category: Category::Uci,
mac_address: device.mac_address,
});
self.devices.remove(&device_handle);
}
Err(err) => println!("{}", err),
}
}
async fn ranging(&mut self, device_handle: usize, session_id: u32) {
println!("[{}] Ranging event", device_handle);
println!(" session_id={}", session_id);
let device = self.get_device(device_handle).unwrap();
let session = device.get_session(session_id).unwrap();
let mut measurements = Vec::new();
session
.get_dst_mac_addresses()
.iter()
.for_each(|mac_address| {
if let Some(anchor) = self.anchors.get(mac_address) {
let local = device
.position
.compute_range_azimuth_elevation(&anchor.position);
let remote = anchor
.position
.compute_range_azimuth_elevation(&device.position);
assert!(local.0 == remote.0);
// TODO: support extended address
match mac_address {
MacAddress::Short(address) => {
measurements.push(ShortAddressTwoWayRangingMeasurement {
mac_address: u16::from_be_bytes(*address),
status: UciStatusCode::UciStatusOk,
nlos: 0, // in Line Of Sight
distance: local.0,
aoa_azimuth: local.1 as u16,
aoa_azimuth_fom: 100, // Yup, pretty sure about this
aoa_elevation: local.2 as u16,
aoa_elevation_fom: 100, // Yup, pretty sure about this
aoa_destination_azimuth: remote.1 as u16,
aoa_destination_azimuth_fom: 100,
aoa_destination_elevation: remote.2 as u16,
aoa_destination_elevation_fom: 100,
slot_index: 0,
})
}
MacAddress::Extend(_) => unimplemented!(),
}
}
});
device
.tx
.send(
// TODO: support extended address
ShortMacTwoWayRangeDataNtfBuilder {
sequence_number: session.sequence_number,
session_id,
rcr_indicator: 0, //TODO
current_ranging_interval: 0, //TODO
two_way_ranging_measurements: measurements,
}
.build()
.into(),
)
.await
.unwrap();
let device = self.get_device_mut(device_handle).unwrap();
let session = device.get_session_mut(session_id).unwrap();
session.sequence_number += 1;
}
async fn command(&mut self, device_handle: usize, cmd: UciCommandPacket) {
// TODO: implement fragmentation support
assert_eq!(
cmd.get_packet_boundary_flag(),
PacketBoundaryFlag::Complete,
"Boundary flag is true, implement fragmentation"
);
match self
.get_device_mut(device_handle)
.ok_or_else(|| PicaCommandError::DeviceNotFound(device_handle.into()))
{
Ok(device) => {
let response = device.command(cmd).into();
device
.tx
.send(response)
.await
.unwrap_or_else(|err| println!("Failed to send UCI command response: {}", err));
}
Err(err) => println!("{}", err),
}
}
pub async fn run(&mut self) -> Result<()> {
loop {
use PicaCommand::*;
match self.rx.recv().await {
Some(Connect(stream)) => {
self.connect(stream).await;
}
Some(Disconnect(device_handle)) => self.disconnect(device_handle),
Some(Ranging(device_handle, session_id)) => {
self.ranging(device_handle, session_id).await;
}
Some(Command(device_handle, cmd)) => self.command(device_handle, cmd).await,
Some(SetPosition(mac_address, position, pica_cmd_rsp_tx)) => {
self.set_position(mac_address, position, pica_cmd_rsp_tx)
}
Some(CreateAnchor(mac_address, position, pica_cmd_rsp_tx)) => {
self.create_anchor(mac_address, position, pica_cmd_rsp_tx)
}
Some(DestroyAnchor(mac_address, pica_cmd_rsp_tx)) => {
self.destroy_anchor(mac_address, pica_cmd_rsp_tx)
}
Some(GetState(state_tx)) => self.get_state(state_tx),
Some(InitUciDevice(mac_address, position, pica_cmd_rsp_tx)) => {
self.init_uci_device(mac_address, position, pica_cmd_rsp_tx);
}
None => (),
};
}
}
// TODO: Assign a reserved range of mac addresses for UCI devices
// to protect against conflicts with user defined Anchor addresses
// b/246000641
fn init_uci_device(
&mut self,
mac_address: MacAddress,
position: Position,
pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
) {
println!("[_] Init device");
println!(" mac_address: {}", mac_address);
println!(" position={:?}", position);
let status = self
.get_device_mut_by_mac(mac_address)
.ok_or(PicaCommandError::DeviceNotFound(mac_address))
.map(|uci_device| {
uci_device.mac_address = mac_address;
uci_device.position = position;
});
pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
println!("Failed to send init-uci-device command response: {:?}", err)
});
}
fn set_position(
&mut self,
mac_address: MacAddress,
position: Position,
pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
) {
let mut status = if let Some(uci_device) = self.get_device_mut_by_mac(mac_address) {
uci_device.position = position;
Ok(())
} else if let Some(anchor) = self.anchors.get_mut(&mac_address) {
anchor.position = position;
Ok(())
} else {
Err(PicaCommandError::DeviceNotFound(mac_address))
};
if status.is_ok() {
status = self.update_position(mac_address, position)
}
pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
println!("Failed to send set-position command response: {:?}", err)
});
}
fn update_position(
&self,
mac_address: MacAddress,
position: Position,
) -> Result<(), PicaCommandError> {
let category = match self.get_category(&mac_address) {
Some(category) => category,
None => {
return Err(PicaCommandError::DeviceNotFound(mac_address));
}
};
self.send_event(PicaEvent::DeviceUpdated {
category,
mac_address,
position,
});
let devices = self.devices.values().map(|d| (d.mac_address, d.position));
let anchors = self.anchors.values().map(|b| (b.mac_address, b.position));
let update_neighbors = |device_category, device_mac_address, device_position| {
if mac_address != device_mac_address {
let local = position.compute_range_azimuth_elevation(&device_position);
let remote = device_position.compute_range_azimuth_elevation(&position);
assert!(local.0 == remote.0);
self.send_event(PicaEvent::NeighborUpdated {
source_category: category,
source_mac_address: mac_address,
destination_category: device_category,
destination_mac_address: device_mac_address,
distance: local.0,
azimuth: local.1,
elevation: local.2,
});
self.send_event(PicaEvent::NeighborUpdated {
source_category: device_category,
source_mac_address: device_mac_address,
destination_category: category,
destination_mac_address: mac_address,
distance: remote.0,
azimuth: remote.1,
elevation: remote.2,
});
}
};
devices.for_each(|device| update_neighbors(Category::Uci, device.0, device.1));
anchors.for_each(|anchor| update_neighbors(Category::Anchor, anchor.0, anchor.1));
Ok(())
}
#[allow(clippy::map_entry)]
fn create_anchor(
&mut self,
mac_address: MacAddress,
position: Position,
pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
) {
println!("Create anchor: {} {}", mac_address, position);
let status = if self.get_category(&mac_address).is_some() {
Err(PicaCommandError::DeviceAlreadyExists(mac_address))
} else {
self.send_event(PicaEvent::DeviceAdded {
category: Category::Anchor,
mac_address,
position,
});
assert!(self
.anchors
.insert(
mac_address,
Anchor {
mac_address,
position,
},
)
.is_none());
Ok(())
};
pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
println!("Failed to send create-anchor command response: {:?}", err)
})
}
fn destroy_anchor(
&mut self,
mac_address: MacAddress,
pica_cmd_rsp_tx: oneshot::Sender<PicaCommandStatus>,
) {
println!("[_] Destroy anchor");
println!(" mac_address: {}", mac_address);
let status = if self.anchors.remove(&mac_address).is_none() {
Err(PicaCommandError::DeviceNotFound(mac_address))
} else {
self.send_event(PicaEvent::DeviceRemoved {
category: Category::Anchor,
mac_address,
});
Ok(())
};
pica_cmd_rsp_tx.send(status).unwrap_or_else(|err| {
println!("Failed to send destroy-anchor command response: {:?}", err)
})
}
fn get_state(&self, state_tx: oneshot::Sender<Vec<(Category, MacAddress, Position)>>) {
println!("[_] Get State");
state_tx
.send(
self.anchors
.values()
.map(|anchor| (Category::Anchor, anchor.mac_address, anchor.position))
.chain(
self.devices
.values()
.map(|device| (Category::Uci, device.mac_address, device.position)),
)
.collect(),
)
.unwrap();
}
}