Refactor the test structure to make it more extensible

1) make parts of the code to communicate with channels
2) use log library

Bug: 197333653
Test: works as expected on linux
Change-Id: Iaef58f45653b439322af3d810fc3aa7e17f9384c
diff --git a/src/rust/rootcanal/Android.bp b/src/rust/rootcanal/Android.bp
index 6f7c746..17f6972 100644
--- a/src/rust/rootcanal/Android.bp
+++ b/src/rust/rootcanal/Android.bp
@@ -16,6 +16,7 @@
         "libnfc_packets",
         "libbytes",
         "libthiserror",
+	"liblogger",
         "liblog_rust",
         "libtokio",
     ],
diff --git a/src/rust/rootcanal/main.rs b/src/rust/rootcanal/main.rs
index c4ac5da..1b05405 100644
--- a/src/rust/rootcanal/main.rs
+++ b/src/rust/rootcanal/main.rs
@@ -38,6 +38,10 @@
 
 #[tokio::main]
 async fn main() -> io::Result<()> {
+    logger::init(
+        logger::Config::default().with_tag_on_device("nfc-rc").with_min_level(log::Level::Trace),
+    );
+
     let listener = TcpListener::bind("127.0.0.1:54323").await?;
 
     let (mut sock, _) = listener.accept().await?;
@@ -66,11 +70,11 @@
     let mut buffer = BytesMut::with_capacity(1024);
     let pkt_type = reader.read_u8().await?;
     let len: usize = reader.read_u16().await?.into();
-    eprintln!("packet {} received len={}", &pkt_type, &len);
+    log::debug!("packet {} received len={}", &pkt_type, &len);
     buffer.resize(len, 0);
     reader.read_exact(&mut buffer).await?;
     let frozen = buffer.freeze();
-    eprintln!("{:?}", &frozen);
+    log::debug!("{:?}", &frozen);
     if pkt_type == NciMsgType::Command as u8 {
         match NciPacket::parse(&frozen) {
             Ok(p) => command_response(writer, p).await,
@@ -148,6 +152,6 @@
     data.extend(b);
     let frozen = data.freeze();
     writer.write_all(frozen.as_ref()).await?;
-    println!("command written");
+    log::debug!("command written");
     Ok(())
 }
diff --git a/src/rust/test/Android.bp b/src/rust/test/Android.bp
index df1a7f1..06a10fe 100644
--- a/src/rust/test/Android.bp
+++ b/src/rust/test/Android.bp
@@ -16,6 +16,7 @@
         "libnfc_packets",
         "libbytes",
         "libthiserror",
+	"liblogger",
         "liblog_rust",
         "libtokio",
     ],
diff --git a/src/rust/test/main.rs b/src/rust/test/main.rs
index 4337d05..f181122 100644
--- a/src/rust/test/main.rs
+++ b/src/rust/test/main.rs
@@ -2,17 +2,20 @@
 //! This connects to "rootcanal" which provides a simulated
 //! Nfc chip as well as a simulated environment.
 
-use bytes::{BufMut, Bytes, BytesMut};
-use nfc_packets::nci::NciChild::{InitResponse, ResetNotification, ResetResponse};
+use bytes::{BufMut, BytesMut};
+// use bytes::Bytes;
+// use nfc_hal::internal::RawHal;
+// use nfc_packets::nci::NciChild::{InitResponse, ResetNotification, ResetResponse};
 use nfc_packets::nci::ResetCommandBuilder;
 use nfc_packets::nci::{NciMsgType, PacketBoundaryFlag, ResetType};
 use nfc_packets::nci::{NciPacket, Packet};
 use std::convert::TryInto;
-use std::io::{self, ErrorKind};
+// use std::io::{self, ErrorKind};
 use thiserror::Error;
 use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
 use tokio::net::TcpStream;
-use tokio::sync::mpsc::{self, Sender};
+use tokio::select;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 
 /// Result type
 type Result<T> = std::result::Result<T, CommError>;
@@ -20,7 +23,9 @@
 #[derive(Debug, Error)]
 enum CommError {
     #[error("Communication error")]
-    IoError(#[from] io::Error),
+    IoError(#[from] tokio::io::Error),
+    #[error("Channel error")]
+    SendError(#[from] tokio::sync::mpsc::error::SendError<nfc_packets::nci::NciPacket>),
     #[error("Packet did not parse correctly")]
     InvalidPacket,
     #[error("Packet type not supported")]
@@ -29,86 +34,65 @@
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let (fin_tx, mut fin_rx) = mpsc::channel(1);
-    let (reader, mut writer) = TcpStream::connect("127.0.0.1:54323")
+    logger::init(
+        logger::Config::default().with_tag_on_device("lnfc").with_min_level(log::Level::Trace),
+    );
+    let (in_tx, in_rx) = unbounded_channel(); // upstream channel
+    let (out_tx, out_rx) = unbounded_channel(); // downstream channel
+    let out_tx_cmd = out_tx.clone();
+
+    let (reader, writer) = TcpStream::connect("127.0.0.1:54323")
         .await
         .expect("unable to create stream to rootcanal")
         .into_split();
 
-    let mut reader = BufReader::new(reader);
-    send_reset(&mut writer).await?;
-    let task = tokio::spawn(async move {
-        loop {
-            if let Err(e) = dispatch_incoming(&mut reader, &fin_tx).await {
-                match e {
-                    CommError::IoError(e) if e.kind() == ErrorKind::UnexpectedEof => break,
-                    _ => eprintln!("Processing error: {:?}", e),
-                }
-            }
-        }
-    });
-    let msg = fin_rx.recv().await.unwrap();
-    writer.write_all(msg.as_ref()).await?;
+    let reader = BufReader::new(reader);
+    tokio::spawn(dispatch_incoming(in_tx, reader));
+    tokio::spawn(dispatch_outgoing(out_rx, writer));
+    let task = tokio::spawn(command_response(out_tx, in_rx));
+    send_reset(out_tx_cmd).await?;
     task.await.unwrap();
     Ok(())
 }
 
 /// Send NCI events received from the HAL to the NCI layer
-async fn dispatch_incoming<R>(reader: &mut R, ch: &Sender<Bytes>) -> Result<()>
+async fn dispatch_incoming<R>(in_tx: UnboundedSender<NciPacket>, mut reader: R) -> Result<()>
 where
     R: AsyncReadExt + Unpin,
 {
-    let mut buffer = BytesMut::with_capacity(1024);
-    let t = reader.read_u8().await?;
-    let len: usize = reader.read_u16().await?.into();
-    eprintln!("packet {} received len={}", &t, &len);
-    buffer.resize(len, 0);
-    reader.read_exact(&mut buffer).await?;
-    let frozen = buffer.freeze();
-    eprintln!("{:?}", &frozen);
-    if t == NciMsgType::Response as u8 || t == NciMsgType::Notification as u8 {
-        match NciPacket::parse(&frozen) {
-            Ok(p) => command_response(p, ch).await,
-            Err(_) => Err(CommError::InvalidPacket),
+    loop {
+        let mut buffer = BytesMut::with_capacity(1024);
+        let t = reader.read_u8().await?;
+        let len: usize = reader.read_u16().await?.into();
+        log::debug!("packet {} received len={}", &t, &len);
+        buffer.resize(len, 0);
+        reader.read_exact(&mut buffer).await?;
+        let frozen = buffer.freeze();
+        log::debug!("{:?}", &frozen);
+        if t == NciMsgType::Response as u8 || t == NciMsgType::Notification as u8 {
+            match NciPacket::parse(&frozen) {
+                Ok(p) => in_tx.send(p).unwrap(),
+                Err(_) => log::error!("{}", CommError::InvalidPacket),
+            }
+        } else {
+            log::error!("{}", CommError::UnsupportedPacket)
         }
-    } else {
-        Err(CommError::UnsupportedPacket)
     }
 }
 
-async fn command_response(rsp: NciPacket, ch: &Sender<Bytes>) -> Result<()> {
-    let id = match rsp.specialize() {
-        ResetResponse(_) => "Reset",
-        InitResponse(_) => "Init",
-        ResetNotification(_) => {
-            ch.send(get_termination_msg()).await.unwrap();
-            "Reset NTF"
-        }
-        _ => "error",
-    };
-    println!("{} - response received", id);
-    Ok(())
-}
-
-async fn send_reset<W>(out: &mut W) -> Result<()>
+/// Send commands received from the NCI later to rootcanal
+async fn dispatch_outgoing<W>(mut out_rx: UnboundedReceiver<NciPacket>, mut writer: W) -> Result<()>
 where
     W: AsyncWriteExt + Unpin,
 {
-    let pbf = PacketBoundaryFlag::CompleteOrFinal;
-    write_nci(
-        out,
-        (ResetCommandBuilder { pbf, reset_type: ResetType::ResetConfig }).build().into(),
-    )
-    .await?;
-    Ok(())
-}
+    loop {
+        select! {
+            Some(cmd) = out_rx.recv() => write_nci(&mut writer, cmd).await?,
+            else => break,
+        }
+    }
 
-fn get_termination_msg() -> Bytes {
-    const TERMINATION: u8 = 4;
-    let mut data = BytesMut::with_capacity(3);
-    data.put_u8(TERMINATION);
-    data.put_u16(0u16);
-    data.freeze()
+    Ok(())
 }
 
 async fn write_nci<W>(writer: &mut W, cmd: NciPacket) -> Result<()>
@@ -122,6 +106,24 @@
     data.put_u16(b.len().try_into().unwrap());
     data.extend(b);
     writer.write_all(&data[..]).await?;
-    println!("Reset command is sent");
+    log::debug!("Reset command is sent");
+    Ok(())
+}
+
+async fn command_response(
+    _out_tx: UnboundedSender<NciPacket>,
+    mut in_rx: UnboundedReceiver<NciPacket>,
+) {
+    loop {
+        select! {
+            Some(cmd) = in_rx.recv() => log::debug!("{} - response received", cmd.get_op()),
+            else => break,
+        }
+    }
+}
+
+async fn send_reset(out: UnboundedSender<NciPacket>) -> Result<()> {
+    let pbf = PacketBoundaryFlag::CompleteOrFinal;
+    out.send((ResetCommandBuilder { pbf, reset_type: ResetType::ResetConfig }).build().into())?;
     Ok(())
 }