// blob: 6c53e922c1f25f6dea697903f33aa8bfa2b4672b [file] [log] [blame]
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_io::{AsyncBufRead, AsyncRead};
use futures_lite::ready;
use crate::{decode, read::ProgressAction, BandRef, PacketLineRef, StreamingPeekableIter, TextRef, U16_HEX_BYTES};
/// The output of the boxed `read_line()` future kept in `State::ReadLine`.
///
/// From the outside in: `None` means no line was produced, the `std::io::Result` carries transport errors,
/// and the inner `Result` either holds the packet line or the error encountered while decoding it.
type ReadLineResult<'a> = Option<std::io::Result<Result<PacketLineRef<'a>, decode::Error>>>;
/// An implementor of [`AsyncBufRead`] yielding one packet line each time the buffer is (re-)filled, see
/// [`poll_fill_buf()`][AsyncBufRead::poll_fill_buf()].
/// It's also possible to hide the underlying packet lines using the [`Read`][AsyncRead] implementation which is useful
/// if they represent binary data, like the one of a pack file.
pub struct WithSidebands<'a, T, F>
where
T: AsyncRead,
{
// Either grants direct access to the parent iterator (`Idle`) or holds the in-flight future reading its next line.
state: State<'a, T>,
// Receives progress and error band text as `handle_progress(is_error, text)`.
// If `None`, lines are not decoded as sidebands and are handed out as plain data lines.
handle_progress: Option<F>,
// Start of the unread data within the parent's buffer.
pos: usize,
// End (exclusive) of the readable data within the parent's buffer; `pos >= cap` triggers reading the next line.
cap: usize,
}
impl<'a, T, F> Drop for WithSidebands<'a, T, F>
where
    T: AsyncRead,
{
    /// Reset the parent iterator if this reader goes away while it is idle.
    ///
    /// While a line read is in flight the parent is owned by the pending future, and nothing is done.
    ///
    /// # Panics
    ///
    /// If the state is idle but no parent is stored, which violates an internal invariant.
    fn drop(&mut self) {
        match &mut self.state {
            State::Idle { parent } => {
                let iter = parent.as_mut().expect("parent is always available if we are idle");
                iter.reset();
            }
            State::ReadLine { .. } => {}
        }
    }
}
impl<'a, T> WithSidebands<'a, T, fn(bool, &[u8]) -> ProgressAction>
where
    T: AsyncRead,
{
    /// Create a new instance reading packet lines from `parent`, without any progress handler installed.
    pub fn new(parent: &'a mut StreamingPeekableIter<T>) -> Self {
        let state = State::Idle { parent: Some(parent) };
        Self {
            state,
            handle_progress: None,
            pos: 0,
            cap: 0,
        }
    }
}
/// The two phases [`WithSidebands`] alternates between while filling its buffer.
enum State<'a, T> {
/// No read is in flight and the parent iterator can be accessed directly.
///
/// `parent` is only `None` for the short moment in which `poll_fill_buf()` moves it into the `ReadLine` state.
Idle {
parent: Option<&'a mut StreamingPeekableIter<T>>,
},
/// The next line is currently being read from the parent.
ReadLine {
/// The boxed future created from `parent.read_line()`, which holds the mutable borrow of the parent.
read_line: Pin<Box<dyn Future<Output = ReadLineResult<'a>> + 'a>>,
/// A raw pointer to that same parent, used to go back to `Idle` once `read_line` has completed.
parent_inactive: Option<*mut StreamingPeekableIter<T>>,
},
}
/// # SAFETY
/// `State` has two fields which are not `Send` automatically:
///
/// * `parent_inactive` is a `*mut StreamingPeekableIter<T>`. `T` is required to be `Send` here, and we have a test
///   that assures that our `StreamingPeekableIter` is `Send` as well, hence the `*mut _` is treated as `Send`.
/// * `read_line` is a boxed future without `Send` bound. We can't declare it as such as it forces `Send` in all
///   places (the original author did not recall why that is a problem).
///   However, it's only used when pinned and thus isn't actually sent anywhere - it's a secondary state of the
///   future which is entered after it was possibly sent to a thread.
// NOTE(review): nothing visible in this file prevents a `State::ReadLine` from being moved to another thread
// between two polls - confirm that callers never do this.
// TODO: Is it possible to declare it as it should be?
#[allow(unsafe_code, clippy::non_send_fields_in_send_ty)]
unsafe impl<'a, T> Send for State<'a, T> where T: Send {}
impl<'a, T, F> WithSidebands<'a, T, F>
where
    T: AsyncRead + Unpin,
    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
    /// Create a new instance reading from `parent` which reports sideband messages to `handle_progress`.
    ///
    /// The handler is called as `handle_progress(is_error, text)`, with `is_error` being `true` if `text`
    /// came in on the error band and `false` if it came in on the progress band.
    pub fn with_progress_handler(parent: &'a mut StreamingPeekableIter<T>, handle_progress: F) -> Self {
        let state = State::Idle { parent: Some(parent) };
        Self {
            state,
            handle_progress: Some(handle_progress),
            pos: 0,
            cap: 0,
        }
    }

    /// Create a new instance reading from `parent` that has no progress handler installed.
    pub fn without_progress_handler(parent: &'a mut StreamingPeekableIter<T>) -> Self {
        let state = State::Idle { parent: Some(parent) };
        Self {
            state,
            handle_progress: None,
            pos: 0,
            cap: 0,
        }
    }

    /// Forwards to the parent [`StreamingPeekableIter::reset_with()`].
    ///
    /// This is a no-op while a line read is in flight.
    ///
    /// # Panics
    ///
    /// If the state is idle but no parent is stored, which violates an internal invariant.
    pub fn reset_with(&mut self, delimiters: &'static [PacketLineRef<'static>]) {
        if let State::Idle { parent } = &mut self.state {
            let iter = parent.as_mut().expect("parent is always available if we are idle");
            iter.reset_with(delimiters);
        }
    }

    /// Forwards to the parent [`StreamingPeekableIter::stopped_at()`].
    ///
    /// `None` is returned while a line read is in flight.
    ///
    /// # Panics
    ///
    /// If the state is idle but no parent is stored, which violates an internal invariant.
    pub fn stopped_at(&self) -> Option<PacketLineRef<'static>> {
        match &self.state {
            State::Idle { parent } => {
                let iter = parent.as_ref().expect("parent is always available if we are idle");
                iter.stopped_at
            }
            State::ReadLine { .. } => None,
        }
    }

    /// Replace the progress handler with `handle_progress`, or remove it by passing `None`.
    pub fn set_progress_handler(&mut self, handle_progress: Option<F>) {
        self.handle_progress = handle_progress;
    }

    /// Effectively forwards to the parent [`StreamingPeekableIter::peek_line()`], allowing to see the data line
    /// that would be read next.
    ///
    /// Decode and I/O errors are passed on. `None` is returned if the parent yields no line, if the peeked line
    /// is not a data line, or while a line read is in flight.
    ///
    /// # Warning
    ///
    /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
    ///
    /// # Panics
    ///
    /// If the state is idle but no parent is stored, which violates an internal invariant.
    pub async fn peek_data_line(&mut self) -> Option<std::io::Result<Result<&[u8], decode::Error>>> {
        let iter = match &mut self.state {
            State::Idle { parent } => parent.as_mut().expect("parent is always available if we are idle"),
            State::ReadLine { .. } => return None,
        };
        match iter.peek_line().await {
            Some(Ok(Ok(PacketLineRef::Data(line)))) => Some(Ok(Ok(line))),
            Some(Ok(Err(err))) => Some(Ok(Err(err))),
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(Ok(_))) | None => None,
        }
    }

    /// Return a future which reads the next packet line as UTF-8 and stores it in `buf`, replacing its
    /// previous content.
    pub fn read_line_to_string<'b>(&'b mut self, buf: &'b mut String) -> ReadLineFuture<'a, 'b, T, F> {
        ReadLineFuture { parent: self, buf }
    }

    /// Read the next packet line directly from the underlying packet reader and return it as is.
    ///
    /// `None` is returned without reading anything unless this instance is idle with its parent available.
    /// The original documentation states that empty lines are returned once a stop-packetline was reached,
    /// which is up to the parent's `read_line()` and cannot be confirmed from here.
    ///
    /// # Warning
    ///
    /// This skips all sideband handling and may return an unprocessed line with sidebands still contained in it.
    ///
    /// # Panics
    ///
    /// If a partially consumed buffer is present, as mixing this method with buffered reads isn't supported.
    pub async fn read_data_line(&mut self) -> Option<std::io::Result<Result<PacketLineRef<'_>, decode::Error>>> {
        let iter = match &mut self.state {
            State::Idle { parent: Some(parent) } => parent,
            State::Idle { parent: None } | State::ReadLine { .. } => return None,
        };
        assert_eq!(
            self.cap, 0,
            "we don't support partial buffers right now - read-line must be used consistently"
        );
        iter.read_line().await
    }
}
/// A future which replaces the content of `buf` with the bytes of the next line obtained from `parent`,
/// resolving to the number of bytes stored.
// NOTE(review): no constructor for this type is visible in this part of the file.
pub struct ReadDataLineFuture<'a, 'b, T: AsyncRead, F> {
// The reader whose buffer is filled to obtain the line.
parent: &'b mut WithSidebands<'a, T, F>,
// The output buffer, cleared before the line is copied into it.
buf: &'b mut Vec<u8>,
}
impl<'a, 'b, T, F> Future for ReadDataLineFuture<'a, 'b, T, F>
where
    T: AsyncRead + Unpin,
    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
    type Output = std::io::Result<usize>;

    /// Fill the parent's buffer with the next line, copy it into the output buffer and mark it as consumed.
    ///
    /// Resolves to the amount of bytes copied. The output buffer is left untouched on error.
    ///
    /// # Panics
    ///
    /// If the parent holds a partially consumed buffer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        assert_eq!(
            this.parent.cap, 0,
            "we don't support partial buffers right now - read-line must be used consistently"
        );
        let line = ready!(Pin::new(&mut *this.parent).poll_fill_buf(cx))?;
        this.buf.clear();
        this.buf.extend_from_slice(line);
        let num_bytes = line.len();
        // The whole line was copied, so discard the parent's buffer window entirely.
        this.parent.cap = 0;
        Poll::Ready(Ok(num_bytes))
    }
}
/// The future returned by [`WithSidebands::read_line_to_string()`]: it replaces the content of `buf` with the next
/// line obtained from `parent`, which must be valid UTF-8, and resolves to the number of bytes stored.
pub struct ReadLineFuture<'a, 'b, T: AsyncRead, F> {
// The reader whose buffer is filled to obtain the line.
parent: &'b mut WithSidebands<'a, T, F>,
// The output string, cleared before the line is copied into it.
buf: &'b mut String,
}
impl<'a, 'b, T, F> Future for ReadLineFuture<'a, 'b, T, F>
where
    T: AsyncRead + Unpin,
    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
    type Output = std::io::Result<usize>;

    /// Fill the parent's buffer with the next line, validate it as UTF-8, copy it into the output string
    /// and mark it as consumed.
    ///
    /// Resolves to the amount of bytes copied. Invalid UTF-8 is reported as an I/O error of kind `Other`,
    /// in which case the output string is left untouched.
    ///
    /// # Panics
    ///
    /// If the parent holds a partially consumed buffer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        assert_eq!(
            this.parent.cap, 0,
            "we don't support partial buffers right now - read-line must be used consistently"
        );
        let raw = ready!(Pin::new(&mut *this.parent).poll_fill_buf(cx))?;
        let line = std::str::from_utf8(raw).map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
        this.buf.clear();
        this.buf.push_str(line);
        let num_bytes = line.len();
        // The whole line was copied, so discard the parent's buffer window entirely.
        this.parent.cap = 0;
        Poll::Ready(Ok(num_bytes))
    }
}
impl<'a, T, F> AsyncBufRead for WithSidebands<'a, T, F>
where
T: AsyncRead + Unpin,
F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
/// Return the unread part of the current line's payload, reading the next line from the parent first if
/// the previous one was consumed entirely.
///
/// With a progress handler present, each line is decoded as sideband: non-empty data is returned, empty data
/// is skipped, and progress or error text is passed to the handler before reading on. Without a handler,
/// every line must be a data line and is returned as is.
///
/// An empty slice is returned if the parent yields no further line.
///
/// # Errors
///
/// * I/O errors of the parent are passed on, decoding errors are wrapped as `ErrorKind::Other`.
/// * `ErrorKind::Other` with "interrupted by user" if the progress handler returns `ProgressAction::Interrupt`.
/// * `ErrorKind::UnexpectedEof` if there is no progress handler and the line is not a data line.
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
use std::io;
use futures_lite::FutureExt;
{
let this = self.as_mut().get_mut();
// Only read another line once the current window `pos..cap` was consumed entirely.
if this.pos >= this.cap {
// The loop evaluates to `(offset, length)` of the payload within the parent's buffer.
let (ofs, cap) = loop {
match this.state {
State::Idle { ref mut parent } => {
// Hand the parent over to a new `read_line()` future, keeping a raw pointer to it
// so it can be recovered once that future is done.
let parent = parent.take().expect("parent to be present here");
let inactive = parent as *mut _;
this.state = State::ReadLine {
read_line: parent.read_line().boxed_local(),
parent_inactive: Some(inactive),
}
}
State::ReadLine {
ref mut read_line,
ref mut parent_inactive,
} => {
// Returns `Poll::Pending` from here, staying in `ReadLine` so the next poll resumes this future.
let line = ready!(read_line.poll(cx));
// Go back to `Idle` before looking at the result, so every exit below leaves a usable parent behind.
this.state = {
let parent = parent_inactive.take().expect("parent pointer always set");
// SAFETY: It's safe to recover the original mutable reference (from which
// the `read_line` future was created) as the latter isn't accessible anymore
// once the state is set to Idle. In other words, either one or the other are
// accessible, never both at the same time.
// Also: We keep a pointer around which is protected by borrowcheck since it's created
// from a legal mutable reference which is moved into the read_line future - if it was manually
// implemented we would be able to re-obtain it from there.
#[allow(unsafe_code)]
let parent = unsafe { &mut *parent };
State::Idle { parent: Some(parent) }
};
let line = match line {
// `?` passes on I/O errors, decode errors are wrapped into an I/O error.
Some(line) => line?.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
// No further line: an empty window makes this method return an empty slice.
None => break (0, 0),
};
match this.handle_progress.as_mut() {
Some(handle_progress) => {
let band = line
.decode_band()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
// The payload starts after the line prefix (presumably the hex-encoded length) and one band byte.
const ENCODED_BAND: usize = 1;
match band {
BandRef::Data(d) => {
// Nothing to hand out for empty data, read the next line instead.
if d.is_empty() {
continue;
}
break (U16_HEX_BYTES + ENCODED_BAND, d.len());
}
BandRef::Progress(d) => {
let text = TextRef::from(d).0;
// Progress text is reported with `is_error = false`, then the loop reads the next line.
match handle_progress(false, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
std::io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
BandRef::Error(d) => {
let text = TextRef::from(d).0;
// Error text is reported with `is_error = true`, then the loop reads the next line.
match handle_progress(true, text) {
ProgressAction::Continue => {}
ProgressAction::Interrupt => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"interrupted by user",
)))
}
};
}
};
}
None => {
// Without a progress handler there is no band byte, the payload follows the line prefix directly.
break match line.as_slice() {
Some(d) => (U16_HEX_BYTES, d.len()),
None => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"encountered non-data line in a data-line only context",
)))
}
}
}
}
}
}
};
// Both values index into the parent's buffer: `pos` is the payload start, `cap` its exclusive end.
this.cap = cap + ofs;
this.pos = ofs;
}
}
let range = self.pos..self.cap;
// The state is `Idle` here: the loop above only breaks after switching back to `Idle`, and a non-empty
// window can only stem from a previous call that did so.
match &self.get_mut().state {
State::Idle { parent } => Poll::Ready(Ok(&parent.as_ref().expect("parent always available").buf[range])),
State::ReadLine { .. } => unreachable!("at least in theory"),
}
}
/// Mark `amt` bytes of the current window as read, never moving past its end.
fn consume(self: Pin<&mut Self>, amt: usize) {
let this = self.get_mut();
this.pos = std::cmp::min(this.pos + amt, this.cap);
}
}
impl<'a, T, F> AsyncRead for WithSidebands<'a, T, F>
where
    T: AsyncRead + Unpin,
    F: FnMut(bool, &[u8]) -> ProgressAction + Unpin,
{
    /// Copy as many buffered payload bytes as fit into `buf` and mark them as consumed, filling the internal
    /// buffer first if needed.
    ///
    /// Resolves to the number of bytes copied, which is `0` if either `buf` or the filled buffer is empty.
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<std::io::Result<usize>> {
        let available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = available.len().min(buf.len());
        buf[..nread].copy_from_slice(&available[..nread]);
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Only compiles if `T` is `Send`, the value itself is of no interest.
    fn receiver<T: Send>(_value: T) {}

    /// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself.
    #[test]
    fn streaming_peekable_iter_is_send() {
        let iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[], false);
        receiver(iter);
    }

    /// The idle state, which borrows the iterator mutably, must be `Send`, too.
    #[test]
    fn state_is_send() {
        let mut iter = StreamingPeekableIter::new(Vec::<u8>::new(), &[], false);
        let idle = State::Idle { parent: Some(&mut iter) };
        receiver(idle);
    }
}