blob: 0af876fab83e844987e619a87af3631bae50f1f8 [file] [log] [blame] [edit]
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddrV4};
use anyhow::{Context, Result};
use hyper::service::{make_service_fn, service_fn};
use hyper::{body, Body, Request, Response, Server, StatusCode as HttpStatusCode};
use serde::{Deserialize, Serialize};
use serde_json::error::Category as SerdeErrorCategory;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use pica::{
Category, MacAddress, PicaCommand, PicaCommandError, PicaCommandStatus, PicaEvent, Position,
};
use PicaEvent::{DeviceAdded, DeviceRemoved, DeviceUpdated, NeighborUpdated};
const STATIC_FILES: &[(&str, &str, &str)] = &[
("/", "text/html", include_str!("../../../static/index.html")),
(
"/openapi",
"text/html",
include_str!("../../../static/openapi.html"),
),
(
"/openapi.yaml",
"text/yaml",
include_str!("../../../static/openapi.yaml"),
),
(
"/src/components/Map.js",
"application/javascript",
include_str!("../../../static/src/components/Map.js"),
),
(
"/src/components/DeviceInfo.js",
"application/javascript",
include_str!("../../../static/src/components/DeviceInfo.js"),
),
(
"/src/components/Orientation.js",
"application/javascript",
include_str!("../../../static/src/components/Orientation.js"),
),
];
#[derive(Deserialize)]
struct PositionBody {
x: i16,
y: i16,
z: i16,
yaw: i16,
pitch: i8,
roll: i16,
}
macro_rules! position {
($body: ident) => {
position!($body, false)
};
($body: ident, $mandatory: ident) => {
match serde_json::from_slice::<PositionBody>(&$body) {
Ok(body) => Position::new(body.x, body.y, body.z, body.yaw, body.pitch, body.roll),
Err(err) => {
if !$mandatory && err.classify() == SerdeErrorCategory::Eof {
Position::default()
} else {
let reason = format!("Error while deserializing position: {}", err);
println!("{}", reason);
return Ok(Response::builder().status(406).body(reason.into()).unwrap());
}
}
}
};
}
macro_rules! mac_address {
($mac_address: ident) => {
match MacAddress::new($mac_address.to_string()) {
Ok(mac_address) => mac_address,
Err(err) => {
let reason = format!("Error mac_address: {}", err);
println!("{}", reason);
return Ok(Response::builder().status(406).body(reason.into()).unwrap());
}
}
};
}
#[derive(Debug, Serialize, Clone)]
struct Device {
pub category: Category,
pub mac_address: String,
#[serde(flatten)]
pub position: Position,
}
fn event_name(event: &PicaEvent) -> &'static str {
match event {
DeviceAdded { .. } => "device-added",
DeviceRemoved { .. } => "device-removed",
DeviceUpdated { .. } => "device-updated",
NeighborUpdated { .. } => "neighbor-updated",
}
}
async fn handle(
mut req: Request<Body>,
tx: mpsc::Sender<PicaCommand>,
events: broadcast::Sender<PicaEvent>,
) -> Result<Response<Body>, Infallible> {
let static_file = STATIC_FILES
.iter()
.find(|(path, _, _)| req.uri().path() == *path);
if let Some((_, mime, content)) = static_file {
return Ok(Response::builder()
.header("content-type", *mime)
.body((*content).into())
.unwrap());
}
let body = body::to_bytes(req.body_mut()).await.unwrap();
let (pica_cmd_rsp_tx, pica_cmd_rsp_rx) = oneshot::channel::<PicaCommandStatus>();
let send_cmd = |pica_cmd| async {
println!("PicaCommand: {}", pica_cmd);
tx.send(pica_cmd).await.unwrap();
let (status, description) = match pica_cmd_rsp_rx.await {
Ok(Ok(_)) => (HttpStatusCode::OK, "success".into()),
Ok(Err(err)) => (
match err {
PicaCommandError::DeviceAlreadyExists(_) => HttpStatusCode::CONFLICT,
PicaCommandError::DeviceNotFound(_) => HttpStatusCode::NOT_FOUND,
},
format!("{}", err),
),
Err(err) => (
HttpStatusCode::INTERNAL_SERVER_ERROR,
format!("Error getting command response: {}", err),
),
};
println!(" status: {}, {}", status, description);
Response::builder()
.status(status)
.body(description.into())
.unwrap()
};
match req
.uri_mut()
.path()
.trim_start_matches('/')
.split('/')
.collect::<Vec<_>>()[..]
{
["events"] => {
let stream = BroadcastStream::new(events.subscribe()).map(|result| {
result.map(|event| {
format!(
"event: {}\ndata: {}\n\n",
event_name(&event),
serde_json::to_string(&event).unwrap()
)
})
});
return Ok(Response::builder()
.header("content-type", "text/event-stream")
.body(Body::wrap_stream(stream))
.unwrap());
}
["init-uci-device", mac_address] => {
return Ok(send_cmd(PicaCommand::InitUciDevice(
mac_address!(mac_address),
position!(body),
pica_cmd_rsp_tx,
))
.await);
}
["set-position", mac_address] => {
return Ok(send_cmd(PicaCommand::SetPosition(
mac_address!(mac_address),
position!(body),
pica_cmd_rsp_tx,
))
.await);
}
["create-anchor", mac_address] => {
return Ok(send_cmd(PicaCommand::CreateAnchor(
mac_address!(mac_address),
position!(body),
pica_cmd_rsp_tx,
))
.await);
}
["destroy-anchor", mac_address] => {
return Ok(send_cmd(PicaCommand::DestroyAnchor(
mac_address!(mac_address),
pica_cmd_rsp_tx,
))
.await);
}
["get-state"] => {
#[derive(Serialize)]
struct GetStateResponse {
devices: Vec<Device>,
}
println!("PicaCommand: GetState");
let (state_tx, state_rx) = oneshot::channel::<Vec<_>>();
tx.send(PicaCommand::GetState(state_tx)).await.unwrap();
let devices = match state_rx.await {
Ok(devices) => GetStateResponse {
devices: devices
.into_iter()
.map(|(category, mac_address, position)| Device {
category,
mac_address: mac_address.into(),
position,
})
.collect(),
},
Err(_) => GetStateResponse { devices: vec![] },
};
let body = serde_json::to_string(&devices).unwrap();
return Ok(Response::builder().status(200).body(body.into()).unwrap());
}
_ => (),
}
Ok(Response::builder().status(404).body("".into()).unwrap())
}
pub async fn serve(
tx: mpsc::Sender<PicaCommand>,
events: broadcast::Sender<PicaEvent>,
web_port: u16,
) -> Result<()> {
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, web_port);
let make_svc = make_service_fn(move |_conn| {
let tx = tx.clone();
let events = events.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
handle(req, tx.clone(), events.clone())
}))
}
});
let server = Server::bind(&addr.into()).serve(make_svc);
println!("Pica: Web server started on http://0.0.0.0:{}", web_port);
server.await.context("Web Server Error")
}