| #![cfg(feature = "full")] |
| #![cfg(all(windows))] |
| |
| use std::io; |
| use std::mem; |
| use std::os::windows::io::AsRawHandle; |
| use std::time::Duration; |
| use tokio::io::AsyncWriteExt; |
| use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions}; |
| use tokio::time; |
| use windows_sys::Win32::Foundation::{ERROR_NO_DATA, ERROR_PIPE_BUSY, NO_ERROR, UNICODE_STRING}; |
| |
| #[tokio::test] |
| async fn test_named_pipe_client_drop() -> io::Result<()> { |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop"; |
| |
| let mut server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| assert_eq!(num_instances("test-named-pipe-client-drop")?, 1); |
| |
| let client = ClientOptions::new().open(PIPE_NAME)?; |
| |
| server.connect().await?; |
| drop(client); |
| |
| // instance will be broken because client is gone |
| match server.write_all(b"ping").await { |
| Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => (), |
| x => panic!("{:?}", x), |
| } |
| |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn test_named_pipe_single_client() -> io::Result<()> { |
| use tokio::io::{AsyncBufReadExt as _, BufReader}; |
| |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client"; |
| |
| let server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| let server = tokio::spawn(async move { |
| // Note: we wait for a client to connect. |
| server.connect().await?; |
| |
| let mut server = BufReader::new(server); |
| |
| let mut buf = String::new(); |
| server.read_line(&mut buf).await?; |
| server.write_all(b"pong\n").await?; |
| Ok::<_, io::Error>(buf) |
| }); |
| |
| let client = tokio::spawn(async move { |
| let client = ClientOptions::new().open(PIPE_NAME)?; |
| |
| let mut client = BufReader::new(client); |
| |
| let mut buf = String::new(); |
| client.write_all(b"ping\n").await?; |
| client.read_line(&mut buf).await?; |
| Ok::<_, io::Error>(buf) |
| }); |
| |
| let (server, client) = tokio::try_join!(server, client)?; |
| |
| assert_eq!(server?, "ping\n"); |
| assert_eq!(client?, "pong\n"); |
| |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn test_named_pipe_multi_client() -> io::Result<()> { |
| use tokio::io::{AsyncBufReadExt as _, BufReader}; |
| |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client"; |
| const N: usize = 10; |
| |
| // The first server needs to be constructed early so that clients can |
| // be correctly connected. Otherwise calling .wait will cause the client to |
| // error. |
| let mut server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| let server = tokio::spawn(async move { |
| for _ in 0..N { |
| // Wait for client to connect. |
| server.connect().await?; |
| let mut inner = BufReader::new(server); |
| |
| // Construct the next server to be connected before sending the one |
| // we already have of onto a task. This ensures that the server |
| // isn't closed (after it's done in the task) before a new one is |
| // available. Otherwise the client might error with |
| // `io::ErrorKind::NotFound`. |
| server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| let _ = tokio::spawn(async move { |
| let mut buf = String::new(); |
| inner.read_line(&mut buf).await?; |
| inner.write_all(b"pong\n").await?; |
| inner.flush().await?; |
| Ok::<_, io::Error>(()) |
| }); |
| } |
| |
| Ok::<_, io::Error>(()) |
| }); |
| |
| let mut clients = Vec::new(); |
| |
| for _ in 0..N { |
| clients.push(tokio::spawn(async move { |
| // This showcases a generic connect loop. |
| // |
| // We immediately try to create a client, if it's not found or the |
| // pipe is busy we use the specialized wait function on the client |
| // builder. |
| let client = loop { |
| match ClientOptions::new().open(PIPE_NAME) { |
| Ok(client) => break client, |
| Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), |
| Err(e) if e.kind() == io::ErrorKind::NotFound => (), |
| Err(e) => return Err(e), |
| } |
| |
| // Wait for a named pipe to become available. |
| time::sleep(Duration::from_millis(10)).await; |
| }; |
| |
| let mut client = BufReader::new(client); |
| |
| let mut buf = String::new(); |
| client.write_all(b"ping\n").await?; |
| client.flush().await?; |
| client.read_line(&mut buf).await?; |
| Ok::<_, io::Error>(buf) |
| })); |
| } |
| |
| for client in clients { |
| let result = client.await?; |
| assert_eq!(result?, "pong\n"); |
| } |
| |
| server.await??; |
| Ok(()) |
| } |
| |
| #[tokio::test] |
| async fn test_named_pipe_multi_client_ready() -> io::Result<()> { |
| use tokio::io::Interest; |
| |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready"; |
| const N: usize = 10; |
| |
| // The first server needs to be constructed early so that clients can |
| // be correctly connected. Otherwise calling .wait will cause the client to |
| // error. |
| let mut server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| let server = tokio::spawn(async move { |
| for _ in 0..N { |
| // Wait for client to connect. |
| server.connect().await?; |
| |
| let inner_server = server; |
| |
| // Construct the next server to be connected before sending the one |
| // we already have of onto a task. This ensures that the server |
| // isn't closed (after it's done in the task) before a new one is |
| // available. Otherwise the client might error with |
| // `io::ErrorKind::NotFound`. |
| server = ServerOptions::new().create(PIPE_NAME)?; |
| |
| let _ = tokio::spawn(async move { |
| let server = inner_server; |
| |
| { |
| let mut read_buf = [0u8; 5]; |
| let mut read_buf_cursor = 0; |
| |
| loop { |
| server.readable().await?; |
| |
| let buf = &mut read_buf[read_buf_cursor..]; |
| |
| match server.try_read(buf) { |
| Ok(n) => { |
| read_buf_cursor += n; |
| |
| if read_buf_cursor == read_buf.len() { |
| break; |
| } |
| } |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| continue; |
| } |
| Err(e) => { |
| return Err(e); |
| } |
| } |
| } |
| }; |
| |
| { |
| let write_buf = b"pong\n"; |
| let mut write_buf_cursor = 0; |
| |
| loop { |
| server.writable().await?; |
| let buf = &write_buf[write_buf_cursor..]; |
| |
| match server.try_write(buf) { |
| Ok(n) => { |
| write_buf_cursor += n; |
| |
| if write_buf_cursor == write_buf.len() { |
| break; |
| } |
| } |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| continue; |
| } |
| Err(e) => { |
| return Err(e); |
| } |
| } |
| } |
| } |
| |
| Ok::<_, io::Error>(()) |
| }); |
| } |
| |
| Ok::<_, io::Error>(()) |
| }); |
| |
| let mut clients = Vec::new(); |
| |
| for _ in 0..N { |
| clients.push(tokio::spawn(async move { |
| // This showcases a generic connect loop. |
| // |
| // We immediately try to create a client, if it's not found or the |
| // pipe is busy we use the specialized wait function on the client |
| // builder. |
| let client = loop { |
| match ClientOptions::new().open(PIPE_NAME) { |
| Ok(client) => break client, |
| Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), |
| Err(e) if e.kind() == io::ErrorKind::NotFound => (), |
| Err(e) => return Err(e), |
| } |
| |
| // Wait for a named pipe to become available. |
| time::sleep(Duration::from_millis(10)).await; |
| }; |
| |
| let mut read_buf = [0u8; 5]; |
| let mut read_buf_cursor = 0; |
| let write_buf = b"ping\n"; |
| let mut write_buf_cursor = 0; |
| |
| loop { |
| let mut interest = Interest::READABLE; |
| if write_buf_cursor < write_buf.len() { |
| interest |= Interest::WRITABLE; |
| } |
| |
| let ready = client.ready(interest).await?; |
| |
| if ready.is_readable() { |
| let buf = &mut read_buf[read_buf_cursor..]; |
| |
| match client.try_read(buf) { |
| Ok(n) => { |
| read_buf_cursor += n; |
| |
| if read_buf_cursor == read_buf.len() { |
| break; |
| } |
| } |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| continue; |
| } |
| Err(e) => { |
| return Err(e); |
| } |
| } |
| } |
| |
| if ready.is_writable() { |
| let buf = &write_buf[write_buf_cursor..]; |
| |
| if buf.is_empty() { |
| continue; |
| } |
| |
| match client.try_write(buf) { |
| Ok(n) => { |
| write_buf_cursor += n; |
| } |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => { |
| continue; |
| } |
| Err(e) => { |
| return Err(e); |
| } |
| } |
| } |
| } |
| |
| let buf = String::from_utf8_lossy(&read_buf).into_owned(); |
| |
| Ok::<_, io::Error>(buf) |
| })); |
| } |
| |
| for client in clients { |
| let result = client.await?; |
| assert_eq!(result?, "pong\n"); |
| } |
| |
| server.await??; |
| Ok(()) |
| } |
| |
| // This tests what happens when a client tries to disconnect. |
| #[tokio::test] |
| async fn test_named_pipe_mode_message() -> io::Result<()> { |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-mode-message"; |
| |
| let server = ServerOptions::new() |
| .pipe_mode(PipeMode::Message) |
| .create(PIPE_NAME)?; |
| |
| let _ = ClientOptions::new().open(PIPE_NAME)?; |
| server.connect().await?; |
| Ok(()) |
| } |
| |
| // This tests `NamedPipeServer::connect` with various access settings. |
| #[tokio::test] |
| async fn test_named_pipe_access() -> io::Result<()> { |
| const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access"; |
| |
| for (inb, outb) in [(true, true), (true, false), (false, true)] { |
| let (tx, rx) = tokio::sync::oneshot::channel(); |
| let server = tokio::spawn(async move { |
| let s = ServerOptions::new() |
| .access_inbound(inb) |
| .access_outbound(outb) |
| .create(PIPE_NAME)?; |
| let mut connect_fut = tokio_test::task::spawn(s.connect()); |
| assert!(connect_fut.poll().is_pending()); |
| tx.send(()).unwrap(); |
| connect_fut.await |
| }); |
| |
| // Wait for the server to call connect. |
| rx.await.unwrap(); |
| let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?; |
| |
| server.await??; |
| } |
| Ok(()) |
| } |
| |
| fn num_instances(pipe_name: impl AsRef<str>) -> io::Result<u32> { |
| use ntapi::ntioapi; |
| |
| let mut name = pipe_name.as_ref().encode_utf16().collect::<Vec<_>>(); |
| let mut name = UNICODE_STRING { |
| Length: (name.len() * mem::size_of::<u16>()) as u16, |
| MaximumLength: (name.len() * mem::size_of::<u16>()) as u16, |
| Buffer: name.as_mut_ptr(), |
| }; |
| let root = std::fs::File::open(r"\\.\Pipe\")?; |
| let mut io_status_block = unsafe { mem::zeroed() }; |
| let mut file_directory_information = [0_u8; 1024]; |
| |
| let status = unsafe { |
| ntioapi::NtQueryDirectoryFile( |
| root.as_raw_handle(), |
| std::ptr::null_mut(), |
| None, |
| std::ptr::null_mut(), |
| &mut io_status_block, |
| &mut file_directory_information as *mut _ as *mut _, |
| 1024, |
| ntioapi::FileDirectoryInformation, |
| 0, |
| &mut name as *mut _ as _, |
| 0, |
| ) |
| }; |
| |
| if status as u32 != NO_ERROR { |
| return Err(io::Error::last_os_error()); |
| } |
| |
| let info = unsafe { |
| mem::transmute::<_, &ntioapi::FILE_DIRECTORY_INFORMATION>(&file_directory_information) |
| }; |
| let raw_name = unsafe { |
| std::slice::from_raw_parts( |
| info.FileName.as_ptr(), |
| info.FileNameLength as usize / mem::size_of::<u16>(), |
| ) |
| }; |
| let name = String::from_utf16(raw_name).unwrap(); |
| let num_instances = unsafe { *info.EndOfFile.QuadPart() }; |
| |
| assert_eq!(name, pipe_name.as_ref()); |
| |
| Ok(num_instances as u32) |
| } |