| // Copyright 2022 The ChromiumOS Authors |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use std::ffi::CString; |
| use std::fs::OpenOptions; |
| use std::io; |
| use std::io::Result; |
| use std::mem; |
| use std::os::windows::fs::OpenOptionsExt; |
| use std::process; |
| use std::ptr; |
| use std::sync::atomic::AtomicUsize; |
| use std::sync::atomic::Ordering; |
| |
| use rand::Rng; |
| use serde::Deserialize; |
| use serde::Serialize; |
| use win_util::fail_if_zero; |
| use win_util::SecurityAttributes; |
| use win_util::SelfRelativeSecurityDescriptor; |
| use winapi::shared::minwindef::DWORD; |
| use winapi::shared::minwindef::FALSE; |
| use winapi::shared::minwindef::TRUE; |
| use winapi::shared::winerror::ERROR_IO_INCOMPLETE; |
| use winapi::shared::winerror::ERROR_IO_PENDING; |
| use winapi::shared::winerror::ERROR_NO_DATA; |
| use winapi::shared::winerror::ERROR_PIPE_CONNECTED; |
| use winapi::um::errhandlingapi::GetLastError; |
| use winapi::um::fileapi::FlushFileBuffers; |
| use winapi::um::handleapi::INVALID_HANDLE_VALUE; |
| use winapi::um::ioapiset::CancelIoEx; |
| use winapi::um::ioapiset::GetOverlappedResult; |
| use winapi::um::minwinbase::OVERLAPPED; |
| use winapi::um::namedpipeapi::ConnectNamedPipe; |
| use winapi::um::namedpipeapi::DisconnectNamedPipe; |
| use winapi::um::namedpipeapi::GetNamedPipeInfo; |
| use winapi::um::namedpipeapi::PeekNamedPipe; |
| use winapi::um::namedpipeapi::SetNamedPipeHandleState; |
| use winapi::um::winbase::CreateNamedPipeA; |
| use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE; |
| use winapi::um::winbase::FILE_FLAG_OVERLAPPED; |
| use winapi::um::winbase::PIPE_ACCESS_DUPLEX; |
| use winapi::um::winbase::PIPE_NOWAIT; |
| use winapi::um::winbase::PIPE_READMODE_BYTE; |
| use winapi::um::winbase::PIPE_READMODE_MESSAGE; |
| use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS; |
| use winapi::um::winbase::PIPE_TYPE_BYTE; |
| use winapi::um::winbase::PIPE_TYPE_MESSAGE; |
| use winapi::um::winbase::PIPE_WAIT; |
| use winapi::um::winbase::SECURITY_IDENTIFICATION; |
| |
| use super::RawDescriptor; |
| use crate::descriptor::AsRawDescriptor; |
| use crate::descriptor::FromRawDescriptor; |
| use crate::descriptor::IntoRawDescriptor; |
| use crate::descriptor::SafeDescriptor; |
| use crate::Event; |
| use crate::EventToken; |
| use crate::WaitContext; |
| |
/// Default buffer size, in bytes, used for named pipes created through [`pair`].
///
/// If the buffer is too small, a writer that expects not to block *can* block until the reader
/// drains the pipe. As a rule the buffer should be *at least* as large as the largest message.
///
/// An undersized buffer interacts badly with `crate::platform::StreamChannel`, which expects to
/// finish a whole write before releasing a lock that the other side needs in order to read:
/// * The writer cannot finish its write (and so cannot release the lock) because the buffer is
///   full.
/// * The reader cannot start reading because the writer still holds the lock, so it cannot
///   relieve the buffer pressure. For message pipes the reader could not help anyway, since a
///   message-mode pipe should NOT perform the partial read that would be needed.
/// * Both sides now wait on each other, i.e. the conditions for a deadlock are met.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

// Monotonic counter that `pair_with_buffer_size` mixes into generated pipe names. Starts at 1.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
| |
| /// Represents one end of a named pipe |
| #[derive(Serialize, Deserialize, Debug)] |
| pub struct PipeConnection { |
| handle: SafeDescriptor, |
| framing_mode: FramingMode, |
| blocking_mode: BlockingMode, |
| } |
| |
| /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a |
| /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped. |
| pub struct OverlappedWrapper { |
| // Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O |
| // operations. |
| overlapped: Box<OVERLAPPED>, |
| // This field prevents the event handle from being dropped too early and allows callers to |
| // be notified when a read or write overlapped operation has completed. |
| h_event: Option<Event>, |
| in_use: bool, |
| } |
| |
| impl OverlappedWrapper { |
| pub fn get_h_event_ref(&self) -> Option<&Event> { |
| self.h_event.as_ref() |
| } |
| |
| /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order |
| /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object |
| /// returned must not be dropped. |
| /// |
| /// There is an option to create the event object and set it to the `hEvent` field. If hEvent |
| /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file |
| /// handle will be signaled when the operation is complete. In other words, you can use |
| /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by |
| /// Microsoft though. |
| pub fn new(include_event: bool) -> Result<OverlappedWrapper> { |
| let mut overlapped = OVERLAPPED::default(); |
| let h_event = if include_event { |
| Some(Event::new()?) |
| } else { |
| None |
| }; |
| |
| overlapped.hEvent = if let Some(event) = h_event.as_ref() { |
| event.as_raw_descriptor() |
| } else { |
| 0 as RawDescriptor |
| }; |
| |
| Ok(OverlappedWrapper { |
| overlapped: Box::new(overlapped), |
| h_event, |
| in_use: false, |
| }) |
| } |
| } |
| |
| // Safe because all of the contained fields may be safely sent to another thread. |
| unsafe impl Send for OverlappedWrapper {} |
| |
| pub trait WriteOverlapped { |
| /// Perform an overlapped write operation with the specified buffer and overlapped wrapper. |
| /// If successful, the write operation will complete asynchronously, and |
| /// `write_result()` should be called to get the result. |
| /// |
| /// NOTE: `buf` and `overlapped_wrapper` will be in use for the duration of |
| /// the overlapped operation. These should not be reused until |
| /// `write_result()` is called. |
| fn write_overlapped( |
| &mut self, |
| buf: &mut [u8], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> io::Result<()>; |
| |
| /// Gets the result of the overlapped write operation. Must only be called |
| /// after issuing an overlapped write operation using `write_overlapped`. The |
| /// same `overlapped_wrapper` must be provided. |
| fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>; |
| |
| /// Tries to get the result of the overlapped write operation. Must only be |
| /// called once, and only after issuing an overlapped write operation using |
| /// `write_overlapped`. The same `overlapped_wrapper` must be provided. |
| /// |
| /// An error indicates that the operation hasn't completed yet and |
| /// `write_result` or `try_write_result` should be called again. |
| fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) |
| -> io::Result<usize>; |
| } |
| |
| pub trait ReadOverlapped { |
| /// Perform an overlapped read operation with the specified buffer and overlapped wrapper. |
| /// If successful, the read operation will complete asynchronously, and |
| /// `read_result()` should be called to get the result. |
| /// |
| /// NOTE: `buf` and `overlapped_wrapper` will be in use for the duration of |
| /// the overlapped operation. These should not be reused until |
| /// `read_result()` is called. |
| fn read_overlapped( |
| &mut self, |
| buf: &mut [u8], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> io::Result<()>; |
| |
| /// Gets the result of the overlapped read operation. Must only be called |
| /// once, and only after issuing an overlapped read operation using |
| /// `read_overlapped`. The same `overlapped_wrapper` must be provided. |
| fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>; |
| |
| /// Tries to get the result of the overlapped read operation. Must only be called |
| /// after issuing an overlapped read operation using `read_overlapped`. The |
| /// same `overlapped_wrapper` must be provided. |
| /// |
| /// An error indicates that the operation hasn't completed yet and |
| /// `read_result` or `try_read_result` should be called again. |
| fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>; |
| } |
| |
| #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)] |
| pub enum FramingMode { |
| Byte, |
| Message, |
| } |
| |
| impl FramingMode { |
| fn to_readmode(self) -> DWORD { |
| match self { |
| FramingMode::Message => PIPE_READMODE_MESSAGE, |
| FramingMode::Byte => PIPE_READMODE_BYTE, |
| } |
| } |
| |
| fn to_pipetype(self) -> DWORD { |
| match self { |
| FramingMode::Message => PIPE_TYPE_MESSAGE, |
| FramingMode::Byte => PIPE_TYPE_BYTE, |
| } |
| } |
| } |
| |
| #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)] |
| pub enum BlockingMode { |
| /// Calls to read() block until data is received |
| Wait, |
| /// Calls to read() return immediately even if there is nothing read with error code 232 |
| /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA) |
| /// |
| /// NOTE: This mode is discouraged by the Windows API documentation. |
| NoWait, |
| } |
| |
| impl From<&BlockingMode> for DWORD { |
| fn from(blocking_mode: &BlockingMode) -> DWORD { |
| match blocking_mode { |
| BlockingMode::Wait => PIPE_WAIT, |
| BlockingMode::NoWait => PIPE_NOWAIT, |
| } |
| } |
| } |
| |
| /// Sets the handle state for a named pipe in a rust friendly way. |
| /// This is safe if the pipe handle is open. |
| unsafe fn set_named_pipe_handle_state( |
| pipe_handle: RawDescriptor, |
| client_mode: &mut DWORD, |
| ) -> Result<()> { |
| // Safe when the pipe handle is open. Safety also requires checking the return value, which we |
| // do below. |
| let success_flag = SetNamedPipeHandleState( |
| /* hNamedPipe= */ pipe_handle, |
| /* lpMode= */ client_mode, |
| /* lpMaxCollectionCount= */ ptr::null_mut(), |
| /* lpCollectDataTimeout= */ ptr::null_mut(), |
| ); |
| if success_flag == 0 { |
| Err(io::Error::last_os_error()) |
| } else { |
| Ok(()) |
| } |
| } |
| |
| pub fn pair( |
| framing_mode: &FramingMode, |
| blocking_mode: &BlockingMode, |
| timeout: u64, |
| ) -> Result<(PipeConnection, PipeConnection)> { |
| pair_with_buffer_size( |
| framing_mode, |
| blocking_mode, |
| timeout, |
| DEFAULT_BUFFER_SIZE, |
| false, |
| ) |
| } |
| |
| /// Creates a pair of handles connected to either end of a duplex named pipe. |
| /// |
| /// The pipe created will have a semi-random name and a default set of security options that |
| /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject |
| /// remote clients, allow only a single server instance, and prevent impersonation by the server |
| /// end of the pipe. |
| /// |
| /// # Arguments |
| /// |
| /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an |
| /// automatically framed sequence of messages (Message). In message mode it's an |
| /// error to read fewer bytes than were sent in a message from the other end of |
| /// the pipe. |
| /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or |
| /// return immediately if there is nothing available (NoWait). |
| /// * `timeout` - A timeout to apply for socket operations, in milliseconds. |
| /// Setting this to zero will create sockets with the system |
| /// default timeout. |
| /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the |
| /// buffer automatically as needed, except in the case of NOWAIT pipes, where |
| /// it will just fail writes that don't fit in the buffer. |
| /// # Return value |
| /// |
| /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as |
| /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY. |
| pub fn pair_with_buffer_size( |
| framing_mode: &FramingMode, |
| blocking_mode: &BlockingMode, |
| timeout: u64, |
| buffer_size: usize, |
| overlapped: bool, |
| ) -> Result<(PipeConnection, PipeConnection)> { |
| // Give the pipe a unique name to avoid accidental collisions |
| let pipe_name = format!( |
| r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}", |
| process::id(), |
| NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst), |
| rand::thread_rng().gen::<u32>(), |
| ); |
| |
| let server_end = create_server_pipe( |
| &pipe_name, |
| framing_mode, |
| blocking_mode, |
| timeout, |
| buffer_size, |
| overlapped, |
| )?; |
| |
| // Open the named pipe we just created as the client |
| let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?; |
| |
| // Accept the client's connection |
| // Not sure if this is strictly needed but I'm doing it just in case. |
| // We expect at this point that the client will already be connected, |
| // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED. |
| // It's also OK if we get a return code of success. |
| server_end.wait_for_client_connection()?; |
| |
| Ok((server_end, client_end)) |
| } |
| |
| /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe |
| /// settings. |
| /// |
| /// The pipe will be set to reject remote clients and allow only a single connection at a time. |
| /// |
| /// # Arguments |
| /// |
| /// * `pipe_name` - The path of the named pipe to create. Should be in the form |
| /// `\\.\pipe\<some-name>`. |
| /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an |
| /// automatically framed sequence of messages (Message). In message mode it's an |
| /// error to read fewer bytes than were sent in a message from the other end of |
| /// the pipe. |
| /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or |
| /// return immediately if there is nothing available (NoWait). |
| /// * `timeout` - A timeout to apply for socket operations, in milliseconds. |
| /// Setting this to zero will create sockets with the system |
| /// default timeout. |
| /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the |
| /// buffer automatically as needed, except in the case of NOWAIT pipes, where |
| /// it will just fail writes that don't fit in the buffer. |
| /// * `overlapped` - Sets whether overlapped mode is set on the pipe. |
| pub fn create_server_pipe( |
| pipe_name: &str, |
| framing_mode: &FramingMode, |
| blocking_mode: &BlockingMode, |
| timeout: u64, |
| buffer_size: usize, |
| overlapped: bool, |
| ) -> Result<PipeConnection> { |
| let c_pipe_name = CString::new(pipe_name).unwrap(); |
| |
| let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE; |
| if overlapped { |
| open_mode_flags |= FILE_FLAG_OVERLAPPED |
| } |
| |
| // This sets flags so there will be an error if >1 instance (server end) |
| // of this pipe name is opened because we expect exactly one. |
| let server_handle = unsafe { |
| // Safe because security attributes are valid, pipe_name is valid C string, |
| // and we're checking the return code |
| CreateNamedPipeA( |
| c_pipe_name.as_ptr(), |
| /* dwOpenMode= */ |
| open_mode_flags, |
| /* dwPipeMode= */ |
| framing_mode.to_pipetype() |
| | framing_mode.to_readmode() |
| | DWORD::from(blocking_mode) |
| | PIPE_REJECT_REMOTE_CLIENTS, |
| /* nMaxInstances= */ 1, |
| /* nOutBufferSize= */ buffer_size as DWORD, |
| /* nInBufferSize= */ buffer_size as DWORD, |
| /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms |
| /* lpSecurityAttributes= */ |
| SecurityAttributes::new_with_security_descriptor( |
| SelfRelativeSecurityDescriptor::get_singleton(), |
| /* inherit= */ true, |
| ) |
| .as_mut(), |
| ) |
| }; |
| |
| if server_handle == INVALID_HANDLE_VALUE { |
| Err(io::Error::last_os_error()) |
| } else { |
| unsafe { |
| Ok(PipeConnection { |
| handle: SafeDescriptor::from_raw_descriptor(server_handle), |
| framing_mode: *framing_mode, |
| blocking_mode: *blocking_mode, |
| }) |
| } |
| } |
| } |
| |
| /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe |
| /// settings. |
| /// |
| /// The pipe will be set to prevent impersonation of the client by the server process. |
| /// |
| /// # Arguments |
| /// |
| /// * `pipe_name` - The path of the named pipe to create. Should be in the form |
| /// `\\.\pipe\<some-name>`. |
| /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an |
| /// automatically framed sequence of messages (Message). In message mode it's an |
| /// error to read fewer bytes than were sent in a message from the other end of |
| /// the pipe. |
| /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or |
| /// return immediately if there is nothing available (NoWait). |
| /// * `overlapped` - Sets whether the pipe is opened in overlapped mode. |
| pub fn create_client_pipe( |
| pipe_name: &str, |
| framing_mode: &FramingMode, |
| blocking_mode: &BlockingMode, |
| overlapped: bool, |
| ) -> Result<PipeConnection> { |
| let client_handle = OpenOptions::new() |
| .read(true) |
| .write(true) |
| .create(true) |
| .security_qos_flags(SECURITY_IDENTIFICATION) |
| .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 }) |
| .open(pipe_name)? |
| .into_raw_descriptor(); |
| |
| let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode); |
| |
| // Safe because client_handle's open() call did not return an error. |
| unsafe { |
| set_named_pipe_handle_state(client_handle, &mut client_mode)?; |
| } |
| |
| Ok(PipeConnection { |
| // Safe because client_handle is valid |
| handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) }, |
| framing_mode: *framing_mode, |
| blocking_mode: *blocking_mode, |
| }) |
| } |
| |
| // This is used to mark types which can be appropriately sent through the |
| // generic helper functions write_to_pipe and read_from_pipe. |
| pub trait PipeSendable { |
| // Default values used to fill in new empty indexes when resizing a buffer to |
| // a larger size. |
| fn default() -> Self; |
| } |
| impl PipeSendable for u8 { |
| fn default() -> Self { |
| 0 |
| } |
| } |
| impl PipeSendable for RawDescriptor { |
| fn default() -> Self { |
| ptr::null_mut() |
| } |
| } |
| |
| impl PipeConnection { |
| pub fn try_clone(&self) -> Result<PipeConnection> { |
| let copy_handle = self.handle.try_clone()?; |
| Ok(PipeConnection { |
| handle: copy_handle, |
| framing_mode: self.framing_mode, |
| blocking_mode: self.blocking_mode, |
| }) |
| } |
| |
| /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing & |
| /// blocking modes. |
| /// |
| /// # Safety |
| /// 1. rd is valid and ownership is transferred to this function when it is called. |
| /// |
| /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the |
| /// underlying pipe. |
| pub unsafe fn from_raw_descriptor( |
| rd: RawDescriptor, |
| framing_mode: FramingMode, |
| blocking_mode: BlockingMode, |
| ) -> PipeConnection { |
| PipeConnection { |
| handle: SafeDescriptor::from_raw_descriptor(rd), |
| framing_mode, |
| blocking_mode, |
| } |
| } |
| |
| /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer. |
| /// Returns the number of bytes (not values) read. |
| /// |
| /// # Safety |
| /// |
| /// This is safe only when the following conditions hold: |
| /// 1. The data on the other end of the pipe is a valid binary representation of data for |
| /// type T, and |
| /// 2. The number of bytes read is a multiple of the size of T; this must be checked by |
| /// the caller. |
| /// If buf's type is file descriptors, this is only safe when those file descriptors are valid |
| /// for the process where this function was called. |
| pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> { |
| PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None) |
| } |
| |
| /// Similar to `PipeConnection::read` except it also allows: |
| /// 1. The same end of the named pipe to read and write at the same time in different |
| /// threads. |
| /// 2. Asynchronous read and write (read and write won't block). |
| /// |
| /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event |
| /// (can be created with `OverlappedWrapper::new`) will be passed into |
| /// `ReadFile`. That event will be triggered when the read operation is complete. |
| /// |
| /// In order to get how many bytes were read, call `get_overlapped_result`. That function will |
| /// also help with waiting until the read operation is complete. |
| /// |
| /// # Safety |
| /// |
| /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in |
| /// overlapped mode otherwise there may be unexpected behavior. |
| pub unsafe fn read_overlapped<T: PipeSendable>( |
| &mut self, |
| buf: &mut [T], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> Result<()> { |
| if overlapped_wrapper.in_use { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::InvalidInput, |
| "Overlapped struct already in use", |
| )); |
| } |
| overlapped_wrapper.in_use = true; |
| |
| PipeConnection::read_internal( |
| &self.handle, |
| self.blocking_mode, |
| buf, |
| Some(&mut overlapped_wrapper.overlapped), |
| )?; |
| Ok(()) |
| } |
| |
| /// Helper for `read_overlapped` and `read` |
| /// |
| /// # Safety |
| /// Comments `read_overlapped` or `read`, depending on which is used. |
| unsafe fn read_internal<T: PipeSendable>( |
| handle: &SafeDescriptor, |
| blocking_mode: BlockingMode, |
| buf: &mut [T], |
| overlapped: Option<&mut OVERLAPPED>, |
| ) -> Result<usize> { |
| let res = crate::platform::read_file( |
| handle, |
| buf.as_mut_ptr() as *mut u8, |
| mem::size_of_val(buf), |
| overlapped, |
| ); |
| match res { |
| Ok(bytes_read) => Ok(bytes_read), |
| Err(e) |
| if blocking_mode == BlockingMode::NoWait |
| && e.raw_os_error() == Some(ERROR_NO_DATA as i32) => |
| { |
| // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however, |
| // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not |
| // correct. For further details see: |
| // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499- |
| // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes |
| Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e)) |
| } |
| Err(e) => Err(e), |
| } |
| } |
| |
| /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted |
| /// by an event on `exit_event`. |
| pub fn read_overlapped_blocking<T: PipeSendable>( |
| &mut self, |
| buf: &mut [T], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| exit_event: &Event, |
| ) -> Result<()> { |
| // Safe because we are providing a valid buffer slice and also providing a valid |
| // overlapped struct. |
| unsafe { |
| self.read_overlapped(buf, overlapped_wrapper)?; |
| }; |
| |
| #[derive(EventToken)] |
| enum Token { |
| ReadOverlapped, |
| Exit, |
| } |
| |
| let wait_ctx = WaitContext::build_with(&[ |
| ( |
| overlapped_wrapper.get_h_event_ref().unwrap(), |
| Token::ReadOverlapped, |
| ), |
| (exit_event, Token::Exit), |
| ])?; |
| |
| let events = wait_ctx.wait()?; |
| for event in events { |
| match event.token { |
| Token::ReadOverlapped => { |
| let size_read_in_bytes = |
| self.get_overlapped_result(overlapped_wrapper)? as usize; |
| |
| // If this error shows, most likely the overlapped named pipe was set up |
| // incorrectly. |
| if size_read_in_bytes != buf.len() { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::UnexpectedEof, |
| "Short read", |
| )); |
| } |
| } |
| Token::Exit => { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::Interrupted, |
| "IO canceled on exit request", |
| )); |
| } |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| /// Reads a variable size message and returns the message on success. |
| /// The size of the message is expected to proceed the message in |
| /// the form of `header_size` message. |
| /// |
| /// `parse_message_size` lets caller parse the header to extract |
| /// message size. |
| /// |
| /// Event on `exit_event` is used to interrupt the blocked read. |
| pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( |
| &mut self, |
| header_size: usize, |
| parse_message_size: F, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| exit_event: &Event, |
| ) -> Result<Vec<u8>> { |
| let mut header = vec![0; header_size]; |
| header.resize_with(header_size, Default::default); |
| self.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?; |
| let message_size = parse_message_size(&header); |
| if message_size == 0 { |
| return Ok(vec![]); |
| } |
| let mut buf = vec![]; |
| buf.resize_with(message_size, Default::default); |
| self.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?; |
| Ok(buf) |
| } |
| |
| /// Gets the size in bytes of data in the pipe. |
| /// |
| /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have |
| /// not finished writing on the producer side. |
| pub fn get_available_byte_count(&self) -> io::Result<u32> { |
| let mut total_bytes_avail: DWORD = 0; |
| |
| // Safe because the underlying pipe handle is guaranteed to be open, and the output values |
| // live at valid memory locations. |
| fail_if_zero!(unsafe { |
| PeekNamedPipe( |
| self.as_raw_descriptor(), |
| ptr::null_mut(), |
| 0, |
| ptr::null_mut(), |
| &mut total_bytes_avail, |
| ptr::null_mut(), |
| ) |
| }); |
| |
| Ok(total_bytes_avail) |
| } |
| |
| /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which |
| /// callers should check to ensure that it was the number expected. |
| pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> { |
| PipeConnection::write_internal(&self.handle, buf, None) |
| } |
| |
| /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered |
| /// as a failure. |
| pub fn write_overlapped_blocking_message<T: PipeSendable>( |
| &mut self, |
| buf: &[T], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> Result<()> { |
| self.write_overlapped(buf, overlapped_wrapper)?; |
| |
| let size_written_in_bytes = self.get_overlapped_result(overlapped_wrapper)?; |
| |
| if size_written_in_bytes as usize != buf.len() { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::UnexpectedEof, |
| format!( |
| "Short write expected:{} found:{}", |
| size_written_in_bytes, |
| buf.len(), |
| ), |
| )); |
| } |
| Ok(()) |
| } |
| |
| /// Similar to `PipeConnection::write` except it also allows: |
| /// 1. The same end of the named pipe to read and write at the same time in different |
| /// threads. |
| /// 2. Asynchronous read and write (read and write won't block). |
| /// |
| /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event |
| /// (can be created with `OverlappedWrapper::new`) will be passed into |
| /// `WriteFile`. That event will be triggered when the write operation is complete. |
| /// |
| /// In order to get how many bytes were written, call `get_overlapped_result`. That function will |
| /// also help with waiting until the write operation is complete. The pipe must be opened in |
| /// overlapped otherwise there may be unexpected behavior. |
| /// |
| /// WARNING: this function is unsafe. TODO(b/272812234): mark unsafe. |
| pub fn write_overlapped<T: PipeSendable>( |
| &mut self, |
| buf: &[T], |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> Result<()> { |
| if overlapped_wrapper.in_use { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::InvalidInput, |
| "Overlapped struct already in use", |
| )); |
| } |
| overlapped_wrapper.in_use = true; |
| |
| PipeConnection::write_internal( |
| &self.handle, |
| buf, |
| Some(&mut overlapped_wrapper.overlapped), |
| )?; |
| Ok(()) |
| } |
| |
| /// Helper for `write_overlapped` and `write`. |
| /// WARNING: this function is unsafe for overlapped IO. TODO(b/272812234): mark unsafe. |
| fn write_internal<T: PipeSendable>( |
| handle: &SafeDescriptor, |
| buf: &[T], |
| overlapped: Option<&mut OVERLAPPED>, |
| ) -> Result<usize> { |
| // Safe because buf points to memory valid until the write completes and we pass a valid |
| // length for that memory. |
| unsafe { |
| crate::platform::write_file( |
| handle, |
| buf.as_ptr() as *const u8, |
| mem::size_of_val(buf), |
| overlapped, |
| ) |
| } |
| } |
| |
| /// Sets the blocking mode on the pipe. |
| pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> { |
| let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode(); |
| self.blocking_mode = *blocking_mode; |
| |
| // Safe because the pipe has not been closed (it is managed by this object). |
| unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) } |
| } |
| |
| /// For a server named pipe, waits for a client to connect |
| pub fn wait_for_client_connection(&self) -> Result<()> { |
| let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?; |
| self.wait_for_client_connection_internal( |
| &mut overlapped_wrapper, |
| /* should_block = */ true, |
| ) |
| } |
| |
| /// For a server named pipe, waits for a client to connect using the given overlapped wrapper |
| /// to signal connection. |
| pub fn wait_for_client_connection_overlapped( |
| &self, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> Result<()> { |
| self.wait_for_client_connection_internal( |
| overlapped_wrapper, |
| /* should_block = */ false, |
| ) |
| } |
| |
| fn wait_for_client_connection_internal( |
| &self, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| should_block: bool, |
| ) -> Result<()> { |
| // Safe because the handle is valid and we're checking the return |
| // code according to the documentation |
| unsafe { |
| let success_flag = ConnectNamedPipe( |
| self.as_raw_descriptor(), |
| // Note: The overlapped structure is only used if the pipe was opened in |
| // OVERLAPPED mode, but is necessary in that case. |
| &mut *overlapped_wrapper.overlapped, |
| ); |
| if success_flag == 0 { |
| return match GetLastError() { |
| ERROR_PIPE_CONNECTED => { |
| if !should_block { |
| // If async, make sure the event is signalled to indicate the client |
| // is ready. |
| overlapped_wrapper.get_h_event_ref().unwrap().signal()?; |
| } |
| |
| Ok(()) |
| } |
| ERROR_IO_PENDING => { |
| if should_block { |
| overlapped_wrapper.get_h_event_ref().unwrap().wait()?; |
| } |
| Ok(()) |
| } |
| err => Err(io::Error::from_raw_os_error(err as i32)), |
| }; |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Used for overlapped read and write operations. |
| /// |
| /// This will block until the ReadFile or WriteFile operation that also took in |
| /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from |
| /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get |
| /// the number of bytes that were read or written. |
| pub fn get_overlapped_result( |
| &mut self, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> io::Result<u32> { |
| let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true); |
| overlapped_wrapper.in_use = false; |
| res |
| } |
| |
| /// Used for overlapped read and write operations. |
| /// |
| /// This will return immediately, regardless of the completion status of the |
| /// ReadFile or WriteFile operation that took in `overlapped_wrapper`, |
| /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new` |
| /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes |
| /// that were read or written, if completed. If the operation hasn't |
| /// completed, an error of kind `io::ErrorKind::WouldBlock` will be |
| /// returned. |
| pub fn try_get_overlapped_result( |
| &mut self, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| ) -> io::Result<u32> { |
| let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false); |
| match res { |
| Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => { |
| Err(io::Error::new(io::ErrorKind::WouldBlock, err)) |
| } |
| _ => { |
| overlapped_wrapper.in_use = false; |
| res |
| } |
| } |
| } |
| |
| fn get_overlapped_result_internal( |
| &mut self, |
| overlapped_wrapper: &mut OverlappedWrapper, |
| wait: bool, |
| ) -> io::Result<u32> { |
| if !overlapped_wrapper.in_use { |
| return Err(std::io::Error::new( |
| std::io::ErrorKind::InvalidInput, |
| "Overlapped struct is not in use", |
| )); |
| } |
| let mut size_transferred = 0; |
| // Safe as long as `overlapped_struct` isn't copied and also contains a valid event. |
| // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`. |
| fail_if_zero!(unsafe { |
| GetOverlappedResult( |
| self.handle.as_raw_descriptor(), |
| &mut *overlapped_wrapper.overlapped, |
| &mut size_transferred, |
| if wait { TRUE } else { FALSE }, |
| ) |
| }); |
| |
| Ok(size_transferred) |
| } |
| |
| /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will |
| /// cancel all I/O requests for the file handle passed in. |
| pub fn cancel_io(&mut self) -> Result<()> { |
| fail_if_zero!(unsafe { |
| CancelIoEx( |
| self.handle.as_raw_descriptor(), |
| /* lpOverlapped= */ std::ptr::null_mut(), |
| ) |
| }); |
| |
| Ok(()) |
| } |
| |
| /// Get the framing mode of the pipe. |
| pub fn get_framing_mode(&self) -> FramingMode { |
| self.framing_mode |
| } |
| |
| /// Returns metadata about the connected NamedPipe. |
| pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> { |
| let mut flags: u32 = 0; |
| // Marked mutable because they are mutated in a system call |
| #[allow(unused_mut)] |
| let mut incoming_buffer_size: u32 = 0; |
| #[allow(unused_mut)] |
| let mut outgoing_buffer_size: u32 = 0; |
| #[allow(unused_mut)] |
| let mut max_instances: u32 = 0; |
| // Client side with BYTE type are default flags |
| if is_server_connection { |
| flags |= 0x00000001 /* PIPE_SERVER_END */ |
| } |
| if self.framing_mode == FramingMode::Message { |
| flags |= 0x00000004 /* PIPE_TYPE_MESSAGE */ |
| } |
| // Safe because we have allocated all pointers and own |
| // them as mutable. |
| fail_if_zero!(unsafe { |
| GetNamedPipeInfo( |
| self.as_raw_descriptor(), |
| flags as *mut u32, |
| outgoing_buffer_size as *mut u32, |
| incoming_buffer_size as *mut u32, |
| max_instances as *mut u32, |
| ) |
| }); |
| |
| Ok(NamedPipeInfo { |
| outgoing_buffer_size, |
| incoming_buffer_size, |
| max_instances, |
| }) |
| } |
| |
| /// For a server pipe, flush the pipe contents. This will |
| /// block until the pipe is cleared by the client. Only |
| /// call this if you are sure the client is reading the |
| /// data! |
| pub fn flush_data_blocking(&self) -> Result<()> { |
| // Safe because the only buffers interacted with are |
| // outside of Rust memory |
| fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) }); |
| Ok(()) |
| } |
| |
| /// For a server pipe, disconnect all clients, discarding any buffered data. |
| pub fn disconnect_clients(&self) -> Result<()> { |
| // Safe because we own the handle passed in and know it will remain valid for the duration |
| // of the call. Discarded buffers are not managed by rust. |
| fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) }); |
| Ok(()) |
| } |
| } |
| |
| impl AsRawDescriptor for PipeConnection { |
| fn as_raw_descriptor(&self) -> RawDescriptor { |
| self.handle.as_raw_descriptor() |
| } |
| } |
| |
| impl IntoRawDescriptor for PipeConnection { |
| fn into_raw_descriptor(self) -> RawDescriptor { |
| self.handle.into_raw_descriptor() |
| } |
| } |
| |
| unsafe impl Send for PipeConnection {} |
| unsafe impl Sync for PipeConnection {} |
| |
| impl io::Read for PipeConnection { |
| fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
| // This is safe because PipeConnection::read is always safe for u8 |
| unsafe { PipeConnection::read(self, buf) } |
| } |
| } |
| |
| impl io::Write for PipeConnection { |
| fn write(&mut self, buf: &[u8]) -> io::Result<usize> { |
| PipeConnection::write(self, buf) |
| } |
| |
| fn flush(&mut self) -> io::Result<()> { |
| Ok(()) |
| } |
| } |
| |
/// A simple data struct representing metadata about a NamedPipe, as returned by
/// `PipeConnection::get_info`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the pipe's buffer for outgoing data.
    pub outgoing_buffer_size: u32,
    /// Size of the pipe's buffer for incoming data.
    pub incoming_buffer_size: u32,
    /// Maximum number of instances of the pipe that can be created.
    pub max_instances: u32,
}
| |
| #[cfg(test)] |
| mod tests { |
| use std::mem::size_of; |
| |
| use super::*; |
| |
#[test]
fn duplex_pipe_stream() {
    let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    // Both directions are exercised since the underlying APIs are a bit asymmetrical.
    for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)] {
        println!("{}", dir);

        sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Deliberately smaller than what was sent, so the data arrives in multiple chunks.
        let mut recv_buffer = [0u8; 4];

        // Reading into a plain `u8` buffer is always safe.
        let first_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(first_len, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        // Reading into a plain `u8` buffer is always safe.
        let second_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(second_len, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);
    }
}
| |
#[test]
fn available_byte_count_byte_mode() {
    let (writer, reader) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe, so asking a second time must
    // yield the same value as the first.
    for _ in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
| |
#[test]
fn available_byte_count_message_mode() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe, so asking a second time must
    // yield the same value as the first.
    for _ in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
| |
#[test]
fn available_byte_count_message_mode_multiple_messages() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // Two separate messages of 3 and 2 bytes; the reported count covers both.
    writer.write(&[1, 2, 3]).unwrap();
    writer.write(&[4, 5]).unwrap();

    let available = reader.get_available_byte_count().unwrap();
    assert_eq!(available, 5);
}
| |
#[test]
fn duplex_pipe_message() {
    let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // Both directions are exercised since the underlying APIs are a bit asymmetrical.
    for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)] {
        println!("{}", dir);

        // Two messages are sent so that message framing can be checked.
        sender.write(&[1, 23, 45]).unwrap();
        sender.write(&[67, 89, 10]).unwrap();

        // Larger than required for either message.
        let mut recv_buffer = [0u8; 5];

        // Reading into a plain `u8` buffer is always safe.
        let first_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(first_len, 3);
        assert_eq!(recv_buffer[0..3], [1, 23, 45]);

        // Reading into a plain `u8` buffer is always safe.
        let second_len = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(second_len, 3);
        assert_eq!(recv_buffer[0..3], [67, 89, 10]);
    }
}
| |
| #[cfg(test)] |
| fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) { |
| let mut recv_buffer: [u8; 1] = [0; 1]; |
| |
| // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical |
| unsafe { |
| for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() { |
| println!("{}", dir); |
| sender.write(&[1]).unwrap(); |
| assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed! |
| assert_eq!( |
| receiver.read(&mut recv_buffer).unwrap_err().kind(), |
| std::io::ErrorKind::WouldBlock |
| ); |
| } |
| } |
| } |
| |
#[test]
fn duplex_nowait() {
    // The pipes are non-blocking from the moment they are created.
    let blocking_mode = BlockingMode::NoWait;
    let (p1, p2) = pair(&FramingMode::Byte, &blocking_mode, 0).unwrap();
    duplex_nowait_helper(&p1, &p2);
}
| |
#[test]
fn duplex_nowait_set_after_creation() {
    // The pipes start out blocking and are switched to non-blocking after creation.
    let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    let no_wait = BlockingMode::NoWait;
    p1.set_blocking(&no_wait)
        .expect("Failed to set blocking mode on pipe p1");
    p2.set_blocking(&no_wait)
        .expect("Failed to set blocking mode on pipe p2");

    duplex_nowait_helper(&p1, &p2);
}
| |
#[test]
fn duplex_overlapped() {
    const PAYLOAD: [u8; 6] = [75, 77, 54, 82, 76, 65];

    let pipe_name = generate_pipe_name();

    let mut p1 = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut p2 = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    // Write from the server end and wait for the overlapped write to finish.
    let mut p1_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    p1.write_overlapped(&PAYLOAD, &mut p1_overlapped_wrapper)
        .unwrap();
    let written = p1
        .get_overlapped_result(&mut p1_overlapped_wrapper)
        .unwrap();
    assert_eq!(written, 6);

    // Read from the client end and wait for the overlapped read to finish.
    let mut recv_buffer = [0u8; 6];
    let mut p2_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // Safe because `read_overlapped` can be called since the overlapped struct is created;
    // the buffer and the wrapper both stay alive until the result is collected below.
    unsafe { p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper) }.unwrap();
    let received = p2
        .get_overlapped_result(&mut p2_overlapped_wrapper)
        .unwrap();
    assert_eq!(received, 6);
    assert_eq!(recv_buffer, PAYLOAD);
}
| |
/// Checks the `in_use` bookkeeping of `OverlappedWrapper`: a wrapper cannot be queried
/// before it is used, cannot be reused while an operation is outstanding, and becomes
/// usable again once the result has been collected.
#[test]
fn duplex_overlapped_test_in_use() {
    let pipe_name = generate_pipe_name();

    let mut p1 = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut p2 = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();
    let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();

    // Nothing has been started with the wrapper yet, so there is no result to collect.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_err());

    let res = p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
    assert!(res.is_ok());

    // The wrapper is now in use, so it must be rejected for any further operation.
    let res = p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
    assert!(res.is_err());

    let mut recv_buffer: [u8; 6] = [0; 6];
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_err());

    // Collecting the write's result releases the wrapper again.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_ok());

    let mut recv_buffer: [u8; 6] = [0; 6];
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_ok());

    // Collect the read before returning so that no overlapped operation can still refer
    // to `recv_buffer` or `overlapped_wrapper` when they are dropped.
    let res = p2.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_ok());
}
| |
| fn generate_pipe_name() -> String { |
| format!( |
| r"\\.\pipe\test-ipc-pipe-name.rand{}", |
| rand::thread_rng().gen::<u64>(), |
| ) |
| } |
| |
#[test]
fn read_write_overlapped_message() {
    const MSG: [u8; 6] = [75, 77, 54, 82, 76, 65];

    let pipe_name = generate_pipe_name();

    let mut p1 = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut p2 = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    // The sender first writes the payload length as a big-endian `usize` header and then
    // the payload itself.
    let mut p1_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    let header = MSG.len().to_be_bytes();
    p1.write_overlapped_blocking_message(&header, &mut p1_overlapped_wrapper)
        .unwrap();
    p1.write_overlapped_blocking_message(&MSG, &mut p1_overlapped_wrapper)
        .unwrap();

    // The receiver is given the header size and a closure that decodes the header back
    // into the payload length.
    let mut p2_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    let exit_event = Event::new().unwrap();
    let recv_buffer = p2
        .read_overlapped_blocking_message(
            size_of::<usize>(),
            |buf| usize::from_be_bytes(buf.try_into().expect("failed to get array from slice")),
            &mut p2_overlapped_wrapper,
            &exit_event,
        )
        .unwrap();
    assert_eq!(recv_buffer, MSG);
}
| } |