Implement fd connector

Bug: 298189303
Test: m netsimd
Test: Connect phone device with wear device
      // cf_x86_64_phone-trunk_staging-userdebug
      launch_cvd
      // cf_gwear_x86-trunk_staging-userdebug
      launch_cvd --base_instance_num=2 --rootcanal-instance-num=1
Change-Id: I066266e9814c069f09844031b3f23c227de89a66
diff --git a/Android.bp b/Android.bp
index 426818e..fdf8153 100644
--- a/Android.bp
+++ b/Android.bp
@@ -102,7 +102,7 @@
 genrule {
     name: "netsim_daemon_cc",
     tools: ["cxxbridge"],
-    cmd: "$(location cxxbridge) $(in) >> $(out)",
+    cmd: "$(location cxxbridge) $(in) --cfg feature=\\\"cuttlefish\\\" >> $(out)",
     srcs: ["rust/daemon/src/ffi.rs"],
     out: ["netsim-daemon/src/ffi.rs.cc"],
 }
@@ -110,7 +110,7 @@
 genrule {
     name: "netsim_daemon_h",
     tools: ["cxxbridge"],
-    cmd: "$(location cxxbridge) $(in) --header >> $(out)",
+    cmd: "$(location cxxbridge) $(in) --cfg feature=\\\"cuttlefish\\\" --header >> $(out)",
     srcs: ["rust/daemon/src/ffi.rs"],
     out: ["netsim-daemon/src/ffi.rs.h"],
 }
diff --git a/rust/daemon/src/rust_main.rs b/rust/daemon/src/rust_main.rs
index d6fb1cf..2960a49 100644
--- a/rust/daemon/src/rust_main.rs
+++ b/rust/daemon/src/rust_main.rs
@@ -13,13 +13,17 @@
 // limitations under the License.
 
 use clap::Parser;
-use log::{error, info, warn};
+use log::warn;
+#[cfg(feature = "cuttlefish")]
+use log::{error, info};
 
 use netsim_common::util::netsim_logger;
 
 use crate::args::NetsimdArgs;
 use crate::ffi::ffi_util;
 use crate::service::{Service, ServiceParams};
+#[cfg(feature = "cuttlefish")]
+use netsim_common::util::os_utils::get_server_address;
 use std::ffi::{c_char, c_int};
 
 /// Wireless network simulator for android (and other) emulated devices.
@@ -65,22 +69,32 @@
     }
 
     match args.connector_instance {
+        #[cfg(feature = "cuttlefish")]
         Some(connector_instance) => run_netsimd_connector(args, connector_instance),
-        None => run_netsimd_primary(args),
+        _ => run_netsimd_primary(args),
     }
 }
 
-// Forwards packets to another netsim daemon.
+/// Forwards packets to another netsim daemon.
+#[cfg(feature = "cuttlefish")]
 fn run_netsimd_connector(args: NetsimdArgs, instance: u16) {
-    if args.fd_startup_str.is_none() {
-        error!("Failed to start netsimd forwarder, missing `-s` arg");
-        return;
-    }
-    if !ffi_util::is_netsimd_alive(instance) {
-        error!("Failed to start netsimd forwarder, no primary at {}", instance);
-        return;
-    }
+    let fd_startup = match args.fd_startup_str {
+        None => {
+            error!("Failed to start netsimd forwarder, missing `-s` arg");
+            return;
+        }
+        Some(fd_startup) => fd_startup,
+    };
+
     info!("Starting netsim daemon in forwarding mode");
+    // TODO: Make this function returns Result to use `?` instead of unwrap().
+    let server = get_server_address(instance)
+        .map(|port| format!("localhost:{}", port))
+        .ok_or_else(|| warn!("Unable to find server address for instance {}", instance))
+        .unwrap();
+    crate::transport::fd::run_fd_connector(&fd_startup, server.as_str())
+        .map_err(|e| error!("Failed to run fd connector: {}", e))
+        .unwrap();
 }
 
 fn run_netsimd_primary(netsimd_args: NetsimdArgs) {
diff --git a/rust/daemon/src/transport/fd.rs b/rust/daemon/src/transport/fd.rs
index 710fc9e..c59a4c1 100644
--- a/rust/daemon/src/transport/fd.rs
+++ b/rust/daemon/src/transport/fd.rs
@@ -20,13 +20,21 @@
 use super::h4::PacketError;
 use super::uci;
 use crate::devices::devices_handler::{add_chip, remove_chip};
+use crate::ffi::ffi_transport;
+use lazy_static::lazy_static;
 use log::{error, info, warn};
 use netsim_proto::common::ChipKind;
+use netsim_proto::hci_packet::HCIPacket;
 use netsim_proto::model::ChipCreate;
+use netsim_proto::packet_streamer::PacketRequest;
+use netsim_proto::startup::{Chip as ChipProto, ChipInfo};
+use protobuf::{Enum, EnumOrUnknown, Message, MessageField};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 use std::fs::File;
 use std::io::{ErrorKind, Write};
 use std::os::fd::FromRawFd;
+use std::sync::{Arc, RwLock};
 use std::thread::JoinHandle;
 use std::{fmt, thread};
 
@@ -238,6 +246,181 @@
         .unwrap();
 }
 
+/// Read from the raw fd and pass to the grpc server.
+///
+/// # Safety
+///
+/// `fd_rx` must be a valid and open file descriptor.
+unsafe fn connector_fd_reader(fd_rx: i32, kind: ChipKindEnum, stream_id: u32) -> JoinHandle<()> {
+    info!("Connecting fd reader for stream_id: {}, fd_rx: {}", stream_id, fd_rx);
+    thread::Builder::new()
+        .name(format!("fd_connector_{}_{}", stream_id, fd_rx))
+        .spawn(move || {
+            // SAFETY: The caller promises that `fd_rx` is valid and open.
+            let mut rx = unsafe { File::from_raw_fd(fd_rx) };
+            info!("Handling fd={} for kind: {:?} stream_id: {:?}", fd_rx, kind, stream_id);
+
+            loop {
+                match kind {
+                    ChipKindEnum::UWB => match uci::read_uci_packet(&mut rx) {
+                        Err(e) => {
+                            error!(
+                                "End reader connection with fd={}. Failed to read \
+                                     uci control packet: {:?}",
+                                fd_rx, e
+                            );
+                            break;
+                        }
+                        Ok(uci::Packet { payload: _ }) => {
+                            // TODO: Compose PacketRequest.
+                        }
+                    },
+                    ChipKindEnum::BLUETOOTH => match h4::read_h4_packet(&mut rx) {
+                        Ok(h4::Packet { h4_type, payload }) => {
+                            let mut request = PacketRequest::new();
+                            let hci_packet = HCIPacket {
+                                packet_type: EnumOrUnknown::from_i32(h4_type as i32),
+                                packet: payload,
+                                ..Default::default()
+                            };
+                            request.set_hci_packet(hci_packet);
+                            let proto_bytes = request.write_to_bytes().unwrap();
+                            ffi_transport::write_packet_request(stream_id, &proto_bytes);
+                        }
+                        Err(PacketError::IoError(e)) if e.kind() == ErrorKind::UnexpectedEof => {
+                            info!("End reader connection with fd={}.", fd_rx);
+                            break;
+                        }
+                        Err(e) => {
+                            error!(
+                                "End reader connection with fd={}. Failed to read \
+                                     hci control packet: {:?}",
+                                fd_rx, e
+                            );
+                            break;
+                        }
+                    },
+                    _ => {
+                        error!("unknown control packet chip_kind: {:?}", kind);
+                        break;
+                    }
+                };
+            }
+        })
+        .unwrap()
+}
+
+// For connector.
+lazy_static! {
+    static ref CONNECTOR_FILES: Arc<RwLock<HashMap<u32, File>>> =
+        Arc::new(RwLock::new(HashMap::new()));
+}
+
+/// This function is called when a packet is received from the gRPC server.
+fn connector_grpc_read_callback(stream_id: u32, proto_bytes: &[u8]) {
+    let request = PacketRequest::parse_from_bytes(proto_bytes).unwrap();
+
+    let mut buffer = Vec::<u8>::with_capacity(request.hci_packet().packet.len() + 1);
+    buffer.push(request.hci_packet().packet_type.enum_value_or_default().value() as u8);
+    buffer.extend(&request.hci_packet().packet);
+
+    if let Some(mut file_in) = CONNECTOR_FILES.read().unwrap().get(&stream_id) {
+        if let Err(e) = file_in.write_all(&buffer[..]) {
+            error!("Failed to write: {}", e);
+        }
+    } else {
+        warn!("Unable to find file with stream_id {}", stream_id);
+    }
+}
+
+/// Read from grpc server and write back to file descriptor.
+fn connector_grpc_reader(chip_kind: u32, stream_id: u32, file_in: File) -> JoinHandle<()> {
+    info!("Connecting grpc reader for stream_id: {}", stream_id);
+    thread::Builder::new()
+        .name(format!("grpc_reader_{}", stream_id))
+        .spawn(move || {
+            {
+                let mut binding = CONNECTOR_FILES.write().unwrap();
+                if binding.contains_key(&stream_id) {
+                    error!(
+                        "register_connector: key already present for \
+                                 stream_id: {stream_id}"
+                    );
+                }
+                binding.insert(stream_id, file_in);
+            }
+            if chip_kind != ChipKindEnum::BLUETOOTH as u32 {
+                warn!("Unable to register connector for chip type {}", chip_kind);
+            }
+            // Read packet from grpc and send to file_in.
+            ffi_transport::read_packet_response_loop(stream_id, connector_grpc_read_callback);
+
+            CONNECTOR_FILES.write().unwrap().remove(&stream_id);
+        })
+        .unwrap()
+}
+
+/// Create threads to forward file descriptors to another netsim daemon.
+pub fn run_fd_connector(startup_json: &String, server: &str) -> Result<(), String> {
+    info!("Running fd connector with {startup_json}");
+    let startup_info = match serde_json::from_str::<StartupInfo>(startup_json.as_str()) {
+        Ok(startup_info) => startup_info,
+        Err(e) => {
+            return Err(format!("Error parsing startup info: {:?}", e.to_string()));
+        }
+    };
+    let server = server.to_owned();
+
+    let chip_count = startup_info.devices.iter().map(|d| d.chips.len()).sum();
+    let mut handles = Vec::with_capacity(chip_count);
+
+    for device in startup_info.devices {
+        for chip in device.chips {
+            // Cf writes to fd_out and reads from fd_in
+            // SAFETY: Our caller promises that the file descriptors in the JSON are valid
+            // and open.
+            let file_in = unsafe { File::from_raw_fd(chip.fd_in as i32) };
+
+            let stream_id = ffi_transport::stream_packets(&server);
+            // Send out initial info of PacketRequest to grpc server.
+            let mut initial_request = PacketRequest::new();
+            initial_request.set_initial_info(ChipInfo {
+                name: device.name.clone(),
+                chip: MessageField::some(ChipProto {
+                    kind: EnumOrUnknown::from_i32(chip.kind as i32),
+                    id: chip.id.unwrap_or_default(),
+                    manufacturer: chip.manufacturer.unwrap_or_default(),
+                    product_name: chip.product_name.unwrap_or_default(),
+                    address: chip.address.unwrap_or_default(),
+                    ..Default::default()
+                }),
+                ..Default::default()
+            });
+            ffi_transport::write_packet_request(
+                stream_id,
+                &initial_request.write_to_bytes().unwrap(),
+            );
+            info!("Sent initial request to grpc for stream_id: {}", stream_id);
+
+            handles.push(connector_grpc_reader(chip.kind as u32, stream_id, file_in));
+
+            // TODO: switch to runtime.spawn once FIFOs are available in Tokio
+            // SAFETY: Our caller promises that the file descriptors in the JSON are valid
+            // and open.
+            handles.push(unsafe { connector_fd_reader(chip.fd_out as i32, chip.kind, stream_id) });
+        }
+    }
+    // Wait for all of them to complete.
+    for handle in handles {
+        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
+        // a future, so we can wait for it using `block_on`.
+        // runtime.block_on(handle).unwrap();
+        // TODO: use runtime.block_on once FIFOs are available in Tokio
+        handle.join().unwrap();
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::StartupInfo;