use futures_core::Stream;

mod all;
use all::AllFuture;

mod any;
use any::AnyFuture;

mod chain;
use chain::Chain;

pub(crate) mod collect;
use collect::{Collect, FromStream};

mod filter;
use filter::Filter;

mod filter_map;
use filter_map::FilterMap;

mod fold;
use fold::FoldFuture;

mod fuse;
use fuse::Fuse;

mod map;
use map::Map;

mod merge;
use merge::Merge;

mod next;
use next::Next;

mod skip;
use skip::Skip;

mod skip_while;
use skip_while::SkipWhile;

mod try_next;
use try_next::TryNext;

mod take;
use take::Take;

mod take_while;
use take_while::TakeWhile;

cfg_time! {
    mod timeout;
    use timeout::Timeout;
    use tokio::time::Duration;
    mod throttle;
    use throttle::{throttle, Throttle};
}

/// An extension trait for the [`Stream`] trait that provides a variety of
/// convenient combinator functions.
///
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
/// in the [futures] crate, however both Tokio and futures provide separate
/// `StreamExt` utility traits, and some utilities are only available on one of
/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
/// trait in the futures crate.
///
/// If you need utilities from both `StreamExt` traits, you should prefer to
/// import one of them, and use the other through the fully qualified call
/// syntax. For example:
/// ```
/// // import one of the traits:
/// use futures::stream::StreamExt;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
///
/// let a = tokio_stream::iter(vec![1, 3, 5]);
/// let b = tokio_stream::iter(vec![2, 4, 6]);
///
/// // use the fully qualified call syntax for the other trait:
/// let merged = tokio_stream::StreamExt::merge(a, b);
///
/// // use normal call notation for futures::stream::StreamExt::collect
/// let output: Vec<_> = merged.collect().await;
/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
/// # }
/// ```
///
/// [`Stream`]: crate::Stream
/// [futures]: https://docs.rs/futures
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
pub trait StreamExt: Stream {
    /// Consumes and returns the next value in the stream or `None` if the
    /// stream is finished.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn next(&mut self) -> Option<Self::Item>;
    /// ```
    ///
    /// Note that because `next` doesn't take ownership over the stream,
    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
    /// be done by boxing the stream using [`Box::pin`] or
    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
    /// crate.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let mut stream = stream::iter(1..=3);
    ///
    /// assert_eq!(stream.next().await, Some(1));
    /// assert_eq!(stream.next().await, Some(2));
    /// assert_eq!(stream.next().await, Some(3));
    /// assert_eq!(stream.next().await, None);
    /// # }
    /// ```
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Unpin,
    {
        Next::new(self)
    }

    /// Consumes and returns the next item in the stream. If an error is
    /// encountered before the next item, the error is returned instead.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
    /// ```
    ///
    /// This is similar to the [`next`](StreamExt::next) combinator,
    /// but returns a [`Result<Option<T>, E>`](Result) rather than
    /// an [`Option<Result<T, E>>`](Option), making for easy use
    /// with the [`?`](std::ops::Try) operator.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
    ///
    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
    /// assert_eq!(stream.try_next().await, Err("nope"));
    /// # }
    /// ```
    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
    where
        Self: Stream<Item = Result<T, E>> + Unpin,
    {
        TryNext::new(self)
    }

    /// Maps this stream's items to a different type, returning a new stream of
    /// the resulting type.
    ///
    /// The provided closure is executed over all elements of this stream as
    /// they are made available. It is executed inline with calls to
    /// [`poll_next`](Stream::poll_next).
    ///
    /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to the existing `map` methods in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let stream = stream::iter(1..=3);
    /// let mut stream = stream.map(|x| x + 3);
    ///
    /// assert_eq!(stream.next().await, Some(4));
    /// assert_eq!(stream.next().await, Some(5));
    /// assert_eq!(stream.next().await, Some(6));
    /// # }
    /// ```
    fn map<T, F>(self, f: F) -> Map<Self, F>
    where
        F: FnMut(Self::Item) -> T,
        Self: Sized,
    {
        Map::new(self, f)
    }

    /// Combine two streams into one by interleaving the output of both as it
    /// is produced.
    ///
    /// Values are produced from the merged stream in the order they arrive from
    /// the two source streams. If both source streams provide values
    /// simultaneously, the merge stream alternates between them. This provides
    /// some level of fairness. You should not chain calls to `merge`, as this
    /// will break the fairness of the merging.
    ///
    /// The merged stream completes once **both** source streams complete. When
    /// one source stream completes before the other, the merge stream
    /// exclusively polls the remaining stream.
    ///
    /// For merging multiple streams, consider using [`StreamMap`] instead.
    ///
    /// [`StreamMap`]: crate::StreamMap
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamExt, Stream};
    /// use tokio::sync::mpsc;
    /// use tokio::time;
    ///
    /// use std::time::Duration;
    /// use std::pin::Pin;
    ///
    /// # /*
    /// #[tokio::main]
    /// # */
    /// # #[tokio::main(flavor = "current_thread")]
    /// async fn main() {
    /// # time::pause();
    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
    ///
    ///     // Convert the channels to a `Stream`.
    ///     let rx1 = Box::pin(async_stream::stream! {
    ///           while let Some(item) = rx1.recv().await {
    ///               yield item;
    ///           }
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
    ///
    ///     let rx2 = Box::pin(async_stream::stream! {
    ///           while let Some(item) = rx2.recv().await {
    ///               yield item;
    ///           }
    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
    ///
    ///     let mut rx = rx1.merge(rx2);
    ///
    ///     tokio::spawn(async move {
    ///         // Send some values immediately
    ///         tx1.send(1).await.unwrap();
    ///         tx1.send(2).await.unwrap();
    ///
    ///         // Let the other task send values
    ///         time::sleep(Duration::from_millis(20)).await;
    ///
    ///         tx1.send(4).await.unwrap();
    ///     });
    ///
    ///     tokio::spawn(async move {
    ///         // Wait for the first task to send values
    ///         time::sleep(Duration::from_millis(5)).await;
    ///
    ///         tx2.send(3).await.unwrap();
    ///
    ///         time::sleep(Duration::from_millis(25)).await;
    ///
    ///         // Send the final value
    ///         tx2.send(5).await.unwrap();
    ///     });
    ///
    ///    assert_eq!(1, rx.next().await.unwrap());
    ///    assert_eq!(2, rx.next().await.unwrap());
    ///    assert_eq!(3, rx.next().await.unwrap());
    ///    assert_eq!(4, rx.next().await.unwrap());
    ///    assert_eq!(5, rx.next().await.unwrap());
    ///
    ///    // The merged stream is consumed
    ///    assert!(rx.next().await.is_none());
    /// }
    /// ```
    fn merge<U>(self, other: U) -> Merge<Self, U>
    where
        U: Stream<Item = Self::Item>,
        Self: Sized,
    {
        Merge::new(self, other)
    }

    /// Filters the values produced by this stream according to the provided
    /// predicate.
    ///
    /// As values of this stream are made available, the provided predicate `f`
    /// will be run against them. If the predicate
    /// resolves to `true`, then the stream will yield the value, but if the
    /// predicate resolves to `false`, then the value
    /// will be discarded and the next value will be produced.
    ///
    /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to [`Iterator::filter`] method in the
    /// standard library.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let stream = stream::iter(1..=8);
    /// let mut evens = stream.filter(|x| x % 2 == 0);
    ///
    /// assert_eq!(Some(2), evens.next().await);
    /// assert_eq!(Some(4), evens.next().await);
    /// assert_eq!(Some(6), evens.next().await);
    /// assert_eq!(Some(8), evens.next().await);
    /// assert_eq!(None, evens.next().await);
    /// # }
    /// ```
    fn filter<F>(self, f: F) -> Filter<Self, F>
    where
        F: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        Filter::new(self, f)
    }

    /// Filters the values produced by this stream while simultaneously mapping
    /// them to a different type according to the provided closure.
    ///
    /// As values of this stream are made available, the provided function will
    /// be run on them. If the predicate `f` resolves to
    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
    /// it resolves to [`None`], then the value will be skipped.
    ///
    /// Note that this function consumes the stream passed into it and returns a
    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
    /// standard library.
    ///
    /// # Examples
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let stream = stream::iter(1..=8);
    /// let mut evens = stream.filter_map(|x| {
    ///     if x % 2 == 0 { Some(x + 1) } else { None }
    /// });
    ///
    /// assert_eq!(Some(3), evens.next().await);
    /// assert_eq!(Some(5), evens.next().await);
    /// assert_eq!(Some(7), evens.next().await);
    /// assert_eq!(Some(9), evens.next().await);
    /// assert_eq!(None, evens.next().await);
    /// # }
    /// ```
    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
    where
        F: FnMut(Self::Item) -> Option<T>,
        Self: Sized,
    {
        FilterMap::new(self, f)
    }

    /// Creates a stream which ends after the first `None`.
    ///
    /// After a stream returns `None`, behavior is undefined. Future calls to
    /// `poll_next` may or may not return `Some(T)` again or they may panic.
    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
    /// return `None` forever.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{Stream, StreamExt};
    ///
    /// use std::pin::Pin;
    /// use std::task::{Context, Poll};
    ///
    /// // a stream which alternates between Some and None
    /// struct Alternate {
    ///     state: i32,
    /// }
    ///
    /// impl Stream for Alternate {
    ///     type Item = i32;
    ///
    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
    ///         let val = self.state;
    ///         self.state = self.state + 1;
    ///
    ///         // if it's even, Some(i32), else None
    ///         if val % 2 == 0 {
    ///             Poll::Ready(Some(val))
    ///         } else {
    ///             Poll::Ready(None)
    ///         }
    ///     }
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mut stream = Alternate { state: 0 };
    ///
    ///     // the stream goes back and forth
    ///     assert_eq!(stream.next().await, Some(0));
    ///     assert_eq!(stream.next().await, None);
    ///     assert_eq!(stream.next().await, Some(2));
    ///     assert_eq!(stream.next().await, None);
    ///
    ///     // however, once it is fused
    ///     let mut stream = stream.fuse();
    ///
    ///     assert_eq!(stream.next().await, Some(4));
    ///     assert_eq!(stream.next().await, None);
    ///
    ///     // it will always return `None` after the first time.
    ///     assert_eq!(stream.next().await, None);
    ///     assert_eq!(stream.next().await, None);
    ///     assert_eq!(stream.next().await, None);
    /// }
    /// ```
    fn fuse(self) -> Fuse<Self>
    where
        Self: Sized,
    {
        Fuse::new(self)
    }

    /// Creates a new stream of at most `n` items of the underlying stream.
    ///
    /// Once `n` items have been yielded from this stream then it will always
    /// return that the stream is done.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let mut stream = stream::iter(1..=10).take(3);
    ///
    /// assert_eq!(Some(1), stream.next().await);
    /// assert_eq!(Some(2), stream.next().await);
    /// assert_eq!(Some(3), stream.next().await);
    /// assert_eq!(None, stream.next().await);
    /// # }
    /// ```
    fn take(self, n: usize) -> Take<Self>
    where
        Self: Sized,
    {
        Take::new(self, n)
    }

    /// Take elements from this stream while the provided predicate
    /// resolves to `true`.
    ///
    /// This function, like `Iterator::take_while`, will take elements from the
    /// stream until the predicate `f` resolves to `false`. Once one element
    /// returns false it will always return that the stream is done.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
    ///
    /// assert_eq!(Some(1), stream.next().await);
    /// assert_eq!(Some(2), stream.next().await);
    /// assert_eq!(Some(3), stream.next().await);
    /// assert_eq!(None, stream.next().await);
    /// # }
    /// ```
    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
    where
        F: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        TakeWhile::new(self, f)
    }

    /// Creates a new stream that will skip the `n` first items of the
    /// underlying stream.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let mut stream = stream::iter(1..=10).skip(7);
    ///
    /// assert_eq!(Some(8), stream.next().await);
    /// assert_eq!(Some(9), stream.next().await);
    /// assert_eq!(Some(10), stream.next().await);
    /// assert_eq!(None, stream.next().await);
    /// # }
    /// ```
    fn skip(self, n: usize) -> Skip<Self>
    where
        Self: Sized,
    {
        Skip::new(self, n)
    }

    /// Skip elements from the underlying stream while the provided predicate
    /// resolves to `true`.
    ///
    /// This function, like [`Iterator::skip_while`], will ignore elemets from the
    /// stream until the predicate `f` resolves to `false`. Once one element
    /// returns false, the rest of the elements will be yielded.
    ///
    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
    ///
    /// # Examples
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
    ///
    /// assert_eq!(Some(3), stream.next().await);
    /// assert_eq!(Some(4), stream.next().await);
    /// assert_eq!(Some(1), stream.next().await);
    /// assert_eq!(None, stream.next().await);
    /// # }
    /// ```
    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
    where
        F: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        SkipWhile::new(self, f)
    }

    /// Tests if every element of the stream matches a predicate.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn all<F>(&mut self, f: F) -> bool;
    /// ```
    ///
    /// `all()` takes a closure that returns `true` or `false`. It applies
    /// this closure to each element of the stream, and if they all return
    /// `true`, then so does `all`. If any of them return `false`, it
    /// returns `false`. An empty stream returns `true`.
    ///
    /// `all()` is short-circuiting; in other words, it will stop processing
    /// as soon as it finds a `false`, given that no matter what else happens,
    /// the result will also be `false`.
    ///
    /// An empty stream returns `true`.
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let a = [1, 2, 3];
    ///
    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
    ///
    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
    /// # }
    /// ```
    ///
    /// Stopping at the first `false`:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let a = [1, 2, 3];
    ///
    /// let mut iter = stream::iter(&a);
    ///
    /// assert!(!iter.all(|&x| x != 2).await);
    ///
    /// // we can still use `iter`, as there are more elements.
    /// assert_eq!(iter.next().await, Some(&3));
    /// # }
    /// ```
    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> bool,
    {
        AllFuture::new(self, f)
    }

    /// Tests if any element of the stream matches a predicate.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn any<F>(&mut self, f: F) -> bool;
    /// ```
    ///
    /// `any()` takes a closure that returns `true` or `false`. It applies
    /// this closure to each element of the stream, and if any of them return
    /// `true`, then so does `any()`. If they all return `false`, it
    /// returns `false`.
    ///
    /// `any()` is short-circuiting; in other words, it will stop processing
    /// as soon as it finds a `true`, given that no matter what else happens,
    /// the result will also be `true`.
    ///
    /// An empty stream returns `false`.
    ///
    /// Basic usage:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let a = [1, 2, 3];
    ///
    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
    ///
    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
    /// # }
    /// ```
    ///
    /// Stopping at the first `true`:
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// let a = [1, 2, 3];
    ///
    /// let mut iter = stream::iter(&a);
    ///
    /// assert!(iter.any(|&x| x != 2).await);
    ///
    /// // we can still use `iter`, as there are more elements.
    /// assert_eq!(iter.next().await, Some(&2));
    /// # }
    /// ```
    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
    where
        Self: Unpin,
        F: FnMut(Self::Item) -> bool,
    {
        AnyFuture::new(self, f)
    }

    /// Combine two streams into one by first returning all values from the
    /// first stream then all values from the second stream.
    ///
    /// As long as `self` still has values to emit, no values from `other` are
    /// emitted, even if some are ready.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let one = stream::iter(vec![1, 2, 3]);
    ///     let two = stream::iter(vec![4, 5, 6]);
    ///
    ///     let mut stream = one.chain(two);
    ///
    ///     assert_eq!(stream.next().await, Some(1));
    ///     assert_eq!(stream.next().await, Some(2));
    ///     assert_eq!(stream.next().await, Some(3));
    ///     assert_eq!(stream.next().await, Some(4));
    ///     assert_eq!(stream.next().await, Some(5));
    ///     assert_eq!(stream.next().await, Some(6));
    ///     assert_eq!(stream.next().await, None);
    /// }
    /// ```
    fn chain<U>(self, other: U) -> Chain<Self, U>
    where
        U: Stream<Item = Self::Item>,
        Self: Sized,
    {
        Chain::new(self, other)
    }

    /// A combinator that applies a function to every element in a stream
    /// producing a single, final value.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn fold<B, F>(self, init: B, f: F) -> B;
    /// ```
    ///
    /// # Examples
    /// Basic usage:
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, *};
    ///
    /// let s = stream::iter(vec![1u8, 2, 3]);
    /// let sum = s.fold(0, |acc, x| acc + x).await;
    ///
    /// assert_eq!(sum, 6);
    /// # }
    /// ```
    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
    where
        Self: Sized,
        F: FnMut(B, Self::Item) -> B,
    {
        FoldFuture::new(self, init, f)
    }

    /// Drain stream pushing all emitted values into a collection.
    ///
    /// Equivalent to:
    ///
    /// ```ignore
    /// async fn collect<T>(self) -> T;
    /// ```
    ///
    /// `collect` streams all values, awaiting as needed. Values are pushed into
    /// a collection. A number of different target collection types are
    /// supported, including [`Vec`](std::vec::Vec),
    /// [`String`](std::string::String), and [`Bytes`].
    ///
    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
    ///
    /// # `Result`
    ///
    /// `collect()` can also be used with streams of type `Result<T, E>` where
    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
    /// streaming is terminated and `collect()` returns the `Err`.
    ///
    /// # Notes
    ///
    /// `FromStream` is currently a sealed trait. Stabilization is pending
    /// enhancements to the Rust language.
    ///
    /// # Examples
    ///
    /// Basic usage:
    ///
    /// ```
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let doubled: Vec<i32> =
    ///         stream::iter(vec![1, 2, 3])
    ///             .map(|x| x * 2)
    ///             .collect()
    ///             .await;
    ///
    ///     assert_eq!(vec![2, 4, 6], doubled);
    /// }
    /// ```
    ///
    /// Collecting a stream of `Result` values
    ///
    /// ```
    /// use tokio_stream::{self as stream, StreamExt};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     // A stream containing only `Ok` values will be collected
    ///     let values: Result<Vec<i32>, &str> =
    ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
    ///             .collect()
    ///             .await;
    ///
    ///     assert_eq!(Ok(vec![1, 2, 3]), values);
    ///
    ///     // A stream containing `Err` values will return the first error.
    ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
    ///
    ///     let values: Result<Vec<i32>, &str> =
    ///         stream::iter(results)
    ///             .collect()
    ///             .await;
    ///
    ///     assert_eq!(Err("no"), values);
    /// }
    /// ```
    fn collect<T>(self) -> Collect<Self, T>
    where
        T: FromStream<Self::Item>,
        Self: Sized,
    {
        Collect::new(self)
    }

    /// Applies a per-item timeout to the passed stream.
    ///
    /// `timeout()` takes a `Duration` that represents the maximum amount of
    /// time each element of the stream has to complete before timing out.
    ///
    /// If the wrapped stream yields a value before the deadline is reached, the
    /// value is returned. Otherwise, an error is returned. The caller may decide
    /// to continue consuming the stream and will eventually get the next source
    /// stream value once it becomes available.
    ///
    /// # Notes
    ///
    /// This function consumes the stream passed into it and returns a
    /// wrapped version of it.
    ///
    /// Polling the returned stream will continue to poll the inner stream even
    /// if one or more items time out.
    ///
    /// # Examples
    ///
    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() {
    /// use tokio_stream::{self as stream, StreamExt};
    /// use std::time::Duration;
    /// # let int_stream = stream::iter(1..=3);
    ///
    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
    /// tokio::pin!(int_stream);
    ///
    /// // When no items time out, we get the 3 elements in succession:
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
    /// assert_eq!(int_stream.try_next().await, Ok(None));
    ///
    /// // If the second item times out, we get an error and continue polling the stream:
    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
    /// assert!(int_stream.try_next().await.is_err());
    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
    /// assert_eq!(int_stream.try_next().await, Ok(None));
    ///
    /// // If we want to stop consuming the source stream the first time an
    /// // element times out, we can use the `take_while` operator:
    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
    /// let mut int_stream = int_stream.take_while(Result::is_ok);
    ///
    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
    /// assert_eq!(int_stream.try_next().await, Ok(None));
    /// # }
    /// ```
    #[cfg(all(feature = "time"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn timeout(self, duration: Duration) -> Timeout<Self>
    where
        Self: Sized,
    {
        Timeout::new(self, duration)
    }

    /// Slows down a stream by enforcing a delay between items.
    ///
    /// # Example
    ///
    /// Create a throttled stream.
    /// ```rust,no_run
    /// use std::time::Duration;
    /// use tokio_stream::StreamExt;
    ///
    /// # async fn dox() {
    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
    /// tokio::pin!(item_stream);
    ///
    /// loop {
    ///     // The string will be produced at most every 2 seconds
    ///     println!("{:?}", item_stream.next().await);
    /// }
    /// # }
    /// ```
    #[cfg(all(feature = "time"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
    fn throttle(self, duration: Duration) -> Throttle<Self>
    where
        Self: Sized,
    {
        throttle(duration, self)
    }
}

impl<St: ?Sized> StreamExt for St where St: Stream {}

/// Merge the size hints from two streams.
fn merge_size_hints(
    (left_low, left_high): (usize, Option<usize>),
    (right_low, right_hign): (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let low = left_low.saturating_add(right_low);
    let high = match (left_high, right_hign) {
        (Some(h1), Some(h2)) => h1.checked_add(h2),
        _ => None,
    };
    (low, high)
}
