| use std::{ |
| future::Future, |
| pin::Pin, |
| task::{Context, Poll}, |
| }; |
| |
| use futures_io::{AsyncBufRead, AsyncRead}; |
| use futures_lite::ready; |
| |
| use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES}; |
| |
| type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>; |
| /// An implementor of [`AsyncBufRead`] yielding packet lines on each call to [`read_line()`][AsyncBufRead::read_line()]. |
| /// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful |
| /// if they represent binary data, like the one of a pack file. |
| pub struct WithSidebands<'a, T, F> |
| where |
| T: AsyncRead, |
| { |
| state: State<'a, T>, |
| handle_progress: Option<F>, |
| pos: usize, |
| cap: usize, |
| } |
| |
| impl<'a, T, F> Drop for WithSidebands<'a, T, F> |
| where |
| T: AsyncRead, |
| { |
| fn drop(&mut self) { |
| if let State::Idle { ref mut parent } = self.state { |
| parent |
| .as_mut() |
| .expect("parent is always available if we are idle") |
| .reset(); |
| } |
| } |
| } |
| |
| impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction> |
| where |
| T: AsyncRead, |
| { |
| /// Create a new instance with the given provider as `parent`. |
| pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self { |
| WithSidebands { |
| state: State::Idle { parent: Some(parent) }, |
| handle_progress: None, |
| pos: 0, |
| cap: 0, |
| } |
| } |
| } |
| |
| enum State<'a, T> { |
| Idle { |
| parent: Option<&'a mut StreamingPeekableIter<T>>, |
| }, |
| ReadLine { |
| read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>, |
| parent_inactive: Option<*mut StreamingPeekableIter<T>>, |
| }, |
| } |
| |
| /// # SAFETY |
| /// It's safe because T is `Send` and we have a test that assures that our `StreamingPeekableIter` is `Send` as well, |
| /// hence the `*mut _` is `Send`. |
| /// `read_line` isn't send and we can't declare it as such as it forces `Send` in all places (BUT WHY IS THAT A PROBLEM, I don't recall). |
| /// However, it's only used when pinned and thus isn't actually sent anywhere, it's a secondary state of the future used after it was Send |
| /// to a thread possibly. |
| // TODO: Is it possible to declare it as it should be? |
| #[allow(unsafe_code, clippy::non_send_fields_in_send_ty)] |
| unsafe impl<'a, T> Send for State<'a, T> where T: Send {} |
| |
| impl<'a, T, F> WithSidebands<'a, T, F> |
| where |
| T: AsyncRead + Unpin, |
| F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
| { |
| /// Create a new instance with the given `parent` provider and the `handle_progress` function. |
| /// |
| /// Progress or error information will be passed to the given `handle_progress(is_error, text)` function, with `is_error: bool` |
| /// being true in case the `text` is to be interpreted as error. |
| pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self { |
| WithSidebands { |
| state: State::Idle { parent: Some(parent) }, |
| handle_progress: Some(handle_progress), |
| pos: 0, |
| cap: 0, |
| } |
| } |
| |
| /// Create a new instance without a progress handler. |
| pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self { |
| WithSidebands { |
| state: State::Idle { parent: Some(parent) }, |
| handle_progress: None, |
| pos: 0, |
| cap: 0, |
| } |
| } |
| |
| /// Forwards to the parent [`StreamingPeekableIter::reset_with()`] |
| pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) { |
| if let State::Idle { ref mut parent } = self.state { |
| parent |
| .as_mut() |
| .expect("parent is always available if we are idle") |
| .reset_with(delimiters) |
| } |
| } |
| |
| /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`] |
| pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> { |
| match self.state { |
| State::Idle { ref parent } => { |
| parent |
| .as_ref() |
| .expect("parent is always available if we are idle") |
| .stopped_at |
| } |
| _ => None, |
| } |
| } |
| |
| /// Set or unset the progress handler. |
| pub fn set_progress_handler(&mut self, handle_progress: Option<F>) { |
| self.handle_progress = handle_progress; |
| } |
| |
| /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see what would be returned |
| /// next on a call to [`read_line()`][io::BufRead::read_line()]. |
| /// |
| /// # Warning |
| /// |
| /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. |
| pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> { |
| match self.state { |
| State::Idle { ref mut parent } => match parent |
| .as_mut() |
| .expect("parent is always available if we are idle") |
| .peek_line() |
| .await |
| { |
| Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))), |
| Some(Ok(Err(err))) => Some(Ok(Err(err))), |
| Some(Err(err)) => Some(Err(err)), |
| _ => None, |
| }, |
| _ => None, |
| } |
| } |
| |
| /// Read a packet line as string line. |
| pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> { |
| ReadLineFuture { parent: self, buf } |
| } |
| |
| /// Read a packet line from the underlying packet reader, returning empty lines if a stop-packetline was reached. |
| /// |
| /// # Warning |
| /// |
| /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it. |
| pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> { |
| match &mut self.state { |
| State::Idle { parent: Some(parent) } => { |
| assert_eq!( |
| self.cap, 0, |
| "we don't support partial buffers right now - read-line must be used consistently" |
| ); |
| parent.read_line().await |
| } |
| _ => None, |
| } |
| } |
| } |
| |
| pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> { |
| parent: &'b mut WithSidebands<'a, T, F>, |
| buf: &'b mut Vec<u8>, |
| } |
| |
| impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F> |
| where |
| T: AsyncRead + Unpin, |
| F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
| { |
| type Output = std::io::Result<usize>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| assert_eq!( |
| self.parent.cap, 0, |
| "we don't support partial buffers right now - read-line must be used consistently" |
| ); |
| let Self { buf, parent } = &mut *self; |
| let line = ready!(Pin::new(parent).poll_fill_buf(cx))?; |
| buf.clear(); |
| buf.extend_from_slice(line); |
| let bytes = line.len(); |
| self.parent.cap = 0; |
| Poll::Ready(Ok(bytes)) |
| } |
| } |
| |
| pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> { |
| parent: &'b mut WithSidebands<'a, T, F>, |
| buf: &'b mut String, |
| } |
| |
| impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F> |
| where |
| T: AsyncRead + Unpin, |
| F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
| { |
| type Output = std::io::Result<usize>; |
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| assert_eq!( |
| self.parent.cap, 0, |
| "we don't support partial buffers right now - read-line must be used consistently" |
| ); |
| let Self { buf, parent } = &mut *self; |
| let line = std::str::from_utf8(ready!(Pin::new(parent).poll_fill_buf(cx))?) |
| .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; |
| buf.clear(); |
| buf.push_str(line); |
| let bytes = line.len(); |
| self.parent.cap = 0; |
| Poll::Ready(Ok(bytes)) |
| } |
| } |
| |
| impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F> |
| where |
| T: AsyncRead + Unpin, |
| F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
| { |
| fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> { |
| use std::io; |
| |
| use futures_lite::FutureExt; |
| { |
| let this = self.as_mut().get_mut(); |
| if this.pos >= this.cap { |
| let (ofs, cap) = loop { |
| match this.state { |
| State::Idle { ref mut parent } => { |
| let parent = parent.take().expect("parent to be present here"); |
| let inactive = parent as *mut _; |
| this.state = State::ReadLine { |
| read_line: parent.read_line().boxed_local(), |
| parent_inactive: Some(inactive), |
| } |
| } |
| State::ReadLine { |
| ref mut read_line, |
| ref mut parent_inactive, |
| } => { |
| let line = ready!(read_line.poll(cx)); |
| |
| this.state = { |
| let parent = parent_inactive.take().expect("parent pointer always set"); |
| // SAFETY: It's safe to recover the original mutable reference (from which |
| // the `read_line` future was created as the latter isn't accessible anymore |
| // once the state is set to Idle. In other words, either one or the other are |
| // accessible, never both at the same time. |
| // Also: We keep a pointer around which is protected by borrowcheck since it's created |
| // from a legal mutable reference which is moved into the read_line future - if it was manually |
| // implemented we would be able to re-obtain it from there. |
| #[allow(unsafe_code)] |
| let parent = unsafe { &mut *parent }; |
| State::Idle { parent: Some(parent) } |
| }; |
| |
| let line = match line { |
| Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, |
| None => break (0, 0), |
| }; |
| |
| match this.handle_progress.as_mut() { |
| Some(handle_progress) => { |
| let band = line |
| .decode_band() |
| .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; |
| const ENCODED_BAND: usize = 1; |
| match band { |
| BandRef::Data(d) => { |
| if d.is_empty() { |
| continue; |
| } |
| break (U16_HEX_BYTES + ENCODED_BAND, d.len()); |
| } |
| BandRef::Progress(d) => { |
| let text = TextRef::from(d).0; |
| match handle_progress(false, text) { |
| ProgressAction::Continue => {} |
| ProgressAction::Interrupt => { |
| return Poll::Ready(Err(io::Error::new( |
| std::io::ErrorKind::Other, |
| "interrupted by user", |
| ))) |
| } |
| }; |
| } |
| BandRef::Error(d) => { |
| let text = TextRef::from(d).0; |
| match handle_progress(true, text) { |
| ProgressAction::Continue => {} |
| ProgressAction::Interrupt => { |
| return Poll::Ready(Err(io::Error::new( |
| io::ErrorKind::Other, |
| "interrupted by user", |
| ))) |
| } |
| }; |
| } |
| }; |
| } |
| None => { |
| break match line.as_slice() { |
| Some(d) => (U16_HEX_BYTES, d.len()), |
| None => { |
| return Poll::Ready(Err(io::Error::new( |
| io::ErrorKind::UnexpectedEof, |
| "encountered non-data line in a data-line only context", |
| ))) |
| } |
| } |
| } |
| } |
| } |
| } |
| }; |
| this.cap = cap + ofs; |
| this.pos = ofs; |
| } |
| } |
| let range = self.pos..self.cap; |
| match &self.get_mut().state { |
| State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])), |
| State::ReadLine { .. } => unreachable!("at least in theory"), |
| } |
| } |
| |
| fn consume(self: Pin<&mut Self>, amt: usize) { |
| let this = self.get_mut(); |
| this.pos = std::cmp::min(this.pos + amt, this.cap); |
| } |
| } |
| |
| impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F> |
| where |
| T: AsyncRead + Unpin, |
| F: FnMut(bool, &[u8]) -> ProgressAction + Unpin, |
| { |
| fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> { |
| let nread = { |
| use std::io::Read; |
| let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?; |
| rem.read(buf)? |
| }; |
| self.consume(nread); |
| Poll::Ready(Ok(nread)) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| fn receiver<T: Send>(_i: T) {} |
| |
| /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself. |
| #[test] |
| fn streaming_peekable_iter_is_send() { |
| receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[], false)); |
| } |
| |
| #[test] |
| fn state_is_send() { |
| let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[], false); |
| receiver(State::Idle { parent: Some(&mut s) }); |
| } |
| } |