| #![warn(rust_2018_idioms)] |
| #![cfg(feature = "full")] |
| |
| use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; |
| |
| #[tokio::test] |
| async fn ping_pong() { |
| let (mut a, mut b) = duplex(32); |
| |
| let mut buf = [0u8; 4]; |
| |
| a.write_all(b"ping").await.unwrap(); |
| b.read_exact(&mut buf).await.unwrap(); |
| assert_eq!(&buf, b"ping"); |
| |
| b.write_all(b"pong").await.unwrap(); |
| a.read_exact(&mut buf).await.unwrap(); |
| assert_eq!(&buf, b"pong"); |
| } |
| |
| #[tokio::test] |
| async fn across_tasks() { |
| let (mut a, mut b) = duplex(32); |
| |
| let t1 = tokio::spawn(async move { |
| a.write_all(b"ping").await.unwrap(); |
| let mut buf = [0u8; 4]; |
| a.read_exact(&mut buf).await.unwrap(); |
| assert_eq!(&buf, b"pong"); |
| }); |
| |
| let t2 = tokio::spawn(async move { |
| let mut buf = [0u8; 4]; |
| b.read_exact(&mut buf).await.unwrap(); |
| assert_eq!(&buf, b"ping"); |
| b.write_all(b"pong").await.unwrap(); |
| }); |
| |
| t1.await.unwrap(); |
| t2.await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn disconnect() { |
| let (mut a, mut b) = duplex(32); |
| |
| let t1 = tokio::spawn(async move { |
| a.write_all(b"ping").await.unwrap(); |
| // and dropped |
| }); |
| |
| let t2 = tokio::spawn(async move { |
| let mut buf = [0u8; 32]; |
| let n = b.read(&mut buf).await.unwrap(); |
| assert_eq!(&buf[..n], b"ping"); |
| |
| let n = b.read(&mut buf).await.unwrap(); |
| assert_eq!(n, 0); |
| }); |
| |
| t1.await.unwrap(); |
| t2.await.unwrap(); |
| } |
| |
| #[tokio::test] |
| #[cfg(not(target_os = "android"))] |
| async fn disconnect_reader() { |
| let (a, mut b) = duplex(2); |
| |
| let t1 = tokio::spawn(async move { |
| // this will block, as not all data fits into duplex |
| b.write_all(b"ping").await.unwrap_err(); |
| }); |
| |
| let t2 = tokio::spawn(async move { |
| // here we drop the reader side, and we expect the writer in the other |
| // task to exit with an error |
| drop(a); |
| }); |
| |
| t2.await.unwrap(); |
| t1.await.unwrap(); |
| } |
| |
| #[tokio::test] |
| async fn max_write_size() { |
| let (mut a, mut b) = duplex(32); |
| |
| let t1 = tokio::spawn(async move { |
| let n = a.write(&[0u8; 64]).await.unwrap(); |
| assert_eq!(n, 32); |
| let n = a.write(&[0u8; 64]).await.unwrap(); |
| assert_eq!(n, 4); |
| }); |
| |
| let mut buf = [0u8; 4]; |
| b.read_exact(&mut buf).await.unwrap(); |
| |
| t1.await.unwrap(); |
| |
| // drop b only after task t1 finishes writing |
| drop(b); |
| } |
| |
| #[tokio::test] |
| async fn duplex_is_cooperative() { |
| let (mut tx, mut rx) = tokio::io::duplex(1024 * 8); |
| |
| tokio::select! { |
| biased; |
| |
| _ = async { |
| loop { |
| let buf = [3u8; 4096]; |
| tx.write_all(&buf).await.unwrap(); |
| let mut buf = [0u8; 4096]; |
| let _ = rx.read(&mut buf).await.unwrap(); |
| } |
| } => {}, |
| _ = tokio::task::yield_now() => {} |
| } |
| } |