Implement fd connector
Bug: 298189303
Test: m netsimd
Test: Connect phone device with wear device
// cf_x86_64_phone-trunk_staging-userdebug
launch_cvd
// cf_gwear_x86-trunk_staging-userdebug
launch_cvd --base_instance_num=2 --rootcanal-instance-num=1
Change-Id: I066266e9814c069f09844031b3f23c227de89a66
diff --git a/Android.bp b/Android.bp
index 426818e..fdf8153 100644
--- a/Android.bp
+++ b/Android.bp
@@ -102,7 +102,7 @@
genrule {
name: "netsim_daemon_cc",
tools: ["cxxbridge"],
- cmd: "$(location cxxbridge) $(in) >> $(out)",
+ cmd: "$(location cxxbridge) $(in) --cfg feature=\\\"cuttlefish\\\" >> $(out)",
srcs: ["rust/daemon/src/ffi.rs"],
out: ["netsim-daemon/src/ffi.rs.cc"],
}
@@ -110,7 +110,7 @@
genrule {
name: "netsim_daemon_h",
tools: ["cxxbridge"],
- cmd: "$(location cxxbridge) $(in) --header >> $(out)",
+ cmd: "$(location cxxbridge) $(in) --cfg feature=\\\"cuttlefish\\\" --header >> $(out)",
srcs: ["rust/daemon/src/ffi.rs"],
out: ["netsim-daemon/src/ffi.rs.h"],
}
diff --git a/rust/daemon/src/rust_main.rs b/rust/daemon/src/rust_main.rs
index d6fb1cf..2960a49 100644
--- a/rust/daemon/src/rust_main.rs
+++ b/rust/daemon/src/rust_main.rs
@@ -13,13 +13,17 @@
// limitations under the License.
use clap::Parser;
-use log::{error, info, warn};
+use log::warn;
+#[cfg(feature = "cuttlefish")]
+use log::{error, info};
use netsim_common::util::netsim_logger;
use crate::args::NetsimdArgs;
use crate::ffi::ffi_util;
use crate::service::{Service, ServiceParams};
+#[cfg(feature = "cuttlefish")]
+use netsim_common::util::os_utils::get_server_address;
use std::ffi::{c_char, c_int};
/// Wireless network simulator for android (and other) emulated devices.
@@ -65,22 +69,32 @@
}
match args.connector_instance {
+ #[cfg(feature = "cuttlefish")]
Some(connector_instance) => run_netsimd_connector(args, connector_instance),
- None => run_netsimd_primary(args),
+ _ => run_netsimd_primary(args),
}
}
-// Forwards packets to another netsim daemon.
+/// Forwards packets to another netsim daemon.
+#[cfg(feature = "cuttlefish")]
fn run_netsimd_connector(args: NetsimdArgs, instance: u16) {
- if args.fd_startup_str.is_none() {
- error!("Failed to start netsimd forwarder, missing `-s` arg");
- return;
- }
- if !ffi_util::is_netsimd_alive(instance) {
- error!("Failed to start netsimd forwarder, no primary at {}", instance);
- return;
- }
+ let fd_startup = match args.fd_startup_str {
+ None => {
+ error!("Failed to start netsimd forwarder, missing `-s` arg");
+ return;
+ }
+ Some(fd_startup) => fd_startup,
+ };
+
info!("Starting netsim daemon in forwarding mode");
+ // TODO: Make this function returns Result to use `?` instead of unwrap().
+ let server = get_server_address(instance)
+ .map(|port| format!("localhost:{}", port))
+ .ok_or_else(|| warn!("Unable to find server address for instance {}", instance))
+ .unwrap();
+ crate::transport::fd::run_fd_connector(&fd_startup, server.as_str())
+ .map_err(|e| error!("Failed to run fd connector: {}", e))
+ .unwrap();
}
fn run_netsimd_primary(netsimd_args: NetsimdArgs) {
diff --git a/rust/daemon/src/transport/fd.rs b/rust/daemon/src/transport/fd.rs
index 710fc9e..c59a4c1 100644
--- a/rust/daemon/src/transport/fd.rs
+++ b/rust/daemon/src/transport/fd.rs
@@ -20,13 +20,21 @@
use super::h4::PacketError;
use super::uci;
use crate::devices::devices_handler::{add_chip, remove_chip};
+use crate::ffi::ffi_transport;
+use lazy_static::lazy_static;
use log::{error, info, warn};
use netsim_proto::common::ChipKind;
+use netsim_proto::hci_packet::HCIPacket;
use netsim_proto::model::ChipCreate;
+use netsim_proto::packet_streamer::PacketRequest;
+use netsim_proto::startup::{Chip as ChipProto, ChipInfo};
+use protobuf::{Enum, EnumOrUnknown, Message, MessageField};
use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
use std::fs::File;
use std::io::{ErrorKind, Write};
use std::os::fd::FromRawFd;
+use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::{fmt, thread};
@@ -238,6 +246,181 @@
.unwrap();
}
+/// Read from the raw fd and pass to the grpc server.
+///
+/// # Safety
+///
+/// `fd_rx` must be a valid and open file descriptor.
+unsafe fn connector_fd_reader(fd_rx: i32, kind: ChipKindEnum, stream_id: u32) -> JoinHandle<()> {
+ info!("Connecting fd reader for stream_id: {}, fd_rx: {}", stream_id, fd_rx);
+ thread::Builder::new()
+ .name(format!("fd_connector_{}_{}", stream_id, fd_rx))
+ .spawn(move || {
+ // SAFETY: The caller promises that `fd_rx` is valid and open.
+ let mut rx = unsafe { File::from_raw_fd(fd_rx) };
+ info!("Handling fd={} for kind: {:?} stream_id: {:?}", fd_rx, kind, stream_id);
+
+ loop {
+ match kind {
+ ChipKindEnum::UWB => match uci::read_uci_packet(&mut rx) {
+ Err(e) => {
+ error!(
+ "End reader connection with fd={}. Failed to read \
+ uci control packet: {:?}",
+ fd_rx, e
+ );
+ break;
+ }
+ Ok(uci::Packet { payload: _ }) => {
+ // TODO: Compose PacketRequest.
+ }
+ },
+ ChipKindEnum::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
+ Ok(h4::Packet { h4_type, payload }) => {
+ let mut request = PacketRequest::new();
+ let hci_packet = HCIPacket {
+ packet_type: EnumOrUnknown::from_i32(h4_type as i32),
+ packet: payload,
+ ..Default::default()
+ };
+ request.set_hci_packet(hci_packet);
+ let proto_bytes = request.write_to_bytes().unwrap();
+ ffi_transport::write_packet_request(stream_id, &proto_bytes);
+ }
+ Err(PacketError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
+ info!("End reader connection with fd={}.", fd_rx);
+ break;
+ }
+ Err(e) => {
+ error!(
+ "End reader connection with fd={}. Failed to read \
+ hci control packet: {:?}",
+ fd_rx, e
+ );
+ break;
+ }
+ },
+ _ => {
+ error!("unknown control packet chip_kind: {:?}", kind);
+ break;
+ }
+ };
+ }
+ })
+ .unwrap()
+}
+
+// For connector.
+lazy_static! {
+ static ref CONNECTOR_FILES: Arc<RwLock<HashMap<u32, File>>> =
+ Arc::new(RwLock::new(HashMap::new()));
+}
+
+/// This function is called when a packet is received from the gRPC server.
+fn connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8]) {
+ let request = PacketRequest::parse_from_bytes(proto_bytes).unwrap();
+
+ let mut buffer = Vec::<u8>::with_capacity(request.hci_packet().packet.len() + 1);
+ buffer.push(request.hci_packet().packet_type.enum_value_or_default().value() as u8);
+ buffer.extend(&request.hci_packet().packet);
+
+ if let Some(mut file_in) = CONNECTOR_FILES.read().unwrap().get(&stream_id) {
+ if let Err(e) = file_in.write_all(&buffer[..]) {
+ error!("Failed to write: {}", e);
+ }
+ } else {
+ warn!("Unable to find file with stream_id {}", stream_id);
+ }
+}
+
+/// Read from grpc server and write back to file descriptor.
+fn connector_grpc_reader(chip_kind: u32, stream_id: u32, file_in: File) -> JoinHandle<()> {
+ info!("Connecting grpc reader for stream_id: {}", stream_id);
+ thread::Builder::new()
+ .name(format!("grpc_reader_{}", stream_id))
+ .spawn(move || {
+ {
+ let mut binding = CONNECTOR_FILES.write().unwrap();
+ if binding.contains_key(&stream_id) {
+ error!(
+ "register_connector: key already present for \
+ stream_id: {stream_id}"
+ );
+ }
+ binding.insert(stream_id, file_in);
+ }
+ if chip_kind != ChipKindEnum::BLUETOOTH as u32 {
+ warn!("Unable to register connector for chip type {}", chip_kind);
+ }
+ // Read packet from grpc and send to file_in.
+ ffi_transport::read_packet_response_loop(stream_id, connector_grpc_read_callback);
+
+ CONNECTOR_FILES.write().unwrap().remove(&stream_id);
+ })
+ .unwrap()
+}
+
+/// Create threads to forward file descriptors to another netsim daemon.
+pub fn run_fd_connector(startup_json: &String, server: &str) -> Result<(), String> {
+ info!("Running fd connector with {startup_json}");
+ let startup_info = match serde_json::from_str::<StartupInfo>(startup_json.as_str()) {
+ Ok(startup_info) => startup_info,
+ Err(e) => {
+ return Err(format!("Error parsing startup info: {:?}", e.to_string()));
+ }
+ };
+ let server = server.to_owned();
+
+ let chip_count = startup_info.devices.iter().map(|d| d.chips.len()).sum();
+ let mut handles = Vec::with_capacity(chip_count);
+
+ for device in startup_info.devices {
+ for chip in device.chips {
+ // Cf writes to fd_out and reads from fd_in
+ // SAFETY: Our caller promises that the file descriptors in the JSON are valid
+ // and open.
+ let file_in = unsafe { File::from_raw_fd(chip.fd_in as i32) };
+
+ let stream_id = ffi_transport::stream_packets(&server);
+ // Send out initial info of PacketRequest to grpc server.
+ let mut initial_request = PacketRequest::new();
+ initial_request.set_initial_info(ChipInfo {
+ name: device.name.clone(),
+ chip: MessageField::some(ChipProto {
+ kind: EnumOrUnknown::from_i32(chip.kind as i32),
+ id: chip.id.unwrap_or_default(),
+ manufacturer: chip.manufacturer.unwrap_or_default(),
+ product_name: chip.product_name.unwrap_or_default(),
+ address: chip.address.unwrap_or_default(),
+ ..Default::default()
+ }),
+ ..Default::default()
+ });
+ ffi_transport::write_packet_request(
+ stream_id,
+ &initial_request.write_to_bytes().unwrap(),
+ );
+ info!("Sent initial request to grpc for stream_id: {}", stream_id);
+
+ handles.push(connector_grpc_reader(chip.kind as u32, stream_id, file_in));
+
+ // TODO: switch to runtime.spawn once FIFOs are available in Tokio
+ // SAFETY: Our caller promises that the file descriptors in the JSON are valid
+ // and open.
+ handles.push(unsafe { connector_fd_reader(chip.fd_out as i32, chip.kind, stream_id) });
+ }
+ }
+ // Wait for all of them to complete.
+ for handle in handles {
+ // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
+ // a future, so we can wait for it using `block_on`.
+ // runtime.block_on(handle).unwrap();
+ // TODO: use runtime.block_on once FIFOs are available in Tokio
+ handle.join().unwrap();
+ }
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use super::StartupInfo;