blob: 3662620a58374aa500741b2f45f873efd7d02e5c [file] [log] [blame]
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "sink")]
use futures_sink::Sink;
use pin_project::{pin_project, project};
// FIXME: docs, tests
/// Stream for the [`take_until`](super::StreamExt::take_until) method.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct TakeUntil<St: Stream, Fut: Future> {
#[pin]
stream: St,
/// Contains the inner Future on start and None once the inner Future is resolved
/// or taken out by the user.
#[pin]
fut: Option<Fut>,
/// Contains fut's return value once fut is resolved
fut_result: Option<Fut::Output>,
/// Whether the future was taken out by the user.
free: bool,
}
impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
where
St: Stream + fmt::Debug,
St::Item: fmt::Debug,
Fut: Future + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TakeUntil")
.field("stream", &self.stream)
.field("fut", &self.fut)
.finish()
}
}
impl<St, Fut> TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
pub(super) fn new(stream: St, fut: Fut) -> TakeUntil<St, Fut> {
TakeUntil {
stream,
fut: Some(fut),
fut_result: None,
free: false,
}
}
delegate_access_inner!(stream, St, ());
/// Extract the stopping future out of the combinator.
/// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
/// Taking out the future means the combinator will be yielding
/// elements from the wrapped stream without ever stopping it.
pub fn take_future(&mut self) -> Option<Fut> {
if self.fut.is_some() {
self.free = true;
}
self.fut.take()
}
/// Once the stopping future is resolved, this method can be used
/// to extract the value returned by the stopping future.
///
/// This may be used to retrieve arbitrary data from the stopping
/// future, for example a reason why the stream was stopped.
///
/// This method will return `None` if the future isn't resovled yet,
/// or if the result was already taken out.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
/// use futures::stream::{self, StreamExt};
/// use futures::task::Poll;
///
/// let stream = stream::iter(1..=10);
///
/// let mut i = 0;
/// let stop_fut = future::poll_fn(|_cx| {
/// i += 1;
/// if i <= 5 {
/// Poll::Pending
/// } else {
/// Poll::Ready("reason")
/// }
/// });
///
/// let mut stream = stream.take_until(stop_fut);
/// let _ = stream.by_ref().collect::<Vec<_>>().await;
///
/// let result = stream.take_result().unwrap();
/// assert_eq!(result, "reason");
/// # });
/// ```
pub fn take_result(&mut self) -> Option<Fut::Output> {
self.fut_result.take()
}
/// Whether the stream was stopped yet by the stopping future
/// being resolved.
pub fn is_stopped(&self) -> bool {
!self.free && self.fut.is_none()
}
}
impl<St, Fut> Stream for TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
type Item = St::Item;
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
#[project]
let TakeUntil { stream, mut fut, fut_result, free } = self.project();
if let Some(f) = fut.as_mut().as_pin_mut() {
if let Poll::Ready(result) = f.poll(cx) {
fut.set(None);
*fut_result = Some(result);
}
}
if !*free && fut.is_none() {
// Future resolved, inner stream stopped
Poll::Ready(None)
} else {
// Future either not resolved yet or taken out by the user
let item = ready!(stream.poll_next(cx));
if item.is_none() {
fut.set(None);
}
Poll::Ready(item)
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.is_stopped() {
return (0, Some(0));
}
self.stream.size_hint()
}
}
impl<St, Fut> FusedStream for TakeUntil<St, Fut>
where
St: Stream,
Fut: Future,
{
fn is_terminated(&self) -> bool {
self.is_stopped()
}
}
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
where
S: Stream + Sink<Item>,
Fut: Future,
{
type Error = S::Error;
delegate_sink!(stream, Item);
}