| use crate::io::util::{BufReader, BufWriter}; |
| use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; |
| |
| use pin_project_lite::pin_project; |
| use std::io::{self, IoSlice, SeekFrom}; |
| use std::pin::Pin; |
| use std::task::{Context, Poll}; |
| |
| pin_project! { |
| /// Wraps a type that is [`AsyncWrite`] and [`AsyncRead`], and buffers its input and output. |
| /// |
| /// It can be excessively inefficient to work directly with something that implements [`AsyncWrite`] |
| /// and [`AsyncRead`]. For example, every `write`, however small, has to traverse the syscall |
| /// interface, and similarly, every read has to do the same. The [`BufWriter`] and [`BufReader`] |
| /// types aid with these problems respectively, but do so in only one direction. `BufStream` wraps |
| /// one in the other so that both directions are buffered. See their documentation for details. |
| #[derive(Debug)] |
| #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] |
| pub struct BufStream<RW> { |
| #[pin] |
| inner: BufReader<BufWriter<RW>>, |
| } |
| } |
| |
| impl<RW: AsyncRead + AsyncWrite> BufStream<RW> { |
| /// Wraps a type in both [`BufWriter`] and [`BufReader`]. |
| /// |
| /// See the documentation for those types and [`BufStream`] for details. |
| pub fn new(stream: RW) -> BufStream<RW> { |
| BufStream { |
| inner: BufReader::new(BufWriter::new(stream)), |
| } |
| } |
| |
| /// Creates a `BufStream` with the specified [`BufReader`] capacity and [`BufWriter`] |
| /// capacity. |
| /// |
| /// See the documentation for those types and [`BufStream`] for details. |
| pub fn with_capacity( |
| reader_capacity: usize, |
| writer_capacity: usize, |
| stream: RW, |
| ) -> BufStream<RW> { |
| BufStream { |
| inner: BufReader::with_capacity( |
| reader_capacity, |
| BufWriter::with_capacity(writer_capacity, stream), |
| ), |
| } |
| } |
| |
| /// Gets a reference to the underlying I/O object. |
| /// |
| /// It is inadvisable to directly read from the underlying I/O object. |
| pub fn get_ref(&self) -> &RW { |
| self.inner.get_ref().get_ref() |
| } |
| |
| /// Gets a mutable reference to the underlying I/O object. |
| /// |
| /// It is inadvisable to directly read from the underlying I/O object. |
| pub fn get_mut(&mut self) -> &mut RW { |
| self.inner.get_mut().get_mut() |
| } |
| |
| /// Gets a pinned mutable reference to the underlying I/O object. |
| /// |
| /// It is inadvisable to directly read from the underlying I/O object. |
| pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut RW> { |
| self.project().inner.get_pin_mut().get_pin_mut() |
| } |
| |
| /// Consumes this `BufStream`, returning the underlying I/O object. |
| /// |
| /// Note that any leftover data in the internal buffer is lost. |
| pub fn into_inner(self) -> RW { |
| self.inner.into_inner().into_inner() |
| } |
| } |
| |
| impl<RW> From<BufReader<BufWriter<RW>>> for BufStream<RW> { |
| fn from(b: BufReader<BufWriter<RW>>) -> Self { |
| BufStream { inner: b } |
| } |
| } |
| |
| impl<RW> From<BufWriter<BufReader<RW>>> for BufStream<RW> { |
| fn from(b: BufWriter<BufReader<RW>>) -> Self { |
| // we need to "invert" the reader and writer |
| let BufWriter { |
| inner: |
| BufReader { |
| inner, |
| buf: rbuf, |
| pos, |
| cap, |
| seek_state: rseek_state, |
| }, |
| buf: wbuf, |
| written, |
| seek_state: wseek_state, |
| } = b; |
| |
| BufStream { |
| inner: BufReader { |
| inner: BufWriter { |
| inner, |
| buf: wbuf, |
| written, |
| seek_state: wseek_state, |
| }, |
| buf: rbuf, |
| pos, |
| cap, |
| seek_state: rseek_state, |
| }, |
| } |
| } |
| } |
| |
| impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> { |
| fn poll_write( |
| self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &[u8], |
| ) -> Poll<io::Result<usize>> { |
| self.project().inner.poll_write(cx, buf) |
| } |
| |
| fn poll_write_vectored( |
| self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| bufs: &[IoSlice<'_>], |
| ) -> Poll<io::Result<usize>> { |
| self.project().inner.poll_write_vectored(cx, bufs) |
| } |
| |
| fn is_write_vectored(&self) -> bool { |
| self.inner.is_write_vectored() |
| } |
| |
| fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| self.project().inner.poll_flush(cx) |
| } |
| |
| fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
| self.project().inner.poll_shutdown(cx) |
| } |
| } |
| |
| impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> { |
| fn poll_read( |
| self: Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| buf: &mut ReadBuf<'_>, |
| ) -> Poll<io::Result<()>> { |
| self.project().inner.poll_read(cx, buf) |
| } |
| } |
| |
| /// Seek to an offset, in bytes, in the underlying stream. |
| /// |
| /// The position used for seeking with `SeekFrom::Current(_)` is the |
| /// position the underlying stream would be at if the `BufStream` had no |
| /// internal buffer. |
| /// |
| /// Seeking always discards the internal buffer, even if the seek position |
| /// would otherwise fall within it. This guarantees that calling |
| /// `.into_inner()` immediately after a seek yields the underlying reader |
| /// at the same position. |
| /// |
| /// See [`AsyncSeek`] for more details. |
| /// |
| /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` |
| /// where `n` minus the internal buffer length overflows an `i64`, two |
| /// seeks will be performed instead of one. If the second seek returns |
| /// `Err`, the underlying reader will be left at the same position it would |
| /// have if you called `seek` with `SeekFrom::Current(0)`. |
| impl<RW: AsyncRead + AsyncWrite + AsyncSeek> AsyncSeek for BufStream<RW> { |
| fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> { |
| self.project().inner.start_seek(position) |
| } |
| |
| fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
| self.project().inner.poll_complete(cx) |
| } |
| } |
| |
| impl<RW: AsyncRead + AsyncWrite> AsyncBufRead for BufStream<RW> { |
| fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
| self.project().inner.poll_fill_buf(cx) |
| } |
| |
| fn consume(self: Pin<&mut Self>, amt: usize) { |
| self.project().inner.consume(amt); |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| #[test] |
| fn assert_unpin() { |
| crate::is_unpin::<BufStream<()>>(); |
| } |
| } |