blob: 6821c7ede71e2ab53ec2b1d36343115948409eae [file] [log] [blame]
use std::io::{BufRead, Read, Seek, Write};
use tokio::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
AsyncWriteExt,
};
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] synchronously as a [`std::io::Write`].
///
/// # Alternatives
///
/// In many cases, there are better alternatives to using `SyncIoBridge`, especially
/// if you want to avoid blocking the async runtime. Consider the following scenarios:
///
/// When hashing data, using `SyncIoBridge` can lead to suboptimal performance and
/// might not fully leverage the async capabilities of the system.
///
/// ### Why It Matters:
///
/// `SyncIoBridge` allows you to use asynchronous I/O operations in an synchronous
/// context by blocking the current thread. However, this can be inefficient because:
/// - **Inefficient Resource Usage**: `SyncIoBridge` takes up an entire OS thread,
/// which is inefficient compared to asynchronous code that can multiplex many
/// tasks on a single thread.
/// - **Thread Pool Saturation**: Excessive use of `SyncIoBridge` can exhaust the
/// async runtime's thread pool, reducing the number of threads available for
/// other tasks and impacting overall performance.
/// - **Missed Concurrency Benefits**: By using synchronous operations with
/// `SyncIoBridge`, you lose the ability to interleave tasks efficiently,
/// which is a key advantage of asynchronous programming.
///
/// ## Example 1: Hashing Data
///
/// The use of `SyncIoBridge` is unnecessary when hashing data. Instead, you can
/// process the data asynchronously by reading it into memory, which avoids blocking
/// the async runtime.
///
/// There are two strategies for avoiding `SyncIoBridge` when hashing data. When
/// the data fits into memory, the easiest is to read the data into a `Vec<u8>`
/// and hash it:
///
/// Explanation: This example demonstrates how to asynchronously read data from a
/// reader into memory and hash it using a synchronous hashing function. The
/// `SyncIoBridge` is avoided, ensuring that the async runtime is not blocked.
/// ```rust
/// use tokio::io::AsyncReadExt;
/// use tokio::io::AsyncRead;
/// use std::io::Cursor;
/// # mod blake3 { pub fn hash(_: &[u8]) {} }
///
/// async fn hash_contents(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Hash the data using the blake3 hashing function.
/// let hash = blake3::hash(&data);
///
/// Ok(hash)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// hash_contents(reader).await
/// }
/// ```
///
/// When the data doesn't fit into memory, the hashing library will usually
/// provide a `hasher` that you can repeatedly call `update` on to hash the data
/// one chunk at the time.
///
/// Explanation: This example demonstrates how to asynchronously stream data in
/// chunks for hashing. Each chunk is read asynchronously, and the hash is updated
/// incrementally. This avoids blocking and improves performance over using
/// `SyncIoBridge`.
///
/// ```rust
/// use tokio::io::AsyncReadExt;
/// use tokio::io::AsyncRead;
/// use std::io::Cursor;
/// # struct Hasher;
/// # impl Hasher { pub fn update(&mut self, _: &[u8]) {} pub fn finalize(&self) {} }
///
/// /// Asynchronously streams data from an async reader, processes it in chunks,
/// /// and hashes the data incrementally.
/// async fn hash_stream(mut reader: impl AsyncRead + Unpin, mut hasher: Hasher) -> Result<(), std::io::Error> {
/// // Create a buffer to read data into, sized for performance.
/// let mut data = vec![0; 64 * 1024];
/// loop {
/// // Read data from the reader into the buffer.
/// let len = reader.read(&mut data).await?;
/// if len == 0 { break; } // Exit loop if no more data.
///
/// // Update the hash with the data read.
/// hasher.update(&data[..len]);
/// }
///
/// // Finalize the hash after all data has been processed.
/// let hash = hasher.finalize();
///
/// Ok(hash)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// let hasher = Hasher;
/// hash_stream(reader, hasher).await
/// }
/// ```
///
///
/// ## Example 2: Compressing Data
///
/// When compressing data, the use of `SyncIoBridge` is unnecessary as it introduces
/// blocking and inefficient code. Instead, you can utilize an async compression library
/// such as the [`async-compression`](https://docs.rs/async-compression/latest/async_compression/)
/// crate, which is built to handle asynchronous data streams efficiently.
///
/// Explanation: This example shows how to asynchronously compress data using an
/// async compression library. By reading and writing asynchronously, it avoids
/// blocking and is more efficient than using `SyncIoBridge` with a non-async
/// compression library.
///
/// ```ignore
/// use async_compression::tokio::write::GzipEncoder;
/// use std::io::Cursor;
/// use tokio::io::AsyncRead;
///
/// /// Asynchronously compresses data from an async reader using Gzip and an async encoder.
/// async fn compress_data(mut reader: impl AsyncRead + Unpin) -> Result<(), std::io::Error> {
/// let writer = tokio::io::sink();
///
/// // Create a Gzip encoder that wraps the writer.
/// let mut encoder = GzipEncoder::new(writer);
///
/// // Copy data from the reader to the encoder, compressing it.
/// tokio::io::copy(&mut reader, &mut encoder).await?;
///
/// Ok(())
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// compress_data(reader).await?;
///
/// Ok(())
/// }
/// ```
///
///
/// ## Example 3: Parsing Data Formats
///
///
/// `SyncIoBridge` is not ideal when parsing data formats such as `JSON`, as it
/// blocks async operations. A more efficient approach is to read data asynchronously
/// into memory and then `deserialize` it, avoiding unnecessary synchronization overhead.
///
/// Explanation: This example shows how to asynchronously read data into memory
/// and then parse it as `JSON`. By avoiding `SyncIoBridge`, the asynchronous runtime
/// remains unblocked, leading to better performance when working with asynchronous
/// I/O streams.
///
/// ```rust,no_run
/// use tokio::io::AsyncRead;
/// use tokio::io::AsyncReadExt;
/// use std::io::Cursor;
/// # mod serde {
/// # pub trait DeserializeOwned: 'static {}
/// # impl<T: 'static> DeserializeOwned for T {}
/// # }
/// # mod serde_json {
/// # use super::serde::DeserializeOwned;
/// # pub fn from_slice<T: DeserializeOwned>(_: &[u8]) -> Result<T, std::io::Error> {
/// # unimplemented!()
/// # }
/// # }
/// # #[derive(Debug)] struct MyStruct;
///
///
/// async fn parse_json(mut reader: impl AsyncRead + Unpin) -> Result<MyStruct, std::io::Error> {
/// // Read all data from the reader into a Vec<u8>.
/// let mut data = Vec::new();
/// reader.read_to_end(&mut data).await?;
///
/// // Deserialize the data from the Vec<u8> into a MyStruct instance.
/// let value: MyStruct = serde_json::from_slice(&data)?;
///
/// Ok(value)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// parse_json(reader).await?;
/// Ok(())
/// }
/// ```
///
/// ## Correct Usage of `SyncIoBridge` inside `spawn_blocking`
///
/// `SyncIoBridge` is mainly useful when you need to interface with synchronous
/// libraries from an asynchronous context.
///
/// Explanation: This example shows how to use `SyncIoBridge` inside a `spawn_blocking`
/// task to safely perform synchronous I/O without blocking the async runtime. The
/// `spawn_blocking` ensures that the synchronous code is offloaded to a dedicated
/// thread pool, preventing it from interfering with the async tasks.
///
/// ```rust
/// use tokio::task::spawn_blocking;
/// use tokio_util::io::SyncIoBridge;
/// use tokio::io::AsyncRead;
/// use std::marker::Unpin;
/// use std::io::Cursor;
///
/// /// Wraps an async reader with `SyncIoBridge` and performs synchronous I/O operations in a blocking task.
/// async fn process_sync_io(reader: impl AsyncRead + Unpin + Send + 'static) -> Result<Vec<u8>, std::io::Error> {
/// // Wrap the async reader with `SyncIoBridge` to allow synchronous reading.
/// let mut sync_reader = SyncIoBridge::new(reader);
///
/// // Spawn a blocking task to perform synchronous I/O operations.
/// let result = spawn_blocking(move || {
/// // Create an in-memory buffer to hold the copied data.
/// let mut buffer = Vec::new();
/// // Copy data from the sync_reader to the buffer.
/// std::io::copy(&mut sync_reader, &mut buffer)?;
/// // Return the buffer containing the copied data.
/// Ok::<_, std::io::Error>(buffer)
/// })
/// .await??;
///
/// // Return the result from the blocking task.
/// Ok(result)
///}
///
/// #[tokio::main]
/// async fn main() -> Result<(), std::io::Error> {
/// // Example: In-memory data.
/// let data = b"Hello, world!"; // A byte slice.
/// let reader = Cursor::new(data); // Create an in-memory AsyncRead.
/// let result = process_sync_io(reader).await?;
///
/// // You can use `result` here as needed.
///
/// Ok(())
/// }
/// ```
///
#[derive(Debug)]
pub struct SyncIoBridge<T> {
src: T,
rt: tokio::runtime::Handle,
}
impl<T: AsyncBufRead + Unpin> BufRead for SyncIoBridge<T> {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
let src = &mut self.src;
self.rt.block_on(AsyncBufReadExt::fill_buf(src))
}
fn consume(&mut self, amt: usize) {
let src = &mut self.src;
AsyncBufReadExt::consume(src, amt)
}
fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt
.block_on(AsyncBufReadExt::read_until(src, byte, buf))
}
fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(AsyncBufReadExt::read_line(src, buf))
}
}
impl<T: AsyncRead + Unpin> Read for SyncIoBridge<T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(AsyncReadExt::read(src, buf))
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read_to_end(buf))
}
fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.read_to_string(buf))
}
fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
let src = &mut self.src;
// The AsyncRead trait returns the count, synchronous doesn't.
let _n = self.rt.block_on(src.read_exact(buf))?;
Ok(())
}
}
impl<T: AsyncWrite + Unpin> Write for SyncIoBridge<T> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write(buf))
}
fn flush(&mut self) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.flush())
}
fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.write_all(buf))
}
fn write_vectored(&mut self, bufs: &[std::io::IoSlice<'_>]) -> std::io::Result<usize> {
let src = &mut self.src;
self.rt.block_on(src.write_vectored(bufs))
}
}
impl<T: AsyncSeek + Unpin> Seek for SyncIoBridge<T> {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
let src = &mut self.src;
self.rt.block_on(AsyncSeekExt::seek(src, pos))
}
}
// Because https://doc.rust-lang.org/std/io/trait.Write.html#method.is_write_vectored is at the time
// of this writing still unstable, we expose this as part of a standalone method.
impl<T: AsyncWrite> SyncIoBridge<T> {
/// Determines if the underlying [`tokio::io::AsyncWrite`] target supports efficient vectored writes.
///
/// See [`tokio::io::AsyncWrite::is_write_vectored`].
pub fn is_write_vectored(&self) -> bool {
self.src.is_write_vectored()
}
}
impl<T: AsyncWrite + Unpin> SyncIoBridge<T> {
/// Shutdown this writer. This method provides a way to call the [`AsyncWriteExt::shutdown`]
/// function of the inner [`tokio::io::AsyncWrite`] instance.
///
/// # Errors
///
/// This method returns the same errors as [`AsyncWriteExt::shutdown`].
///
/// [`AsyncWriteExt::shutdown`]: tokio::io::AsyncWriteExt::shutdown
pub fn shutdown(&mut self) -> std::io::Result<()> {
let src = &mut self.src;
self.rt.block_on(src.shutdown())
}
}
impl<T: Unpin> SyncIoBridge<T> {
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// When this struct is created, it captures a handle to the current thread's runtime with [`tokio::runtime::Handle::current`].
/// It is hence OK to move this struct into a separate thread outside the runtime, as created
/// by e.g. [`tokio::task::spawn_blocking`].
///
/// Stated even more strongly: to make use of this bridge, you *must* move
/// it into a separate thread outside the runtime. The synchronous I/O will use the
/// underlying handle to block on the backing asynchronous source, via
/// [`tokio::runtime::Handle::block_on`]. As noted in the documentation for that
/// function, an attempt to `block_on` from an asynchronous execution context
/// will panic.
///
/// # Wrapping `!Unpin` types
///
/// Use e.g. `SyncIoBridge::new(Box::pin(src))`.
///
/// # Panics
///
/// This will panic if called outside the context of a Tokio runtime.
#[track_caller]
pub fn new(src: T) -> Self {
Self::new_with_handle(src, tokio::runtime::Handle::current())
}
/// Use a [`tokio::io::AsyncRead`] synchronously as a [`std::io::Read`] or
/// a [`tokio::io::AsyncWrite`] as a [`std::io::Write`].
///
/// This is the same as [`SyncIoBridge::new`], but allows passing an arbitrary handle and hence may
/// be initially invoked outside of an asynchronous context.
pub fn new_with_handle(src: T, rt: tokio::runtime::Handle) -> Self {
Self { src, rt }
}
/// Consume this bridge, returning the underlying stream.
pub fn into_inner(self) -> T {
self.src
}
}
impl<T> AsMut<T> for SyncIoBridge<T> {
fn as_mut(&mut self) -> &mut T {
&mut self.src
}
}
impl<T> AsRef<T> for SyncIoBridge<T> {
fn as_ref(&self) -> &T {
&self.src
}
}