Initial import of tokio_stream am: c0f49ebcec

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1558921

MUST ONLY BE SUBMITTED BY AUTOMERGER

Change-Id: I15fea460ae433ca839c7f7631008997d2e0b67ac
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..bbb7d8c
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,22 @@
+# 0.1.2 (January 12, 2021)
+
+Fixed
+
+ - docs: fix some wrappers missing in documentation (#3378)
+
+# 0.1.1 (January 4, 2021)
+
+Added
+
+ - add `Stream` wrappers ([#3343])
+
+Fixed
+
+ - move `async-stream` to `dev-dependencies` ([#3366])
+
+[#3366]: https://github.com/tokio-rs/tokio/pull/3366
+[#3343]: https://github.com/tokio-rs/tokio/pull/3343
+
+# 0.1.0 (December 23, 2020)
+
+ - Initial release
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..443228a
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,55 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "tokio-stream"
+version = "0.1.2"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+description = "Utilities to work with `Stream` and `tokio`.\n"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+categories = ["asynchronous"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
+[dependencies.futures-core]
+version = "0.3.0"
+
+[dependencies.pin-project-lite]
+version = "0.2.0"
+
+[dependencies.tokio]
+version = "1.0"
+features = ["sync"]
+[dev-dependencies.async-stream]
+version = "0.3"
+
+[dev-dependencies.futures]
+version = "0.3"
+default-features = false
+
+[dev-dependencies.proptest]
+version = "0.10.0"
+
+[dev-dependencies.tokio]
+version = "1.0"
+features = ["full", "test-util"]
+
+[features]
+default = ["time"]
+fs = ["tokio/fs"]
+io-util = ["tokio/io-util"]
+net = ["tokio/net"]
+time = ["tokio/time"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..d662c38
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,44 @@
+[package]
+name = "tokio-stream"
+# When releasing to crates.io:
+# - Remove path dependencies
+# - Update html_root_url.
+# - Update doc url
+#   - Cargo.toml
+# - Update CHANGELOG.md.
+# - Create "tokio-stream-0.1.x" git tag.
+version = "0.1.2"
+edition = "2018"
+authors = ["Tokio Contributors <team@tokio.rs>"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+homepage = "https://tokio.rs"
+documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+description = """
+Utilities to work with `Stream` and `tokio`.
+"""
+categories = ["asynchronous"]
+
+[features]
+default = ["time"]
+time = ["tokio/time"]
+net = ["tokio/net"]
+io-util = ["tokio/io-util"]
+fs = ["tokio/fs"]
+
+[dependencies]
+futures-core = { version = "0.3.0" }
+pin-project-lite = "0.2.0"
+tokio = { version = "1.0", features = ["sync"] }
+
+[dev-dependencies]
+tokio = { version = "1.0", features = ["full", "test-util"] }
+tokio-test = { path = "../tokio-test" }
+async-stream = "0.3"
+futures = { version = "0.3", default-features = false }
+
+proptest = "0.10.0"
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
diff --git a/src/empty.rs b/src/empty.rs
new file mode 100644
index 0000000..965dcf5
--- /dev/null
+++ b/src/empty.rs
@@ -0,0 +1,50 @@
+use crate::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`empty`](fn@empty) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Empty<T>(PhantomData<T>);
+
+impl<T> Unpin for Empty<T> {}
+unsafe impl<T> Send for Empty<T> {}
+unsafe impl<T> Sync for Empty<T> {}
+
+/// Creates a stream that yields nothing.
+///
+/// The returned stream is immediately ready and returns `None`. Use
+/// [`stream::pending()`](super::pending()) to obtain a stream that is never
+/// ready.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut none = stream::empty::<i32>();
+///
+///     assert_eq!(None, none.next().await);
+/// }
+/// ```
+pub const fn empty<T>() -> Empty<T> {
+    Empty(PhantomData)
+}
+
+impl<T> Stream for Empty<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+        Poll::Ready(None)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, Some(0))
+    }
+}
diff --git a/src/iter.rs b/src/iter.rs
new file mode 100644
index 0000000..128be61
--- /dev/null
+++ b/src/iter.rs
@@ -0,0 +1,67 @@
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`iter`](fn@iter) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Iter<I> {
+    iter: I,
+    yield_amt: usize,
+}
+
+impl<I> Unpin for Iter<I> {}
+
+/// Converts an `Iterator` into a `Stream` which is always ready
+/// to yield the next value.
+///
+/// Iterators in Rust don't express the ability to block, so this adapter
+/// simply always calls `iter.next()` and returns that.
+///
+/// ```
+/// # async fn dox() {
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// let mut stream = stream::iter(vec![17, 19]);
+///
+/// assert_eq!(stream.next().await, Some(17));
+/// assert_eq!(stream.next().await, Some(19));
+/// assert_eq!(stream.next().await, None);
+/// # }
+/// ```
+pub fn iter<I>(i: I) -> Iter<I::IntoIter>
+where
+    I: IntoIterator,
+{
+    Iter {
+        iter: i.into_iter(),
+        yield_amt: 0,
+    }
+}
+
+impl<I> Stream for Iter<I>
+where
+    I: Iterator,
+{
+    type Item = I::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
+        // TODO: add coop back
+        if self.yield_amt >= 32 {
+            self.yield_amt = 0;
+
+            cx.waker().wake_by_ref();
+
+            Poll::Pending
+        } else {
+            self.yield_amt += 1;
+
+            Poll::Ready(self.iter.next())
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iter.size_hint()
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..307a839
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,105 @@
+#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.2")]
+#![allow(
+    clippy::cognitive_complexity,
+    clippy::large_enum_variant,
+    clippy::needless_doctest_main
+)]
+#![warn(
+    missing_debug_implementations,
+    missing_docs,
+    rust_2018_idioms,
+    unreachable_pub
+)]
+#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![doc(test(
+    no_crate_inject,
+    attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
+))]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![doc(test(
+    no_crate_inject,
+    attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
+))]
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+//! Stream utilities for Tokio.
+//!
+//! A `Stream` is an asynchronous sequence of values. It can be thought of as
+//! an asynchronous version of the standard library's `Iterator` trait.
+//!
+//! This crate provides helpers to work with them. For examples of usage and a more in-depth
+//! description of streams you can also refer to the [streams
+//! tutorial](https://tokio.rs/tokio/tutorial/streams) on the tokio website.
+//!
+//! # Iterating over a Stream
+//!
+//! Due to similarities with the standard library's `Iterator` trait, some new
+//! users may assume that they can use `for in` syntax to iterate over a
+//! `Stream`, but this is unfortunately not possible. Instead, you can use a
+//! `while let` loop as follows:
+//!
+//! ```rust
+//! use tokio_stream::{self as stream, StreamExt};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//!     let mut stream = stream::iter(vec![0, 1, 2]);
+//!
+//!     while let Some(value) = stream.next().await {
+//!         println!("Got {}", value);
+//!     }
+//! }
+//! ```
+//!
+//! # Returning a Stream from a function
+//!
+//! A common way to stream values from a function is to pass in the sender
+//! half of a channel and use the receiver as the stream. This requires awaiting
+//! both futures to ensure progress is made. Another alternative is the
+//! [async-stream] crate, which contains macros that provide a `yield` keyword
+//! and allow you to return an `impl Stream`.
+//!
+//! [async-stream]: https://docs.rs/async-stream
+//!
+//! # Conversion to and from AsyncRead/AsyncWrite
+//!
+//! It is often desirable to convert a `Stream` into an [`AsyncRead`],
+//! especially when dealing with plaintext formats streamed over the network.
+//! The opposite conversion from an [`AsyncRead`] into a `Stream` is also
+//! another commonly required feature. To enable these conversions,
+//! [`tokio-util`] provides the [`StreamReader`] and [`ReaderStream`]
+//! types when the io feature is enabled.
+//!
+//! [`tokio-util`]: https://docs.rs/tokio-util/0.4/tokio_util/codec/index.html
+//! [`tokio::io`]: https://docs.rs/tokio/1.0/tokio/io/index.html
+//! [`AsyncRead`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncRead.html
+//! [`AsyncWrite`]: https://docs.rs/tokio/1.0/tokio/io/trait.AsyncWrite.html
+//! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html
+//! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html
+
+#[macro_use]
+mod macros;
+
+pub mod wrappers;
+
+mod stream_ext;
+pub use stream_ext::{collect::FromStream, StreamExt};
+
+mod empty;
+pub use empty::{empty, Empty};
+
+mod iter;
+pub use iter::{iter, Iter};
+
+mod once;
+pub use once::{once, Once};
+
+mod pending;
+pub use pending::{pending, Pending};
+
+mod stream_map;
+pub use stream_map::StreamMap;
+
+#[doc(no_inline)]
+pub use futures_core::Stream;
diff --git a/src/macros.rs b/src/macros.rs
new file mode 100644
index 0000000..39ad86c
--- /dev/null
+++ b/src/macros.rs
@@ -0,0 +1,48 @@
+macro_rules! cfg_fs {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "fs")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
+            $item
+        )*
+    }
+}
+
+macro_rules! cfg_io_util {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "io-util")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+            $item
+        )*
+    }
+}
+
+macro_rules! cfg_net {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "net")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+            $item
+        )*
+    }
+}
+
+macro_rules! cfg_time {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "time")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+            $item
+        )*
+    }
+}
+
+macro_rules! ready {
+    ($e:expr $(,)?) => {
+        match $e {
+            std::task::Poll::Ready(t) => t,
+            std::task::Poll::Pending => return std::task::Poll::Pending,
+        }
+    };
+}
diff --git a/src/once.rs b/src/once.rs
new file mode 100644
index 0000000..04b4c05
--- /dev/null
+++ b/src/once.rs
@@ -0,0 +1,52 @@
+use crate::{Iter, Stream};
+
+use core::option;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`once`](fn@once) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Once<T> {
+    iter: Iter<option::IntoIter<T>>,
+}
+
+impl<I> Unpin for Once<I> {}
+
+/// Creates a stream that emits an element exactly once.
+///
+/// The returned stream is immediately ready and emits the provided value once.
+///
+/// # Examples
+///
+/// ```
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+///     // one is the loneliest number
+///     let mut one = stream::once(1);
+///
+///     assert_eq!(Some(1), one.next().await);
+///
+///     // just one, that's all we get
+///     assert_eq!(None, one.next().await);
+/// }
+/// ```
+pub fn once<T>(value: T) -> Once<T> {
+    Once {
+        iter: crate::iter(Some(value).into_iter()),
+    }
+}
+
+impl<T> Stream for Once<T> {
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        Pin::new(&mut self.iter).poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.iter.size_hint()
+    }
+}
diff --git a/src/pending.rs b/src/pending.rs
new file mode 100644
index 0000000..b50fd33
--- /dev/null
+++ b/src/pending.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+
+use core::marker::PhantomData;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+
+/// Stream for the [`pending`](fn@pending) function.
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct Pending<T>(PhantomData<T>);
+
+impl<T> Unpin for Pending<T> {}
+unsafe impl<T> Send for Pending<T> {}
+unsafe impl<T> Sync for Pending<T> {}
+
+/// Creates a stream that is never ready
+///
+/// The returned stream is never ready. Attempting to call
+/// [`next()`](crate::StreamExt::next) will never complete. Use
+/// [`stream::empty()`](super::empty()) to obtain a stream that is is
+/// immediately empty but returns no values.
+///
+/// # Examples
+///
+/// Basic usage:
+///
+/// ```no_run
+/// use tokio_stream::{self as stream, StreamExt};
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut never = stream::pending::<i32>();
+///
+///     // This will never complete
+///     never.next().await;
+///
+///     unreachable!();
+/// }
+/// ```
+pub const fn pending<T>() -> Pending<T> {
+    Pending(PhantomData)
+}
+
+impl<T> Stream for Pending<T> {
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
+        Poll::Pending
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, None)
+    }
+}
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
new file mode 100644
index 0000000..51532ee
--- /dev/null
+++ b/src/stream_ext.rs
@@ -0,0 +1,917 @@
+use futures_core::Stream;
+
+mod all;
+use all::AllFuture;
+
+mod any;
+use any::AnyFuture;
+
+mod chain;
+use chain::Chain;
+
+pub(crate) mod collect;
+use collect::{Collect, FromStream};
+
+mod filter;
+use filter::Filter;
+
+mod filter_map;
+use filter_map::FilterMap;
+
+mod fold;
+use fold::FoldFuture;
+
+mod fuse;
+use fuse::Fuse;
+
+mod map;
+use map::Map;
+
+mod merge;
+use merge::Merge;
+
+mod next;
+use next::Next;
+
+mod skip;
+use skip::Skip;
+
+mod skip_while;
+use skip_while::SkipWhile;
+
+mod try_next;
+use try_next::TryNext;
+
+mod take;
+use take::Take;
+
+mod take_while;
+use take_while::TakeWhile;
+
+cfg_time! {
+    mod timeout;
+    use timeout::Timeout;
+    use tokio::time::Duration;
+    mod throttle;
+    use throttle::{throttle, Throttle};
+}
+
+/// An extension trait for the [`Stream`] trait that provides a variety of
+/// convenient combinator functions.
+///
+/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
+/// in the [futures] crate, however both Tokio and futures provide separate
+/// `StreamExt` utility traits, and some utilities are only available on one of
+/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
+/// trait in the futures crate.
+///
+/// If you need utilities from both `StreamExt` traits, you should prefer to
+/// import one of them, and use the other through the fully qualified call
+/// syntax. For example:
+/// ```
+/// // import one of the traits:
+/// use futures::stream::StreamExt;
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() {
+///
+/// let a = tokio_stream::iter(vec![1, 3, 5]);
+/// let b = tokio_stream::iter(vec![2, 4, 6]);
+///
+/// // use the fully qualified call syntax for the other trait:
+/// let merged = tokio_stream::StreamExt::merge(a, b);
+///
+/// // use normal call notation for futures::stream::StreamExt::collect
+/// let output: Vec<_> = merged.collect().await;
+/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
+/// # }
+/// ```
+///
+/// [`Stream`]: crate::Stream
+/// [futures]: https://docs.rs/futures
+/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
+pub trait StreamExt: Stream {
+    /// Consumes and returns the next value in the stream or `None` if the
+    /// stream is finished.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn next(&mut self) -> Option<Self::Item>;
+    /// ```
+    ///
+    /// Note that because `next` doesn't take ownership over the stream,
+    /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
+    /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
+    /// be done by boxing the stream using [`Box::pin`] or
+    /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
+    /// crate.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let mut stream = stream::iter(1..=3);
+    ///
+    /// assert_eq!(stream.next().await, Some(1));
+    /// assert_eq!(stream.next().await, Some(2));
+    /// assert_eq!(stream.next().await, Some(3));
+    /// assert_eq!(stream.next().await, None);
+    /// # }
+    /// ```
+    fn next(&mut self) -> Next<'_, Self>
+    where
+        Self: Unpin,
+    {
+        Next::new(self)
+    }
+
+    /// Consumes and returns the next item in the stream. If an error is
+    /// encountered before the next item, the error is returned instead.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn try_next(&mut self) -> Result<Option<T>, E>;
+    /// ```
+    ///
+    /// This is similar to the [`next`](StreamExt::next) combinator,
+    /// but returns a [`Result<Option<T>, E>`](Result) rather than
+    /// an [`Option<Result<T, E>>`](Option), making for easy use
+    /// with the [`?`](std::ops::Try) operator.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]);
+    ///
+    /// assert_eq!(stream.try_next().await, Ok(Some(1)));
+    /// assert_eq!(stream.try_next().await, Ok(Some(2)));
+    /// assert_eq!(stream.try_next().await, Err("nope"));
+    /// # }
+    /// ```
+    fn try_next<T, E>(&mut self) -> TryNext<'_, Self>
+    where
+        Self: Stream<Item = Result<T, E>> + Unpin,
+    {
+        TryNext::new(self)
+    }
+
+    /// Maps this stream's items to a different type, returning a new stream of
+    /// the resulting type.
+    ///
+    /// The provided closure is executed over all elements of this stream as
+    /// they are made available. It is executed inline with calls to
+    /// [`poll_next`](Stream::poll_next).
+    ///
+    /// Note that this function consumes the stream passed into it and returns a
+    /// wrapped version of it, similar to the existing `map` methods in the
+    /// standard library.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let stream = stream::iter(1..=3);
+    /// let mut stream = stream.map(|x| x + 3);
+    ///
+    /// assert_eq!(stream.next().await, Some(4));
+    /// assert_eq!(stream.next().await, Some(5));
+    /// assert_eq!(stream.next().await, Some(6));
+    /// # }
+    /// ```
+    fn map<T, F>(self, f: F) -> Map<Self, F>
+    where
+        F: FnMut(Self::Item) -> T,
+        Self: Sized,
+    {
+        Map::new(self, f)
+    }
+
+    /// Combine two streams into one by interleaving the output of both as it
+    /// is produced.
+    ///
+    /// Values are produced from the merged stream in the order they arrive from
+    /// the two source streams. If both source streams provide values
+    /// simultaneously, the merge stream alternates between them. This provides
+    /// some level of fairness. You should not chain calls to `merge`, as this
+    /// will break the fairness of the merging.
+    ///
+    /// The merged stream completes once **both** source streams complete. When
+    /// one source stream completes before the other, the merge stream
+    /// exclusively polls the remaining stream.
+    ///
+    /// For merging multiple streams, consider using [`StreamMap`] instead.
+    ///
+    /// [`StreamMap`]: crate::StreamMap
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamExt, Stream};
+    /// use tokio::sync::mpsc;
+    /// use tokio::time;
+    ///
+    /// use std::time::Duration;
+    /// use std::pin::Pin;
+    ///
+    /// # /*
+    /// #[tokio::main]
+    /// # */
+    /// # #[tokio::main(flavor = "current_thread")]
+    /// async fn main() {
+    /// # time::pause();
+    ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
+    ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
+    ///
+    ///     // Convert the channels to a `Stream`.
+    ///     let rx1 = Box::pin(async_stream::stream! {
+    ///           while let Some(item) = rx1.recv().await {
+    ///               yield item;
+    ///           }
+    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+    ///
+    ///     let rx2 = Box::pin(async_stream::stream! {
+    ///           while let Some(item) = rx2.recv().await {
+    ///               yield item;
+    ///           }
+    ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+    ///
+    ///     let mut rx = rx1.merge(rx2);
+    ///
+    ///     tokio::spawn(async move {
+    ///         // Send some values immediately
+    ///         tx1.send(1).await.unwrap();
+    ///         tx1.send(2).await.unwrap();
+    ///
+    ///         // Let the other task send values
+    ///         time::sleep(Duration::from_millis(20)).await;
+    ///
+    ///         tx1.send(4).await.unwrap();
+    ///     });
+    ///
+    ///     tokio::spawn(async move {
+    ///         // Wait for the first task to send values
+    ///         time::sleep(Duration::from_millis(5)).await;
+    ///
+    ///         tx2.send(3).await.unwrap();
+    ///
+    ///         time::sleep(Duration::from_millis(25)).await;
+    ///
+    ///         // Send the final value
+    ///         tx2.send(5).await.unwrap();
+    ///     });
+    ///
+    ///    assert_eq!(1, rx.next().await.unwrap());
+    ///    assert_eq!(2, rx.next().await.unwrap());
+    ///    assert_eq!(3, rx.next().await.unwrap());
+    ///    assert_eq!(4, rx.next().await.unwrap());
+    ///    assert_eq!(5, rx.next().await.unwrap());
+    ///
+    ///    // The merged stream is consumed
+    ///    assert!(rx.next().await.is_none());
+    /// }
+    /// ```
+    fn merge<U>(self, other: U) -> Merge<Self, U>
+    where
+        U: Stream<Item = Self::Item>,
+        Self: Sized,
+    {
+        Merge::new(self, other)
+    }
+
+    /// Filters the values produced by this stream according to the provided
+    /// predicate.
+    ///
+    /// As values of this stream are made available, the provided predicate `f`
+    /// will be run against them. If the predicate
+    /// resolves to `true`, then the stream will yield the value, but if the
+    /// predicate resolves to `false`, then the value
+    /// will be discarded and the next value will be produced.
+    ///
+    /// Note that this function consumes the stream passed into it and returns a
+    /// wrapped version of it, similar to [`Iterator::filter`] method in the
+    /// standard library.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let stream = stream::iter(1..=8);
+    /// let mut evens = stream.filter(|x| x % 2 == 0);
+    ///
+    /// assert_eq!(Some(2), evens.next().await);
+    /// assert_eq!(Some(4), evens.next().await);
+    /// assert_eq!(Some(6), evens.next().await);
+    /// assert_eq!(Some(8), evens.next().await);
+    /// assert_eq!(None, evens.next().await);
+    /// # }
+    /// ```
+    fn filter<F>(self, f: F) -> Filter<Self, F>
+    where
+        F: FnMut(&Self::Item) -> bool,
+        Self: Sized,
+    {
+        Filter::new(self, f)
+    }
+
+    /// Filters the values produced by this stream while simultaneously mapping
+    /// them to a different type according to the provided closure.
+    ///
+    /// As values of this stream are made available, the provided function will
+    /// be run on them. If the predicate `f` resolves to
+    /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
+    /// it resolves to [`None`], then the value will be skipped.
+    ///
+    /// Note that this function consumes the stream passed into it and returns a
+    /// wrapped version of it, similar to [`Iterator::filter_map`] method in the
+    /// standard library.
+    ///
+    /// # Examples
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let stream = stream::iter(1..=8);
+    /// let mut evens = stream.filter_map(|x| {
+    ///     if x % 2 == 0 { Some(x + 1) } else { None }
+    /// });
+    ///
+    /// assert_eq!(Some(3), evens.next().await);
+    /// assert_eq!(Some(5), evens.next().await);
+    /// assert_eq!(Some(7), evens.next().await);
+    /// assert_eq!(Some(9), evens.next().await);
+    /// assert_eq!(None, evens.next().await);
+    /// # }
+    /// ```
+    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
+    where
+        F: FnMut(Self::Item) -> Option<T>,
+        Self: Sized,
+    {
+        FilterMap::new(self, f)
+    }
+
+    /// Creates a stream which ends after the first `None`.
+    ///
+    /// After a stream returns `None`, behavior is undefined. Future calls to
+    /// `poll_next` may or may not return `Some(T)` again or they may panic.
+    /// `fuse()` adapts a stream, ensuring that after `None` is given, it will
+    /// return `None` forever.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{Stream, StreamExt};
+    ///
+    /// use std::pin::Pin;
+    /// use std::task::{Context, Poll};
+    ///
+    /// // a stream which alternates between Some and None
+    /// struct Alternate {
+    ///     state: i32,
+    /// }
+    ///
+    /// impl Stream for Alternate {
+    ///     type Item = i32;
+    ///
+    ///     fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+    ///         let val = self.state;
+    ///         self.state = self.state + 1;
+    ///
+    ///         // if it's even, Some(i32), else None
+    ///         if val % 2 == 0 {
+    ///             Poll::Ready(Some(val))
+    ///         } else {
+    ///             Poll::Ready(None)
+    ///         }
+    ///     }
+    /// }
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let mut stream = Alternate { state: 0 };
+    ///
+    ///     // the stream goes back and forth
+    ///     assert_eq!(stream.next().await, Some(0));
+    ///     assert_eq!(stream.next().await, None);
+    ///     assert_eq!(stream.next().await, Some(2));
+    ///     assert_eq!(stream.next().await, None);
+    ///
+    ///     // however, once it is fused
+    ///     let mut stream = stream.fuse();
+    ///
+    ///     assert_eq!(stream.next().await, Some(4));
+    ///     assert_eq!(stream.next().await, None);
+    ///
+    ///     // it will always return `None` after the first time.
+    ///     assert_eq!(stream.next().await, None);
+    ///     assert_eq!(stream.next().await, None);
+    ///     assert_eq!(stream.next().await, None);
+    /// }
+    /// ```
+    fn fuse(self) -> Fuse<Self>
+    where
+        Self: Sized,
+    {
+        Fuse::new(self)
+    }
+
+    /// Creates a new stream of at most `n` items of the underlying stream.
+    ///
+    /// Once `n` items have been yielded from this stream then it will always
+    /// return that the stream is done.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let mut stream = stream::iter(1..=10).take(3);
+    ///
+    /// assert_eq!(Some(1), stream.next().await);
+    /// assert_eq!(Some(2), stream.next().await);
+    /// assert_eq!(Some(3), stream.next().await);
+    /// assert_eq!(None, stream.next().await);
+    /// # }
+    /// ```
+    fn take(self, n: usize) -> Take<Self>
+    where
+        Self: Sized,
+    {
+        Take::new(self, n)
+    }
+
+    /// Take elements from this stream while the provided predicate
+    /// resolves to `true`.
+    ///
+    /// This function, like `Iterator::take_while`, will take elements from the
+    /// stream until the predicate `f` resolves to `false`. Once one element
+    /// returns false it will always return that the stream is done.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3);
+    ///
+    /// assert_eq!(Some(1), stream.next().await);
+    /// assert_eq!(Some(2), stream.next().await);
+    /// assert_eq!(Some(3), stream.next().await);
+    /// assert_eq!(None, stream.next().await);
+    /// # }
+    /// ```
+    fn take_while<F>(self, f: F) -> TakeWhile<Self, F>
+    where
+        F: FnMut(&Self::Item) -> bool,
+        Self: Sized,
+    {
+        TakeWhile::new(self, f)
+    }
+
+    /// Creates a new stream that will skip the `n` first items of the
+    /// underlying stream.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let mut stream = stream::iter(1..=10).skip(7);
+    ///
+    /// assert_eq!(Some(8), stream.next().await);
+    /// assert_eq!(Some(9), stream.next().await);
+    /// assert_eq!(Some(10), stream.next().await);
+    /// assert_eq!(None, stream.next().await);
+    /// # }
+    /// ```
+    fn skip(self, n: usize) -> Skip<Self>
+    where
+        Self: Sized,
+    {
+        Skip::new(self, n)
+    }
+
+    /// Skip elements from the underlying stream while the provided predicate
+    /// resolves to `true`.
+    ///
+    /// This function, like [`Iterator::skip_while`], will ignore elemets from the
+    /// stream until the predicate `f` resolves to `false`. Once one element
+    /// returns false, the rest of the elements will be yielded.
+    ///
+    /// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3);
+    ///
+    /// assert_eq!(Some(3), stream.next().await);
+    /// assert_eq!(Some(4), stream.next().await);
+    /// assert_eq!(Some(1), stream.next().await);
+    /// assert_eq!(None, stream.next().await);
+    /// # }
+    /// ```
+    fn skip_while<F>(self, f: F) -> SkipWhile<Self, F>
+    where
+        F: FnMut(&Self::Item) -> bool,
+        Self: Sized,
+    {
+        SkipWhile::new(self, f)
+    }
+
+    /// Tests if every element of the stream matches a predicate.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn all<F>(&mut self, f: F) -> bool;
+    /// ```
+    ///
+    /// `all()` takes a closure that returns `true` or `false`. It applies
+    /// this closure to each element of the stream, and if they all return
+    /// `true`, then so does `all`. If any of them return `false`, it
+    /// returns `false`. An empty stream returns `true`.
+    ///
+    /// `all()` is short-circuiting; in other words, it will stop processing
+    /// as soon as it finds a `false`, given that no matter what else happens,
+    /// the result will also be `false`.
+    ///
+    /// An empty stream returns `true`.
+    ///
+    /// # Examples
+    ///
+    /// Basic usage:
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let a = [1, 2, 3];
+    ///
+    /// assert!(stream::iter(&a).all(|&x| x > 0).await);
+    ///
+    /// assert!(!stream::iter(&a).all(|&x| x > 2).await);
+    /// # }
+    /// ```
+    ///
+    /// Stopping at the first `false`:
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let a = [1, 2, 3];
+    ///
+    /// let mut iter = stream::iter(&a);
+    ///
+    /// assert!(!iter.all(|&x| x != 2).await);
+    ///
+    /// // we can still use `iter`, as there are more elements.
+    /// assert_eq!(iter.next().await, Some(&3));
+    /// # }
+    /// ```
+    fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F>
+    where
+        Self: Unpin,
+        F: FnMut(Self::Item) -> bool,
+    {
+        AllFuture::new(self, f)
+    }
+
+    /// Tests if any element of the stream matches a predicate.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn any<F>(&mut self, f: F) -> bool;
+    /// ```
+    ///
+    /// `any()` takes a closure that returns `true` or `false`. It applies
+    /// this closure to each element of the stream, and if any of them return
+    /// `true`, then so does `any()`. If they all return `false`, it
+    /// returns `false`.
+    ///
+    /// `any()` is short-circuiting; in other words, it will stop processing
+    /// as soon as it finds a `true`, given that no matter what else happens,
+    /// the result will also be `true`.
+    ///
+    /// An empty stream returns `false`.
+    ///
+    /// Basic usage:
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let a = [1, 2, 3];
+    ///
+    /// assert!(stream::iter(&a).any(|&x| x > 0).await);
+    ///
+    /// assert!(!stream::iter(&a).any(|&x| x > 5).await);
+    /// # }
+    /// ```
+    ///
+    /// Stopping at the first `true`:
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let a = [1, 2, 3];
+    ///
+    /// let mut iter = stream::iter(&a);
+    ///
+    /// assert!(iter.any(|&x| x != 2).await);
+    ///
+    /// // we can still use `iter`, as there are more elements.
+    /// assert_eq!(iter.next().await, Some(&2));
+    /// # }
+    /// ```
+    fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F>
+    where
+        Self: Unpin,
+        F: FnMut(Self::Item) -> bool,
+    {
+        AnyFuture::new(self, f)
+    }
+
+    /// Combine two streams into one by first returning all values from the
+    /// first stream then all values from the second stream.
+    ///
+    /// As long as `self` still has values to emit, no values from `other` are
+    /// emitted, even if some are ready.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let one = stream::iter(vec![1, 2, 3]);
+    ///     let two = stream::iter(vec![4, 5, 6]);
+    ///
+    ///     let mut stream = one.chain(two);
+    ///
+    ///     assert_eq!(stream.next().await, Some(1));
+    ///     assert_eq!(stream.next().await, Some(2));
+    ///     assert_eq!(stream.next().await, Some(3));
+    ///     assert_eq!(stream.next().await, Some(4));
+    ///     assert_eq!(stream.next().await, Some(5));
+    ///     assert_eq!(stream.next().await, Some(6));
+    ///     assert_eq!(stream.next().await, None);
+    /// }
+    /// ```
+    fn chain<U>(self, other: U) -> Chain<Self, U>
+    where
+        U: Stream<Item = Self::Item>,
+        Self: Sized,
+    {
+        Chain::new(self, other)
+    }
+
+    /// A combinator that applies a function to every element in a stream
+    /// producing a single, final value.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn fold<B, F>(self, init: B, f: F) -> B;
+    /// ```
+    ///
+    /// # Examples
+    /// Basic usage:
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, *};
+    ///
+    /// let s = stream::iter(vec![1u8, 2, 3]);
+    /// let sum = s.fold(0, |acc, x| acc + x).await;
+    ///
+    /// assert_eq!(sum, 6);
+    /// # }
+    /// ```
+    fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, B, F>
+    where
+        Self: Sized,
+        F: FnMut(B, Self::Item) -> B,
+    {
+        FoldFuture::new(self, init, f)
+    }
+
+    /// Drain stream pushing all emitted values into a collection.
+    ///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn collect<T>(self) -> T;
+    /// ```
+    ///
+    /// `collect` streams all values, awaiting as needed. Values are pushed into
+    /// a collection. A number of different target collection types are
+    /// supported, including [`Vec`](std::vec::Vec),
+    /// [`String`](std::string::String), and [`Bytes`].
+    ///
+    /// [`Bytes`]: https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html
+    ///
+    /// # `Result`
+    ///
+    /// `collect()` can also be used with streams of type `Result<T, E>` where
+    /// `T: FromStream<_>`. In this case, `collect()` will stream as long as
+    /// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
+    /// streaming is terminated and `collect()` returns the `Err`.
+    ///
+    /// # Notes
+    ///
+    /// `FromStream` is currently a sealed trait. Stabilization is pending
+    /// enhancements to the Rust language.
+    ///
+    /// # Examples
+    ///
+    /// Basic usage:
+    ///
+    /// ```
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let doubled: Vec<i32> =
+    ///         stream::iter(vec![1, 2, 3])
+    ///             .map(|x| x * 2)
+    ///             .collect()
+    ///             .await;
+    ///
+    ///     assert_eq!(vec![2, 4, 6], doubled);
+    /// }
+    /// ```
+    ///
+    /// Collecting a stream of `Result` values
+    ///
+    /// ```
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     // A stream containing only `Ok` values will be collected
+    ///     let values: Result<Vec<i32>, &str> =
+    ///         stream::iter(vec![Ok(1), Ok(2), Ok(3)])
+    ///             .collect()
+    ///             .await;
+    ///
+    ///     assert_eq!(Ok(vec![1, 2, 3]), values);
+    ///
+    ///     // A stream containing `Err` values will return the first error.
+    ///     let results = vec![Ok(1), Err("no"), Ok(2), Ok(3), Err("nein")];
+    ///
+    ///     let values: Result<Vec<i32>, &str> =
+    ///         stream::iter(results)
+    ///             .collect()
+    ///             .await;
+    ///
+    ///     assert_eq!(Err("no"), values);
+    /// }
+    /// ```
+    fn collect<T>(self) -> Collect<Self, T>
+    where
+        T: FromStream<Self::Item>,
+        Self: Sized,
+    {
+        Collect::new(self)
+    }
+
+    /// Applies a per-item timeout to the passed stream.
+    ///
+    /// `timeout()` takes a `Duration` that represents the maximum amount of
+    /// time each element of the stream has to complete before timing out.
+    ///
+    /// If the wrapped stream yields a value before the deadline is reached, the
+    /// value is returned. Otherwise, an error is returned. The caller may decide
+    /// to continue consuming the stream and will eventually get the next source
+    /// stream value once it becomes available.
+    ///
+    /// # Notes
+    ///
+    /// This function consumes the stream passed into it and returns a
+    /// wrapped version of it.
+    ///
+    /// Polling the returned stream will continue to poll the inner stream even
+    /// if one or more items time out.
+    ///
+    /// # Examples
+    ///
+    /// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    /// use std::time::Duration;
+    /// # let int_stream = stream::iter(1..=3);
+    ///
+    /// let int_stream = int_stream.timeout(Duration::from_secs(1));
+    /// tokio::pin!(int_stream);
+    ///
+    /// // When no items time out, we get the 3 elements in succession:
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    ///
+    /// // If the second item times out, we get an error and continue polling the stream:
+    /// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert!(int_stream.try_next().await.is_err());
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    ///
+    /// // If we want to stop consuming the source stream the first time an
+    /// // element times out, we can use the `take_while` operator:
+    /// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
+    /// let mut int_stream = int_stream.take_while(Result::is_ok);
+    ///
+    /// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
+    /// assert_eq!(int_stream.try_next().await, Ok(None));
+    /// # }
+    /// ```
+    #[cfg(all(feature = "time"))]
+    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+    fn timeout(self, duration: Duration) -> Timeout<Self>
+    where
+        Self: Sized,
+    {
+        Timeout::new(self, duration)
+    }
+
+    /// Slows down a stream by enforcing a delay between items.
+    ///
+    /// # Example
+    ///
+    /// Create a throttled stream.
+    /// ```rust,no_run
+    /// use std::time::Duration;
+    /// use tokio_stream::StreamExt;
+    ///
+    /// # async fn dox() {
+    /// let item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2));
+    /// tokio::pin!(item_stream);
+    ///
+    /// loop {
+    ///     // The string will be produced at most every 2 seconds
+    ///     println!("{:?}", item_stream.next().await);
+    /// }
+    /// # }
+    /// ```
+    #[cfg(all(feature = "time"))]
+    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+    fn throttle(self, duration: Duration) -> Throttle<Self>
+    where
+        Self: Sized,
+    {
+        throttle(duration, self)
+    }
+}
+
+impl<St: ?Sized> StreamExt for St where St: Stream {}
+
+/// Merge the size hints from two streams.
+fn merge_size_hints(
+    (left_low, left_high): (usize, Option<usize>),
+    (right_low, right_hign): (usize, Option<usize>),
+) -> (usize, Option<usize>) {
+    let low = left_low.saturating_add(right_low);
+    let high = match (left_high, right_hign) {
+        (Some(h1), Some(h2)) => h1.checked_add(h2),
+        _ => None,
+    };
+    (low, high)
+}
diff --git a/src/stream_ext/all.rs b/src/stream_ext/all.rs
new file mode 100644
index 0000000..11573f9
--- /dev/null
+++ b/src/stream_ext/all.rs
@@ -0,0 +1,55 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`all`](super::StreamExt::all) method.
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct AllFuture<'a, St: ?Sized, F> {
+        stream: &'a mut St,
+        f: F,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+impl<'a, St: ?Sized, F> AllFuture<'a, St, F> {
+    pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+        Self {
+            stream,
+            f,
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<St, F> Future for AllFuture<'_, St, F>
+where
+    St: ?Sized + Stream + Unpin,
+    F: FnMut(St::Item) -> bool,
+{
+    type Output = bool;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let me = self.project();
+        let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+
+        match next {
+            Some(v) => {
+                if !(me.f)(v) {
+                    Poll::Ready(false)
+                } else {
+                    cx.waker().wake_by_ref();
+                    Poll::Pending
+                }
+            }
+            None => Poll::Ready(true),
+        }
+    }
+}
diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs
new file mode 100644
index 0000000..4c4c593
--- /dev/null
+++ b/src/stream_ext/any.rs
@@ -0,0 +1,55 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`any`](super::StreamExt::any) method.
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct AnyFuture<'a, St: ?Sized, F> {
+        stream: &'a mut St,
+        f: F,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> {
+    pub(super) fn new(stream: &'a mut St, f: F) -> Self {
+        Self {
+            stream,
+            f,
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<St, F> Future for AnyFuture<'_, St, F>
+where
+    St: ?Sized + Stream + Unpin,
+    F: FnMut(St::Item) -> bool,
+{
+    type Output = bool;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let me = self.project();
+        let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+
+        match next {
+            Some(v) => {
+                if (me.f)(v) {
+                    Poll::Ready(true)
+                } else {
+                    cx.waker().wake_by_ref();
+                    Poll::Pending
+                }
+            }
+            None => Poll::Ready(false),
+        }
+    }
+}
diff --git a/src/stream_ext/chain.rs b/src/stream_ext/chain.rs
new file mode 100644
index 0000000..bd64f33
--- /dev/null
+++ b/src/stream_ext/chain.rs
@@ -0,0 +1,50 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream returned by the [`chain`](super::StreamExt::chain) method.
+    pub struct Chain<T, U> {
+        #[pin]
+        a: Fuse<T>,
+        #[pin]
+        b: U,
+    }
+}
+
+impl<T, U> Chain<T, U> {
+    pub(super) fn new(a: T, b: U) -> Chain<T, U>
+    where
+        T: Stream,
+        U: Stream,
+    {
+        Chain { a: Fuse::new(a), b }
+    }
+}
+
+impl<T, U> Stream for Chain<T, U>
+where
+    T: Stream,
+    U: Stream<Item = T::Item>,
+{
+    type Item = T::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+        use Poll::Ready;
+
+        let me = self.project();
+
+        if let Some(v) = ready!(me.a.poll_next(cx)) {
+            return Ready(Some(v));
+        }
+
+        me.b.poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+    }
+}
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
new file mode 100644
index 0000000..23f48b0
--- /dev/null
+++ b/src/stream_ext/collect.rs
@@ -0,0 +1,233 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::mem;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+// Do not export this struct until `FromStream` can be unsealed.
+pin_project! {
+    /// Future returned by the [`collect`](super::StreamExt::collect) method.
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    #[derive(Debug)]
+    pub struct Collect<T, U>
+    where
+        T: Stream,
+        U: FromStream<T::Item>,
+    {
+        #[pin]
+        stream: T,
+        collection: U::InternalCollection,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+/// Convert from a [`Stream`](crate::Stream).
+///
+/// This trait is not intended to be used directly. Instead, call
+/// [`StreamExt::collect()`](super::StreamExt::collect).
+///
+/// # Implementing
+///
+/// Currently, this trait may not be implemented by third parties. The trait is
+/// sealed in order to make changes in the future. Stabilization is pending
+/// enhancements to the Rust language.
+pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
+
+impl<T, U> Collect<T, U>
+where
+    T: Stream,
+    U: FromStream<T::Item>,
+{
+    pub(super) fn new(stream: T) -> Collect<T, U> {
+        let (lower, upper) = stream.size_hint();
+        let collection = U::initialize(sealed::Internal, lower, upper);
+
+        Collect {
+            stream,
+            collection,
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<T, U> Future for Collect<T, U>
+where
+    T: Stream,
+    U: FromStream<T::Item>,
+{
+    type Output = U;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
+        use Poll::Ready;
+
+        loop {
+            let mut me = self.as_mut().project();
+
+            let item = match ready!(me.stream.poll_next(cx)) {
+                Some(item) => item,
+                None => {
+                    return Ready(U::finalize(sealed::Internal, &mut me.collection));
+                }
+            };
+
+            if !U::extend(sealed::Internal, &mut me.collection, item) {
+                return Ready(U::finalize(sealed::Internal, &mut me.collection));
+            }
+        }
+    }
+}
+
+// ===== FromStream implementations
+
+impl FromStream<()> for () {}
+
+impl sealed::FromStreamPriv<()> for () {
+    type InternalCollection = ();
+
+    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
+
+    fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
+        true
+    }
+
+    fn finalize(_: sealed::Internal, _collection: &mut ()) {}
+}
+
+impl<T: AsRef<str>> FromStream<T> for String {}
+
+impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
+    type InternalCollection = String;
+
+    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
+        String::new()
+    }
+
+    fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
+        collection.push_str(item.as_ref());
+        true
+    }
+
+    fn finalize(_: sealed::Internal, collection: &mut String) -> String {
+        mem::replace(collection, String::new())
+    }
+}
+
+impl<T> FromStream<T> for Vec<T> {}
+
+impl<T> sealed::FromStreamPriv<T> for Vec<T> {
+    type InternalCollection = Vec<T>;
+
+    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
+        Vec::with_capacity(lower)
+    }
+
+    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+        collection.push(item);
+        true
+    }
+
+    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
+        mem::replace(collection, vec![])
+    }
+}
+
+impl<T> FromStream<T> for Box<[T]> {}
+
+impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
+    type InternalCollection = Vec<T>;
+
+    fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
+        <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
+    }
+
+    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
+        <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
+    }
+
+    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
+        <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
+            .into_boxed_slice()
+    }
+}
+
+impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
+
+impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
+where
+    U: FromStream<T>,
+{
+    type InternalCollection = Result<U::InternalCollection, E>;
+
+    fn initialize(
+        _: sealed::Internal,
+        lower: usize,
+        upper: Option<usize>,
+    ) -> Result<U::InternalCollection, E> {
+        Ok(U::initialize(sealed::Internal, lower, upper))
+    }
+
+    fn extend(
+        _: sealed::Internal,
+        collection: &mut Self::InternalCollection,
+        item: Result<T, E>,
+    ) -> bool {
+        assert!(collection.is_ok());
+        match item {
+            Ok(item) => {
+                let collection = collection.as_mut().ok().expect("invalid state");
+                U::extend(sealed::Internal, collection, item)
+            }
+            Err(err) => {
+                *collection = Err(err);
+                false
+            }
+        }
+    }
+
+    fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
+        if let Ok(collection) = collection.as_mut() {
+            Ok(U::finalize(sealed::Internal, collection))
+        } else {
+            let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
+
+            if let Err(err) = res {
+                Err(err)
+            } else {
+                unreachable!();
+            }
+        }
+    }
+}
+
+pub(crate) mod sealed {
+    #[doc(hidden)]
+    pub trait FromStreamPriv<T> {
+        /// Intermediate type used during collection process
+        ///
+        /// The name of this type is internal and cannot be relied upon.
+        type InternalCollection;
+
+        /// Initialize the collection
+        fn initialize(
+            internal: Internal,
+            lower: usize,
+            upper: Option<usize>,
+        ) -> Self::InternalCollection;
+
+        /// Extend the collection with the received item
+        ///
+        /// Return `true` to continue streaming, `false` complete collection.
+        fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;
+
+        /// Finalize collection into target type.
+        fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
+    }
+
+    #[allow(missing_debug_implementations)]
+    pub struct Internal;
+}
diff --git a/src/stream_ext/filter.rs b/src/stream_ext/filter.rs
new file mode 100644
index 0000000..f3dd871
--- /dev/null
+++ b/src/stream_ext/filter.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream returned by the [`filter`](super::StreamExt::filter) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Filter<St, F> {
+        #[pin]
+        stream: St,
+        f: F,
+    }
+}
+
+impl<St, F> fmt::Debug for Filter<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Filter")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St, F> Filter<St, F> {
+    pub(super) fn new(stream: St, f: F) -> Self {
+        Self { stream, f }
+    }
+}
+
+impl<St, F> Stream for Filter<St, F>
+where
+    St: Stream,
+    F: FnMut(&St::Item) -> bool,
+{
+    type Item = St::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
+        loop {
+            match ready!(self.as_mut().project().stream.poll_next(cx)) {
+                Some(e) => {
+                    if (self.as_mut().project().f)(&e) {
+                        return Poll::Ready(Some(e));
+                    }
+                }
+                None => return Poll::Ready(None),
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+    }
+}
diff --git a/src/stream_ext/filter_map.rs b/src/stream_ext/filter_map.rs
new file mode 100644
index 0000000..fe604a6
--- /dev/null
+++ b/src/stream_ext/filter_map.rs
@@ -0,0 +1,58 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream returned by the [`filter_map`](super::StreamExt::filter_map) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct FilterMap<St, F> {
+        #[pin]
+        stream: St,
+        f: F,
+    }
+}
+
+impl<St, F> fmt::Debug for FilterMap<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FilterMap")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St, F> FilterMap<St, F> {
+    pub(super) fn new(stream: St, f: F) -> Self {
+        Self { stream, f }
+    }
+}
+
+impl<St, F, T> Stream for FilterMap<St, F>
+where
+    St: Stream,
+    F: FnMut(St::Item) -> Option<T>,
+{
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        loop {
+            match ready!(self.as_mut().project().stream.poll_next(cx)) {
+                Some(e) => {
+                    if let Some(e) = (self.as_mut().project().f)(e) {
+                        return Poll::Ready(Some(e));
+                    }
+                }
+                None => return Poll::Ready(None),
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (0, self.stream.size_hint().1) // can't know a lower bound, due to the predicate
+    }
+}
diff --git a/src/stream_ext/fold.rs b/src/stream_ext/fold.rs
new file mode 100644
index 0000000..e2e97d8
--- /dev/null
+++ b/src/stream_ext/fold.rs
@@ -0,0 +1,57 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future returned by the [`fold`](super::StreamExt::fold) method.
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct FoldFuture<St, B, F> {
+        #[pin]
+        stream: St,
+        acc: Option<B>,
+        f: F,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+impl<St, B, F> FoldFuture<St, B, F> {
+    pub(super) fn new(stream: St, init: B, f: F) -> Self {
+        Self {
+            stream,
+            acc: Some(init),
+            f,
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<St, B, F> Future for FoldFuture<St, B, F>
+where
+    St: Stream,
+    F: FnMut(B, St::Item) -> B,
+{
+    type Output = B;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let mut me = self.project();
+        loop {
+            let next = ready!(me.stream.as_mut().poll_next(cx));
+
+            match next {
+                Some(v) => {
+                    let old = me.acc.take().unwrap();
+                    let new = (me.f)(old, v);
+                    *me.acc = Some(new);
+                }
+                None => return Poll::Ready(me.acc.take().unwrap()),
+            }
+        }
+    }
+}
diff --git a/src/stream_ext/fuse.rs b/src/stream_ext/fuse.rs
new file mode 100644
index 0000000..2500641
--- /dev/null
+++ b/src/stream_ext/fuse.rs
@@ -0,0 +1,53 @@
+use crate::Stream;
+
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+    /// Stream returned by [`fuse()`][super::StreamExt::fuse].
+    #[derive(Debug)]
+    pub struct Fuse<T> {
+        #[pin]
+        stream: Option<T>,
+    }
+}
+
+impl<T> Fuse<T>
+where
+    T: Stream,
+{
+    pub(crate) fn new(stream: T) -> Fuse<T> {
+        Fuse {
+            stream: Some(stream),
+        }
+    }
+}
+
+impl<T> Stream for Fuse<T>
+where
+    T: Stream,
+{
+    type Item = T::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+        let res = match Option::as_pin_mut(self.as_mut().project().stream) {
+            Some(stream) => ready!(stream.poll_next(cx)),
+            None => return Poll::Ready(None),
+        };
+
+        if res.is_none() {
+            // Do not poll the stream anymore
+            self.as_mut().project().stream.set(None);
+        }
+
+        Poll::Ready(res)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        match self.stream {
+            Some(ref stream) => stream.size_hint(),
+            None => (0, Some(0)),
+        }
+    }
+}
diff --git a/src/stream_ext/map.rs b/src/stream_ext/map.rs
new file mode 100644
index 0000000..e6b47cd
--- /dev/null
+++ b/src/stream_ext/map.rs
@@ -0,0 +1,51 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`map`](super::StreamExt::map) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Map<St, F> {
+        #[pin]
+        stream: St,
+        f: F,
+    }
+}
+
+impl<St, F> fmt::Debug for Map<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Map").field("stream", &self.stream).finish()
+    }
+}
+
+impl<St, F> Map<St, F> {
+    pub(super) fn new(stream: St, f: F) -> Self {
+        Map { stream, f }
+    }
+}
+
+impl<St, F, T> Stream for Map<St, F>
+where
+    St: Stream,
+    F: FnMut(St::Item) -> T,
+{
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        self.as_mut()
+            .project()
+            .stream
+            .poll_next(cx)
+            .map(|opt| opt.map(|x| (self.as_mut().project().f)(x)))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
diff --git a/src/stream_ext/merge.rs b/src/stream_ext/merge.rs
new file mode 100644
index 0000000..9d5123c
--- /dev/null
+++ b/src/stream_ext/merge.rs
@@ -0,0 +1,90 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream returned by the [`merge`](super::StreamExt::merge) method.
+    pub struct Merge<T, U> {
+        #[pin]
+        a: Fuse<T>,
+        #[pin]
+        b: Fuse<U>,
+        // When `true`, poll `a` first, otherwise, `poll` b`.
+        a_first: bool,
+    }
+}
+
+impl<T, U> Merge<T, U> {
+    pub(super) fn new(a: T, b: U) -> Merge<T, U>
+    where
+        T: Stream,
+        U: Stream,
+    {
+        Merge {
+            a: Fuse::new(a),
+            b: Fuse::new(b),
+            a_first: true,
+        }
+    }
+}
+
+impl<T, U> Stream for Merge<T, U>
+where
+    T: Stream,
+    U: Stream<Item = T::Item>,
+{
+    type Item = T::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+        let me = self.project();
+        let a_first = *me.a_first;
+
+        // Toggle the flag
+        *me.a_first = !a_first;
+
+        if a_first {
+            poll_next(me.a, me.b, cx)
+        } else {
+            poll_next(me.b, me.a, cx)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        super::merge_size_hints(self.a.size_hint(), self.b.size_hint())
+    }
+}
+
+fn poll_next<T, U>(
+    first: Pin<&mut T>,
+    second: Pin<&mut U>,
+    cx: &mut Context<'_>,
+) -> Poll<Option<T::Item>>
+where
+    T: Stream,
+    U: Stream<Item = T::Item>,
+{
+    use Poll::*;
+
+    let mut done = true;
+
+    match first.poll_next(cx) {
+        Ready(Some(val)) => return Ready(Some(val)),
+        Ready(None) => {}
+        Pending => done = false,
+    }
+
+    match second.poll_next(cx) {
+        Ready(Some(val)) => return Ready(Some(val)),
+        Ready(None) => {}
+        Pending => done = false,
+    }
+
+    if done {
+        Ready(None)
+    } else {
+        Pending
+    }
+}
diff --git a/src/stream_ext/next.rs b/src/stream_ext/next.rs
new file mode 100644
index 0000000..175490c
--- /dev/null
+++ b/src/stream_ext/next.rs
@@ -0,0 +1,37 @@
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`next`](super::StreamExt::next) method.
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct Next<'a, St: ?Sized> {
+        stream: &'a mut St,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+impl<'a, St: ?Sized> Next<'a, St> {
+    pub(super) fn new(stream: &'a mut St) -> Self {
+        Next {
+            stream,
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
+    type Output = Option<St::Item>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let me = self.project();
+        Pin::new(me.stream).poll_next(cx)
+    }
+}
diff --git a/src/stream_ext/skip.rs b/src/stream_ext/skip.rs
new file mode 100644
index 0000000..80a0a0a
--- /dev/null
+++ b/src/stream_ext/skip.rs
@@ -0,0 +1,63 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`skip`](super::StreamExt::skip) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Skip<St> {
+        #[pin]
+        stream: St,
+        remaining: usize,
+    }
+}
+
+impl<St> fmt::Debug for Skip<St>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Skip")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St> Skip<St> {
+    pub(super) fn new(stream: St, remaining: usize) -> Self {
+        Self { stream, remaining }
+    }
+}
+
+impl<St> Stream for Skip<St>
+where
+    St: Stream,
+{
+    type Item = St::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        loop {
+            match ready!(self.as_mut().project().stream.poll_next(cx)) {
+                Some(e) => {
+                    if self.remaining == 0 {
+                        return Poll::Ready(Some(e));
+                    }
+                    *self.as_mut().project().remaining -= 1;
+                }
+                None => return Poll::Ready(None),
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let (lower, upper) = self.stream.size_hint();
+
+        let lower = lower.saturating_sub(self.remaining);
+        let upper = upper.map(|x| x.saturating_sub(self.remaining));
+
+        (lower, upper)
+    }
+}
diff --git a/src/stream_ext/skip_while.rs b/src/stream_ext/skip_while.rs
new file mode 100644
index 0000000..985a926
--- /dev/null
+++ b/src/stream_ext/skip_while.rs
@@ -0,0 +1,73 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct SkipWhile<St, F> {
+        #[pin]
+        stream: St,
+        predicate: Option<F>,
+    }
+}
+
+impl<St, F> fmt::Debug for SkipWhile<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SkipWhile")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St, F> SkipWhile<St, F> {
+    pub(super) fn new(stream: St, predicate: F) -> Self {
+        Self {
+            stream,
+            predicate: Some(predicate),
+        }
+    }
+}
+
+impl<St, F> Stream for SkipWhile<St, F>
+where
+    St: Stream,
+    F: FnMut(&St::Item) -> bool,
+{
+    type Item = St::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        if let Some(predicate) = this.predicate {
+            loop {
+                match ready!(this.stream.as_mut().poll_next(cx)) {
+                    Some(item) => {
+                        if !(predicate)(&item) {
+                            *this.predicate = None;
+                            return Poll::Ready(Some(item));
+                        }
+                    }
+                    None => return Poll::Ready(None),
+                }
+            }
+        } else {
+            this.stream.poll_next(cx)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let (lower, upper) = self.stream.size_hint();
+
+        if self.predicate.is_some() {
+            return (0, upper);
+        }
+
+        (lower, upper)
+    }
+}
diff --git a/src/stream_ext/take.rs b/src/stream_ext/take.rs
new file mode 100644
index 0000000..c75648f
--- /dev/null
+++ b/src/stream_ext/take.rs
@@ -0,0 +1,76 @@
+use crate::Stream;
+
+use core::cmp;
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`take`](super::StreamExt::take) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Take<St> {
+        #[pin]
+        stream: St,
+        remaining: usize,
+    }
+}
+
+impl<St> fmt::Debug for Take<St>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Take")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St> Take<St> {
+    pub(super) fn new(stream: St, remaining: usize) -> Self {
+        Self { stream, remaining }
+    }
+}
+
+impl<St> Stream for Take<St>
+where
+    St: Stream,
+{
+    type Item = St::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if *self.as_mut().project().remaining > 0 {
+            self.as_mut().project().stream.poll_next(cx).map(|ready| {
+                match &ready {
+                    Some(_) => {
+                        *self.as_mut().project().remaining -= 1;
+                    }
+                    None => {
+                        *self.as_mut().project().remaining = 0;
+                    }
+                }
+                ready
+            })
+        } else {
+            Poll::Ready(None)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        if self.remaining == 0 {
+            return (0, Some(0));
+        }
+
+        let (lower, upper) = self.stream.size_hint();
+
+        let lower = cmp::min(lower, self.remaining as usize);
+
+        let upper = match upper {
+            Some(x) if x < self.remaining as usize => Some(x),
+            _ => Some(self.remaining as usize),
+        };
+
+        (lower, upper)
+    }
+}
diff --git a/src/stream_ext/take_while.rs b/src/stream_ext/take_while.rs
new file mode 100644
index 0000000..5ce4dd9
--- /dev/null
+++ b/src/stream_ext/take_while.rs
@@ -0,0 +1,79 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`take_while`](super::StreamExt::take_while) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct TakeWhile<St, F> {
+        #[pin]
+        stream: St,
+        predicate: F,
+        done: bool,
+    }
+}
+
+impl<St, F> fmt::Debug for TakeWhile<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TakeWhile")
+            .field("stream", &self.stream)
+            .field("done", &self.done)
+            .finish()
+    }
+}
+
+impl<St, F> TakeWhile<St, F> {
+    pub(super) fn new(stream: St, predicate: F) -> Self {
+        Self {
+            stream,
+            predicate,
+            done: false,
+        }
+    }
+}
+
+impl<St, F> Stream for TakeWhile<St, F>
+where
+    St: Stream,
+    F: FnMut(&St::Item) -> bool,
+{
+    type Item = St::Item;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if !*self.as_mut().project().done {
+            self.as_mut().project().stream.poll_next(cx).map(|ready| {
+                let ready = ready.and_then(|item| {
+                    if !(self.as_mut().project().predicate)(&item) {
+                        None
+                    } else {
+                        Some(item)
+                    }
+                });
+
+                if ready.is_none() {
+                    *self.as_mut().project().done = true;
+                }
+
+                ready
+            })
+        } else {
+            Poll::Ready(None)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        if self.done {
+            return (0, Some(0));
+        }
+
+        let (_, upper) = self.stream.size_hint();
+
+        (0, upper)
+    }
+}
diff --git a/src/stream_ext/throttle.rs b/src/stream_ext/throttle.rs
new file mode 100644
index 0000000..99f3e0e
--- /dev/null
+++ b/src/stream_ext/throttle.rs
@@ -0,0 +1,97 @@
+//! Slow down a stream by enforcing a delay between items.
+
+use crate::Stream;
+use tokio::time::{Duration, Instant, Sleep};
+
+use std::future::Future;
+use std::marker::Unpin;
+use std::pin::Pin;
+use std::task::{self, Poll};
+
+use pin_project_lite::pin_project;
+
+pub(super) fn throttle<T>(duration: Duration, stream: T) -> Throttle<T>
+where
+    T: Stream,
+{
+    Throttle {
+        delay: tokio::time::sleep_until(Instant::now() + duration),
+        duration,
+        has_delayed: true,
+        stream,
+    }
+}
+
+pin_project! {
+    /// Stream for the [`throttle`](throttle) function.
+    #[derive(Debug)]
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Throttle<T> {
+        #[pin]
+        delay: Sleep,
+        duration: Duration,
+
+        // Set to true when `delay` has returned ready, but `stream` hasn't.
+        has_delayed: bool,
+
+        // The stream to throttle
+        #[pin]
+        stream: T,
+    }
+}
+
+// XXX: are these safe if `T: !Unpin`?
+impl<T: Unpin> Throttle<T> {
+    /// Acquires a reference to the underlying stream that this combinator is
+    /// pulling from.
+    pub fn get_ref(&self) -> &T {
+        &self.stream
+    }
+
+    /// Acquires a mutable reference to the underlying stream that this combinator
+    /// is pulling from.
+    ///
+    /// Note that care must be taken to avoid tampering with the state of the stream
+    /// which may otherwise confuse this combinator.
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.stream
+    }
+
+    /// Consumes this combinator, returning the underlying stream.
+    ///
+    /// Note that this may discard intermediate state of this combinator, so care
+    /// should be taken to avoid losing resources when this is called.
+    pub fn into_inner(self) -> T {
+        self.stream
+    }
+}
+
+impl<T: Stream> Stream for Throttle<T> {
+    type Item = T::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut me = self.project();
+        let dur = *me.duration;
+
+        if !*me.has_delayed && !is_zero(dur) {
+            ready!(me.delay.as_mut().poll(cx));
+            *me.has_delayed = true;
+        }
+
+        let value = ready!(me.stream.poll_next(cx));
+
+        if value.is_some() {
+            if !is_zero(dur) {
+                me.delay.reset(Instant::now() + dur);
+            }
+
+            *me.has_delayed = false;
+        }
+
+        Poll::Ready(value)
+    }
+}
+
+fn is_zero(dur: Duration) -> bool {
+    dur == Duration::from_millis(0)
+}
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
new file mode 100644
index 0000000..de17dc0
--- /dev/null
+++ b/src/stream_ext/timeout.rs
@@ -0,0 +1,96 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{Instant, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::time::Duration;
+
+pin_project! {
+    /// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
+    #[must_use = "streams do nothing unless polled"]
+    #[derive(Debug)]
+    pub struct Timeout<S> {
+        #[pin]
+        stream: Fuse<S>,
+        #[pin]
+        deadline: Sleep,
+        duration: Duration,
+        poll_deadline: bool,
+    }
+}
+
+/// Error returned by `Timeout`.
+#[derive(Debug, PartialEq)]
+pub struct Elapsed(());
+
+impl<S: Stream> Timeout<S> {
+    pub(super) fn new(stream: S, duration: Duration) -> Self {
+        let next = Instant::now() + duration;
+        let deadline = tokio::time::sleep_until(next);
+
+        Timeout {
+            stream: Fuse::new(stream),
+            deadline,
+            duration,
+            poll_deadline: true,
+        }
+    }
+}
+
+impl<S: Stream> Stream for Timeout<S> {
+    type Item = Result<S::Item, Elapsed>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let me = self.project();
+
+        match me.stream.poll_next(cx) {
+            Poll::Ready(v) => {
+                if v.is_some() {
+                    let next = Instant::now() + *me.duration;
+                    me.deadline.reset(next);
+                    *me.poll_deadline = true;
+                }
+                return Poll::Ready(v.map(Ok));
+            }
+            Poll::Pending => {}
+        };
+
+        if *me.poll_deadline {
+            ready!(me.deadline.poll(cx));
+            *me.poll_deadline = false;
+            return Poll::Ready(Some(Err(Elapsed::new())));
+        }
+
+        Poll::Pending
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
+
+// ===== impl Elapsed =====
+
+impl Elapsed {
+    pub(crate) fn new() -> Self {
+        Elapsed(())
+    }
+}
+
+impl fmt::Display for Elapsed {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        "deadline has elapsed".fmt(fmt)
+    }
+}
+
+impl std::error::Error for Elapsed {}
+
+impl From<Elapsed> for std::io::Error {
+    fn from(_err: Elapsed) -> std::io::Error {
+        std::io::ErrorKind::TimedOut.into()
+    }
+}
diff --git a/src/stream_ext/try_next.rs b/src/stream_ext/try_next.rs
new file mode 100644
index 0000000..af27d87
--- /dev/null
+++ b/src/stream_ext/try_next.rs
@@ -0,0 +1,39 @@
+use crate::stream_ext::Next;
+use crate::Stream;
+
+use core::future::Future;
+use core::marker::PhantomPinned;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`try_next`](super::StreamExt::try_next) method.
+    #[derive(Debug)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct TryNext<'a, St: ?Sized> {
+        #[pin]
+        inner: Next<'a, St>,
+        // Make this future `!Unpin` for compatibility with async trait methods.
+        #[pin]
+        _pin: PhantomPinned,
+    }
+}
+
+impl<'a, St: ?Sized> TryNext<'a, St> {
+    pub(super) fn new(stream: &'a mut St) -> Self {
+        Self {
+            inner: Next::new(stream),
+            _pin: PhantomPinned,
+        }
+    }
+}
+
+impl<T, E, St: ?Sized + Stream<Item = Result<T, E>> + Unpin> Future for TryNext<'_, St> {
+    type Output = Result<Option<T>, E>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let me = self.project();
+        me.inner.poll(cx).map(Option::transpose)
+    }
+}
diff --git a/src/stream_map.rs b/src/stream_map.rs
new file mode 100644
index 0000000..85b60cf
--- /dev/null
+++ b/src/stream_map.rs
@@ -0,0 +1,664 @@
+use crate::Stream;
+
+use std::borrow::Borrow;
+use std::hash::Hash;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Combine many streams into one, indexing each source stream with a unique
+/// key.
+///
+/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
+/// streams into a single merged stream that yields values in the order that
+/// they arrive from the source streams. However, `StreamMap` has a lot more
+/// flexibility in usage patterns.
+///
+/// `StreamMap` can:
+///
+/// * Merge an arbitrary number of streams.
+/// * Track which source stream the value was received from.
+/// * Handle inserting and removing streams from the set of managed streams at
+///   any point during iteration.
+///
+/// All source streams held by `StreamMap` are indexed using a key. This key is
+/// included with the value when a source stream yields a value. The key is also
+/// used to remove the stream from the `StreamMap` before the stream has
+/// completed streaming.
+///
+/// # `Unpin`
+///
+/// Because the `StreamMap` API moves streams during runtime, both streams and
+/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
+/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
+/// pin the stream in the heap.
+///
+/// # Implementation
+///
+/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
+/// internal implementation detail will persist in future versions, but it is
+/// important to know the runtime implications. In general, `StreamMap` works
+/// best with a "smallish" number of streams as all entries are scanned on
+/// insert, remove, and polling. In cases where a large number of streams need
+/// to be merged, it may be advisable to use tasks sending values on a shared
+/// [`mpsc`] channel.
+///
+/// [`StreamExt::merge`]: crate::StreamExt::merge
+/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
+/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
+/// [`Box::pin`]: std::boxed::Box::pin
+///
+/// # Examples
+///
+/// Merging two streams, then remove them after receiving the first value
+///
+/// ```
+/// use tokio_stream::{StreamExt, StreamMap, Stream};
+/// use tokio::sync::mpsc;
+/// use std::pin::Pin;
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
+///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
+///
+///     // Convert the channels to a `Stream`.
+///     let rx1 = Box::pin(async_stream::stream! {
+///           while let Some(item) = rx1.recv().await {
+///               yield item;
+///           }
+///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+///     let rx2 = Box::pin(async_stream::stream! {
+///           while let Some(item) = rx2.recv().await {
+///               yield item;
+///           }
+///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
+///
+///     tokio::spawn(async move {
+///         tx1.send(1).await.unwrap();
+///
+///         // This value will never be received. The send may or may not return
+///         // `Err` depending on if the remote end closed first or not.
+///         let _ = tx1.send(2).await;
+///     });
+///
+///     tokio::spawn(async move {
+///         tx2.send(3).await.unwrap();
+///         let _ = tx2.send(4).await;
+///     });
+///
+///     let mut map = StreamMap::new();
+///
+///     // Insert both streams
+///     map.insert("one", rx1);
+///     map.insert("two", rx2);
+///
+///     // Read twice
+///     for _ in 0..2 {
+///         let (key, val) = map.next().await.unwrap();
+///
+///         if key == "one" {
+///             assert_eq!(val, 1);
+///         } else {
+///             assert_eq!(val, 3);
+///         }
+///
+///         // Remove the stream to prevent reading the next value
+///         map.remove(key);
+///     }
+/// }
+/// ```
+///
+/// This example models a read-only client to a chat system with channels. The
+/// client sends commands to join and leave channels. `StreamMap` is used to
+/// manage active channel subscriptions.
+///
+/// For simplicity, messages are displayed with `println!`, but they could be
+/// sent to the client over a socket.
+///
+/// ```no_run
+/// use tokio_stream::{Stream, StreamExt, StreamMap};
+///
+/// enum Command {
+///     Join(String),
+///     Leave(String),
+/// }
+///
+/// fn commands() -> impl Stream<Item = Command> {
+///     // Streams in user commands by parsing `stdin`.
+/// # tokio_stream::pending()
+/// }
+///
+/// // Join a channel, returns a stream of messages received on the channel.
+/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
+///     // left as an exercise to the reader
+/// # tokio_stream::pending()
+/// }
+///
+/// #[tokio::main]
+/// async fn main() {
+///     let mut channels = StreamMap::new();
+///
+///     // Input commands (join / leave channels).
+///     let cmds = commands();
+///     tokio::pin!(cmds);
+///
+///     loop {
+///         tokio::select! {
+///             Some(cmd) = cmds.next() => {
+///                 match cmd {
+///                     Command::Join(chan) => {
+///                         // Join the channel and add it to the `channels`
+///                         // stream map
+///                         let msgs = join(&chan);
+///                         channels.insert(chan, msgs);
+///                     }
+///                     Command::Leave(chan) => {
+///                         channels.remove(&chan);
+///                     }
+///                 }
+///             }
+///             Some((chan, msg)) = channels.next() => {
+///                 // Received a message, display it on stdout with the channel
+///                 // it originated from.
+///                 println!("{}: {}", chan, msg);
+///             }
+///             // Both the `commands` stream and the `channels` stream are
+///             // complete. There is no more work to do, so leave the loop.
+///             else => break,
+///         }
+///     }
+/// }
+/// ```
+#[derive(Debug)]
+pub struct StreamMap<K, V> {
+    /// Streams stored in the map
+    entries: Vec<(K, V)>,
+}
+
+impl<K, V> StreamMap<K, V> {
+    /// An iterator visiting all key-value pairs in arbitrary order.
+    ///
+    /// The iterator element type is &'a (K, V).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// map.insert("a", pending::<i32>());
+    /// map.insert("b", pending());
+    /// map.insert("c", pending());
+    ///
+    /// for (key, stream) in map.iter() {
+    ///     println!("({}, {:?})", key, stream);
+    /// }
+    /// ```
+    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
+        self.entries.iter()
+    }
+
+    /// An iterator visiting all key-value pairs mutably in arbitrary order.
+    ///
+    /// The iterator element type is &'a mut (K, V).
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// map.insert("a", pending::<i32>());
+    /// map.insert("b", pending());
+    /// map.insert("c", pending());
+    ///
+    /// for (key, stream) in map.iter_mut() {
+    ///     println!("({}, {:?})", key, stream);
+    /// }
+    /// ```
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
+        self.entries.iter_mut()
+    }
+
+    /// Creates an empty `StreamMap`.
+    ///
+    /// The stream map is initially created with a capacity of `0`, so it will
+    /// not allocate until it is first inserted into.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, Pending};
+    ///
+    /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
+    /// ```
+    pub fn new() -> StreamMap<K, V> {
+        StreamMap { entries: vec![] }
+    }
+
+    /// Creates an empty `StreamMap` with the specified capacity.
+    ///
+    /// The stream map will be able to hold at least `capacity` elements without
+    /// reallocating. If `capacity` is 0, the stream map will not allocate.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, Pending};
+    ///
+    /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
+    /// ```
+    pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
+        StreamMap {
+            entries: Vec::with_capacity(capacity),
+        }
+    }
+
+    /// Returns an iterator visiting all keys in arbitrary order.
+    ///
+    /// The iterator element type is &'a K.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// map.insert("a", pending::<i32>());
+    /// map.insert("b", pending());
+    /// map.insert("c", pending());
+    ///
+    /// for key in map.keys() {
+    ///     println!("{}", key);
+    /// }
+    /// ```
+    pub fn keys(&self) -> impl Iterator<Item = &K> {
+        self.iter().map(|(k, _)| k)
+    }
+
+    /// An iterator visiting all values in arbitrary order.
+    ///
+    /// The iterator element type is &'a V.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// map.insert("a", pending::<i32>());
+    /// map.insert("b", pending());
+    /// map.insert("c", pending());
+    ///
+    /// for stream in map.values() {
+    ///     println!("{:?}", stream);
+    /// }
+    /// ```
+    pub fn values(&self) -> impl Iterator<Item = &V> {
+        self.iter().map(|(_, v)| v)
+    }
+
+    /// An iterator visiting all values mutably in arbitrary order.
+    ///
+    /// The iterator element type is &'a mut V.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// map.insert("a", pending::<i32>());
+    /// map.insert("b", pending());
+    /// map.insert("c", pending());
+    ///
+    /// for stream in map.values_mut() {
+    ///     println!("{:?}", stream);
+    /// }
+    /// ```
+    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
+        self.iter_mut().map(|(_, v)| v)
+    }
+
+    /// Returns the number of streams the map can hold without reallocating.
+    ///
+    /// This number is a lower bound; the `StreamMap` might be able to hold
+    /// more, but is guaranteed to be able to hold at least this many.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, Pending};
+    ///
+    /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
+    /// assert!(map.capacity() >= 100);
+    /// ```
+    pub fn capacity(&self) -> usize {
+        self.entries.capacity()
+    }
+
+    /// Returns the number of streams in the map.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut a = StreamMap::new();
+    /// assert_eq!(a.len(), 0);
+    /// a.insert(1, pending::<i32>());
+    /// assert_eq!(a.len(), 1);
+    /// ```
+    pub fn len(&self) -> usize {
+        self.entries.len()
+    }
+
+    /// Returns `true` if the map contains no elements.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use std::collections::HashMap;
+    ///
+    /// let mut a = HashMap::new();
+    /// assert!(a.is_empty());
+    /// a.insert(1, "a");
+    /// assert!(!a.is_empty());
+    /// ```
+    pub fn is_empty(&self) -> bool {
+        self.entries.is_empty()
+    }
+
+    /// Clears the map, removing all key-stream pairs. Keeps the allocated
+    /// memory for reuse.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut a = StreamMap::new();
+    /// a.insert(1, pending::<i32>());
+    /// a.clear();
+    /// assert!(a.is_empty());
+    /// ```
+    pub fn clear(&mut self) {
+        self.entries.clear();
+    }
+
+    /// Insert a key-stream pair into the map.
+    ///
+    /// If the map did not have this key present, `None` is returned.
+    ///
+    /// If the map did have this key present, the new `stream` replaces the old
+    /// one and the old stream is returned.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    ///
+    /// assert!(map.insert(37, pending::<i32>()).is_none());
+    /// assert!(!map.is_empty());
+    ///
+    /// map.insert(37, pending());
+    /// assert!(map.insert(37, pending()).is_some());
+    /// ```
+    pub fn insert(&mut self, k: K, stream: V) -> Option<V>
+    where
+        K: Hash + Eq,
+    {
+        let ret = self.remove(&k);
+        self.entries.push((k, stream));
+
+        ret
+    }
+
+    /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
+    ///
+    /// The key may be any borrowed form of the map's key type, but `Hash` and
+    /// `Eq` on the borrowed form must match those for the key type.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    /// map.insert(1, pending::<i32>());
+    /// assert!(map.remove(&1).is_some());
+    /// assert!(map.remove(&1).is_none());
+    /// ```
+    pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq,
+    {
+        for i in 0..self.entries.len() {
+            if self.entries[i].0.borrow() == k {
+                return Some(self.entries.swap_remove(i).1);
+            }
+        }
+
+        None
+    }
+
+    /// Returns `true` if the map contains a stream for the specified key.
+    ///
+    /// The key may be any borrowed form of the map's key type, but `Hash` and
+    /// `Eq` on the borrowed form must match those for the key type.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio_stream::{StreamMap, pending};
+    ///
+    /// let mut map = StreamMap::new();
+    /// map.insert(1, pending::<i32>());
+    /// assert_eq!(map.contains_key(&1), true);
+    /// assert_eq!(map.contains_key(&2), false);
+    /// ```
+    pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
+    where
+        K: Borrow<Q>,
+        Q: Hash + Eq,
+    {
+        for i in 0..self.entries.len() {
+            if self.entries[i].0.borrow() == k {
+                return true;
+            }
+        }
+
+        false
+    }
+}
+
+impl<K, V> StreamMap<K, V>
+where
+    K: Unpin,
+    V: Stream + Unpin,
+{
+    /// Polls the next value, includes the vec entry index
+    fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
+        use Poll::*;
+
+        let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
+        let mut idx = start;
+
+        for _ in 0..self.entries.len() {
+            let (_, stream) = &mut self.entries[idx];
+
+            match Pin::new(stream).poll_next(cx) {
+                Ready(Some(val)) => return Ready(Some((idx, val))),
+                Ready(None) => {
+                    // Remove the entry
+                    self.entries.swap_remove(idx);
+
+                    // Check if this was the last entry, if so the cursor needs
+                    // to wrap
+                    if idx == self.entries.len() {
+                        idx = 0;
+                    } else if idx < start && start <= self.entries.len() {
+                        // The stream being swapped into the current index has
+                        // already been polled, so skip it.
+                        idx = idx.wrapping_add(1) % self.entries.len();
+                    }
+                }
+                Pending => {
+                    idx = idx.wrapping_add(1) % self.entries.len();
+                }
+            }
+        }
+
+        // If the map is empty, then the stream is complete.
+        if self.entries.is_empty() {
+            Ready(None)
+        } else {
+            Pending
+        }
+    }
+}
+
+impl<K, V> Default for StreamMap<K, V> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<K, V> Stream for StreamMap<K, V>
+where
+    K: Clone + Unpin,
+    V: Stream + Unpin,
+{
+    type Item = (K, V::Item);
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
+            let key = self.entries[idx].0.clone();
+            Poll::Ready(Some((key, val)))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let mut ret = (0, Some(0));
+
+        for (_, stream) in &self.entries {
+            let hint = stream.size_hint();
+
+            ret.0 += hint.0;
+
+            match (ret.1, hint.1) {
+                (Some(a), Some(b)) => ret.1 = Some(a + b),
+                (Some(_), None) => ret.1 = None,
+                _ => {}
+            }
+        }
+
+        ret
+    }
+}
+
+mod rand {
+    use std::cell::Cell;
+
+    mod loom {
+        #[cfg(not(loom))]
+        pub(crate) mod rand {
+            use std::collections::hash_map::RandomState;
+            use std::hash::{BuildHasher, Hash, Hasher};
+            use std::sync::atomic::AtomicU32;
+            use std::sync::atomic::Ordering::Relaxed;
+
+            static COUNTER: AtomicU32 = AtomicU32::new(1);
+
+            pub(crate) fn seed() -> u64 {
+                let rand_state = RandomState::new();
+
+                let mut hasher = rand_state.build_hasher();
+
+                // Hash some unique-ish data to generate some new state
+                COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
+
+                // Get the seed
+                hasher.finish()
+            }
+        }
+
+        #[cfg(loom)]
+        pub(crate) mod rand {
+            pub(crate) fn seed() -> u64 {
+                1
+            }
+        }
+    }
+
+    /// Fast random number generate
+    ///
+    /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
+    /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's
+    /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
+    /// This generator passes the SmallCrush suite, part of TestU01 framework:
+    /// http://simul.iro.umontreal.ca/testu01/tu01.html
+    #[derive(Debug)]
+    pub(crate) struct FastRand {
+        one: Cell<u32>,
+        two: Cell<u32>,
+    }
+
+    impl FastRand {
+        /// Initialize a new, thread-local, fast random number generator.
+        pub(crate) fn new(seed: u64) -> FastRand {
+            let one = (seed >> 32) as u32;
+            let mut two = seed as u32;
+
+            if two == 0 {
+                // This value cannot be zero
+                two = 1;
+            }
+
+            FastRand {
+                one: Cell::new(one),
+                two: Cell::new(two),
+            }
+        }
+
+        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
+            // This is similar to fastrand() % n, but faster.
+            // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
+            let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
+            (mul >> 32) as u32
+        }
+
+        fn fastrand(&self) -> u32 {
+            let mut s1 = self.one.get();
+            let s0 = self.two.get();
+
+            s1 ^= s1 << 17;
+            s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
+
+            self.one.set(s0);
+            self.two.set(s1);
+
+            s0.wrapping_add(s1)
+        }
+    }
+
+    // Used by `StreamMap`
+    pub(crate) fn thread_rng_n(n: u32) -> u32 {
+        thread_local! {
+            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
+        }
+
+        THREAD_RNG.with(|rng| rng.fastrand_n(n))
+    }
+}
diff --git a/src/wrappers.rs b/src/wrappers.rs
new file mode 100644
index 0000000..c0ffb23
--- /dev/null
+++ b/src/wrappers.rs
@@ -0,0 +1,35 @@
+//! Wrappers for Tokio types that implement `Stream`.
+
+mod mpsc_bounded;
+pub use mpsc_bounded::ReceiverStream;
+
+mod mpsc_unbounded;
+pub use mpsc_unbounded::UnboundedReceiverStream;
+
+cfg_time! {
+    mod interval;
+    pub use interval::IntervalStream;
+}
+
+cfg_net! {
+    mod tcp_listener;
+    pub use tcp_listener::TcpListenerStream;
+
+    #[cfg(unix)]
+    mod unix_listener;
+    #[cfg(unix)]
+    pub use unix_listener::UnixListenerStream;
+}
+
+cfg_io_util! {
+    mod split;
+    pub use split::SplitStream;
+
+    mod lines;
+    pub use lines::LinesStream;
+}
+
+cfg_fs! {
+    mod read_dir;
+    pub use read_dir::ReadDirStream;
+}
diff --git a/src/wrappers/interval.rs b/src/wrappers/interval.rs
new file mode 100644
index 0000000..2bf0194
--- /dev/null
+++ b/src/wrappers/interval.rs
@@ -0,0 +1,50 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::time::{Instant, Interval};
+
+/// A wrapper around [`Interval`] that implements [`Stream`].
+///
+/// [`Interval`]: struct@tokio::time::Interval
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+pub struct IntervalStream {
+    inner: Interval,
+}
+
+impl IntervalStream {
+    /// Create a new `IntervalStream`.
+    pub fn new(interval: Interval) -> Self {
+        Self { inner: interval }
+    }
+
+    /// Get back the inner `Interval`.
+    pub fn into_inner(self) -> Interval {
+        self.inner
+    }
+}
+
+impl Stream for IntervalStream {
+    type Item = Instant;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
+        self.inner.poll_tick(cx).map(Some)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (std::usize::MAX, None)
+    }
+}
+
+impl AsRef<Interval> for IntervalStream {
+    fn as_ref(&self) -> &Interval {
+        &self.inner
+    }
+}
+
+impl AsMut<Interval> for IntervalStream {
+    fn as_mut(&mut self) -> &mut Interval {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/lines.rs b/src/wrappers/lines.rs
new file mode 100644
index 0000000..ad3c253
--- /dev/null
+++ b/src/wrappers/lines.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Lines};
+
+pin_project! {
+    /// A wrapper around [`tokio::io::Lines`] that implements [`Stream`].
+    ///
+    /// [`tokio::io::Lines`]: struct@tokio::io::Lines
+    /// [`Stream`]: trait@crate::Stream
+    #[derive(Debug)]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+    pub struct LinesStream<R> {
+        #[pin]
+        inner: Lines<R>,
+    }
+}
+
+impl<R> LinesStream<R> {
+    /// Create a new `LinesStream`.
+    pub fn new(lines: Lines<R>) -> Self {
+        Self { inner: lines }
+    }
+
+    /// Get back the inner `Lines`.
+    pub fn into_inner(self) -> Lines<R> {
+        self.inner
+    }
+
+    /// Obtain a pinned reference to the inner `Lines<R>`.
+    #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Lines<R>> {
+        self.project().inner
+    }
+}
+
+impl<R: AsyncBufRead> Stream for LinesStream<R> {
+    type Item = io::Result<String>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project()
+            .inner
+            .poll_next_line(cx)
+            .map(Result::transpose)
+    }
+}
+
+impl<R> AsRef<Lines<R>> for LinesStream<R> {
+    fn as_ref(&self) -> &Lines<R> {
+        &self.inner
+    }
+}
+
+impl<R> AsMut<Lines<R>> for LinesStream<R> {
+    fn as_mut(&mut self) -> &mut Lines<R> {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/mpsc_bounded.rs b/src/wrappers/mpsc_bounded.rs
new file mode 100644
index 0000000..e4f9000
--- /dev/null
+++ b/src/wrappers/mpsc_bounded.rs
@@ -0,0 +1,59 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::Receiver;
+
+/// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct ReceiverStream<T> {
+    inner: Receiver<T>,
+}
+
+impl<T> ReceiverStream<T> {
+    /// Create a new `ReceiverStream`.
+    pub fn new(recv: Receiver<T>) -> Self {
+        Self { inner: recv }
+    }
+
+    /// Get back the inner `Receiver`.
+    pub fn into_inner(self) -> Receiver<T> {
+        self.inner
+    }
+
+    /// Closes the receiving half of a channel without dropping it.
+    ///
+    /// This prevents any further messages from being sent on the channel while
+    /// still enabling the receiver to drain messages that are buffered. Any
+    /// outstanding [`Permit`] values will still be able to send messages.
+    ///
+    /// To guarantee no messages are dropped, after calling `close()`, you must
+    /// receive all items from the stream until `None` is returned.
+    ///
+    /// [`Permit`]: struct@tokio::sync::mpsc::Permit
+    pub fn close(&mut self) {
+        self.inner.close()
+    }
+}
+
+impl<T> Stream for ReceiverStream<T> {
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_recv(cx)
+    }
+}
+
+impl<T> AsRef<Receiver<T>> for ReceiverStream<T> {
+    fn as_ref(&self) -> &Receiver<T> {
+        &self.inner
+    }
+}
+
+impl<T> AsMut<Receiver<T>> for ReceiverStream<T> {
+    fn as_mut(&mut self) -> &mut Receiver<T> {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/mpsc_unbounded.rs b/src/wrappers/mpsc_unbounded.rs
new file mode 100644
index 0000000..bc5f40c
--- /dev/null
+++ b/src/wrappers/mpsc_unbounded.rs
@@ -0,0 +1,53 @@
+use crate::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::UnboundedReceiver;
+
+/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+pub struct UnboundedReceiverStream<T> {
+    inner: UnboundedReceiver<T>,
+}
+
+impl<T> UnboundedReceiverStream<T> {
+    /// Create a new `UnboundedReceiverStream`.
+    pub fn new(recv: UnboundedReceiver<T>) -> Self {
+        Self { inner: recv }
+    }
+
+    /// Get back the inner `UnboundedReceiver`.
+    pub fn into_inner(self) -> UnboundedReceiver<T> {
+        self.inner
+    }
+
+    /// Closes the receiving half of a channel without dropping it.
+    ///
+    /// This prevents any further messages from being sent on the channel while
+    /// still enabling the receiver to drain messages that are buffered.
+    pub fn close(&mut self) {
+        self.inner.close()
+    }
+}
+
+impl<T> Stream for UnboundedReceiverStream<T> {
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_recv(cx)
+    }
+}
+
+impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+    fn as_ref(&self) -> &UnboundedReceiver<T> {
+        &self.inner
+    }
+}
+
+impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
+    fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/read_dir.rs b/src/wrappers/read_dir.rs
new file mode 100644
index 0000000..b5cf54f
--- /dev/null
+++ b/src/wrappers/read_dir.rs
@@ -0,0 +1,47 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::fs::{DirEntry, ReadDir};
+
+/// A wrapper around [`tokio::fs::ReadDir`] that implements [`Stream`].
+///
+/// [`tokio::fs::ReadDir`]: struct@tokio::fs::ReadDir
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "fs")))]
+pub struct ReadDirStream {
+    inner: ReadDir,
+}
+
+impl ReadDirStream {
+    /// Create a new `ReadDirStream`.
+    pub fn new(read_dir: ReadDir) -> Self {
+        Self { inner: read_dir }
+    }
+
+    /// Get back the inner `ReadDir`.
+    pub fn into_inner(self) -> ReadDir {
+        self.inner
+    }
+}
+
+impl Stream for ReadDirStream {
+    type Item = io::Result<DirEntry>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.poll_next_entry(cx).map(Result::transpose)
+    }
+}
+
+impl AsRef<ReadDir> for ReadDirStream {
+    fn as_ref(&self) -> &ReadDir {
+        &self.inner
+    }
+}
+
+impl AsMut<ReadDir> for ReadDirStream {
+    fn as_mut(&mut self) -> &mut ReadDir {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/split.rs b/src/wrappers/split.rs
new file mode 100644
index 0000000..5a6bb2d
--- /dev/null
+++ b/src/wrappers/split.rs
@@ -0,0 +1,60 @@
+use crate::Stream;
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::{AsyncBufRead, Split};
+
+pin_project! {
+    /// A wrapper around [`tokio::io::Split`] that implements [`Stream`].
+    ///
+    /// [`tokio::io::Split`]: struct@tokio::io::Split
+    /// [`Stream`]: trait@crate::Stream
+    #[derive(Debug)]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+    pub struct SplitStream<R> {
+        #[pin]
+        inner: Split<R>,
+    }
+}
+
+impl<R> SplitStream<R> {
+    /// Create a new `SplitStream`.
+    pub fn new(split: Split<R>) -> Self {
+        Self { inner: split }
+    }
+
+    /// Get back the inner `Split`.
+    pub fn into_inner(self) -> Split<R> {
+        self.inner
+    }
+
+    /// Obtain a pinned reference to the inner `Split<R>`.
+    #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4546
+    pub fn as_pin_mut(self: Pin<&mut Self>) -> Pin<&mut Split<R>> {
+        self.project().inner
+    }
+}
+
+impl<R: AsyncBufRead> Stream for SplitStream<R> {
+    type Item = io::Result<Vec<u8>>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.project()
+            .inner
+            .poll_next_segment(cx)
+            .map(Result::transpose)
+    }
+}
+
+impl<R> AsRef<Split<R>> for SplitStream<R> {
+    fn as_ref(&self) -> &Split<R> {
+        &self.inner
+    }
+}
+
+impl<R> AsMut<Split<R>> for SplitStream<R> {
+    fn as_mut(&mut self) -> &mut Split<R> {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/tcp_listener.rs b/src/wrappers/tcp_listener.rs
new file mode 100644
index 0000000..ce7cb16
--- /dev/null
+++ b/src/wrappers/tcp_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{TcpListener, TcpStream};
+
+/// A wrapper around [`TcpListener`] that implements [`Stream`].
+///
+/// [`TcpListener`]: struct@tokio::net::TcpListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
+pub struct TcpListenerStream {
+    inner: TcpListener,
+}
+
+impl TcpListenerStream {
+    /// Create a new `TcpListenerStream`.
+    pub fn new(listener: TcpListener) -> Self {
+        Self { inner: listener }
+    }
+
+    /// Get back the inner `TcpListener`.
+    pub fn into_inner(self) -> TcpListener {
+        self.inner
+    }
+}
+
+impl Stream for TcpListenerStream {
+    type Item = io::Result<TcpStream>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<io::Result<TcpStream>>> {
+        match self.inner.poll_accept(cx) {
+            Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl AsRef<TcpListener> for TcpListenerStream {
+    fn as_ref(&self) -> &TcpListener {
+        &self.inner
+    }
+}
+
+impl AsMut<TcpListener> for TcpListenerStream {
+    fn as_mut(&mut self) -> &mut TcpListener {
+        &mut self.inner
+    }
+}
diff --git a/src/wrappers/unix_listener.rs b/src/wrappers/unix_listener.rs
new file mode 100644
index 0000000..0beba58
--- /dev/null
+++ b/src/wrappers/unix_listener.rs
@@ -0,0 +1,54 @@
+use crate::Stream;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::net::{UnixListener, UnixStream};
+
+/// A wrapper around [`UnixListener`] that implements [`Stream`].
+///
+/// [`UnixListener`]: struct@tokio::net::UnixListener
+/// [`Stream`]: trait@crate::Stream
+#[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "net"))))]
+pub struct UnixListenerStream {
+    inner: UnixListener,
+}
+
+impl UnixListenerStream {
+    /// Create a new `UnixListenerStream`.
+    pub fn new(listener: UnixListener) -> Self {
+        Self { inner: listener }
+    }
+
+    /// Get back the inner `UnixListener`.
+    pub fn into_inner(self) -> UnixListener {
+        self.inner
+    }
+}
+
+impl Stream for UnixListenerStream {
+    type Item = io::Result<UnixStream>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<io::Result<UnixStream>>> {
+        match self.inner.poll_accept(cx) {
+            Poll::Ready(Ok((stream, _))) => Poll::Ready(Some(Ok(stream))),
+            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl AsRef<UnixListener> for UnixListenerStream {
+    fn as_ref(&self) -> &UnixListener {
+        &self.inner
+    }
+}
+
+impl AsMut<UnixListener> for UnixListenerStream {
+    fn as_mut(&mut self) -> &mut UnixListener {
+        &mut self.inner
+    }
+}
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
new file mode 100644
index 0000000..c06bebd
--- /dev/null
+++ b/tests/async_send_sync.rs
@@ -0,0 +1,105 @@
+use std::rc::Rc;
+
+#[allow(dead_code)]
+type BoxStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = T>>>;
+
+#[allow(dead_code)]
+fn require_send<T: Send>(_t: &T) {}
+#[allow(dead_code)]
+fn require_sync<T: Sync>(_t: &T) {}
+#[allow(dead_code)]
+fn require_unpin<T: Unpin>(_t: &T) {}
+
+#[allow(dead_code)]
+struct Invalid;
+
+trait AmbiguousIfSend<A> {
+    fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfSend<()> for T {}
+impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {}
+
+trait AmbiguousIfSync<A> {
+    fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfSync<()> for T {}
+impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {}
+
+trait AmbiguousIfUnpin<A> {
+    fn some_item(&self) {}
+}
+impl<T: ?Sized> AmbiguousIfUnpin<()> for T {}
+impl<T: ?Sized + Unpin> AmbiguousIfUnpin<Invalid> for T {}
+
+macro_rules! into_todo {
+    ($typ:ty) => {{
+        let x: $typ = todo!();
+        x
+    }};
+}
+
+macro_rules! async_assert_fn {
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            require_send(&f);
+            require_sync(&f);
+        };
+    };
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            require_send(&f);
+            AmbiguousIfSync::some_item(&f);
+        };
+    };
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            AmbiguousIfSend::some_item(&f);
+            require_sync(&f);
+        };
+    };
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            AmbiguousIfSend::some_item(&f);
+            AmbiguousIfSync::some_item(&f);
+        };
+    };
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            AmbiguousIfUnpin::some_item(&f);
+        };
+    };
+    ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => {
+        #[allow(unreachable_code)]
+        #[allow(unused_variables)]
+        const _: fn() = || {
+            let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
+            require_unpin(&f);
+        };
+    };
+}
+
+async_assert_fn!(tokio_stream::empty<Rc<u8>>(): Send & Sync);
+async_assert_fn!(tokio_stream::pending<Rc<u8>>(): Send & Sync);
+async_assert_fn!(tokio_stream::iter(std::vec::IntoIter<u8>): Send & Sync);
+
+async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin);
+async_assert_fn!(tokio_stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin);
diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs
new file mode 100644
index 0000000..759de30
--- /dev/null
+++ b/tests/stream_chain.rs
@@ -0,0 +1,100 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::{assert_pending, assert_ready, task};
+
+mod support {
+    pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[tokio::test]
+async fn basic_usage() {
+    let one = stream::iter(vec![1, 2, 3]);
+    let two = stream::iter(vec![4, 5, 6]);
+
+    let mut stream = one.chain(two);
+
+    assert_eq!(stream.size_hint(), (6, Some(6)));
+    assert_eq!(stream.next().await, Some(1));
+
+    assert_eq!(stream.size_hint(), (5, Some(5)));
+    assert_eq!(stream.next().await, Some(2));
+
+    assert_eq!(stream.size_hint(), (4, Some(4)));
+    assert_eq!(stream.next().await, Some(3));
+
+    assert_eq!(stream.size_hint(), (3, Some(3)));
+    assert_eq!(stream.next().await, Some(4));
+
+    assert_eq!(stream.size_hint(), (2, Some(2)));
+    assert_eq!(stream.next().await, Some(5));
+
+    assert_eq!(stream.size_hint(), (1, Some(1)));
+    assert_eq!(stream.next().await, Some(6));
+
+    assert_eq!(stream.size_hint(), (0, Some(0)));
+    assert_eq!(stream.next().await, None);
+
+    assert_eq!(stream.size_hint(), (0, Some(0)));
+    assert_eq!(stream.next().await, None);
+}
+
+#[tokio::test]
+async fn pending_first() {
+    let (tx1, rx1) = mpsc::unbounded_channel_stream();
+    let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+    let mut stream = task::spawn(rx1.chain(rx2));
+    assert_eq!(stream.size_hint(), (0, None));
+
+    assert_pending!(stream.poll_next());
+
+    tx2.send(2).unwrap();
+    assert!(!stream.is_woken());
+
+    assert_pending!(stream.poll_next());
+
+    tx1.send(1).unwrap();
+    assert!(stream.is_woken());
+    assert_eq!(Some(1), assert_ready!(stream.poll_next()));
+
+    assert_pending!(stream.poll_next());
+
+    drop(tx1);
+
+    assert_eq!(stream.size_hint(), (0, None));
+
+    assert!(stream.is_woken());
+    assert_eq!(Some(2), assert_ready!(stream.poll_next()));
+
+    assert_eq!(stream.size_hint(), (0, None));
+
+    drop(tx2);
+
+    assert_eq!(stream.size_hint(), (0, None));
+    assert_eq!(None, assert_ready!(stream.poll_next()));
+}
+
+#[test]
+fn size_overflow() {
+    struct Monster;
+
+    impl tokio_stream::Stream for Monster {
+        type Item = ();
+        fn poll_next(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Option<()>> {
+            panic!()
+        }
+
+        fn size_hint(&self) -> (usize, Option<usize>) {
+            (usize::max_value(), Some(usize::max_value()))
+        }
+    }
+
+    let m1 = Monster;
+    let m2 = Monster;
+    let m = m1.chain(m2);
+    assert_eq!(m.size_hint(), (usize::max_value(), None));
+}
diff --git a/tests/stream_collect.rs b/tests/stream_collect.rs
new file mode 100644
index 0000000..07659a1
--- /dev/null
+++ b/tests/stream_collect.rs
@@ -0,0 +1,146 @@
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
+
+mod support {
+    pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[allow(clippy::let_unit_value)]
+#[tokio::test]
+async fn empty_unit() {
+    // Drains the stream.
+    let mut iter = vec![(), (), ()].into_iter();
+    let _: () = stream::iter(&mut iter).collect().await;
+    assert!(iter.next().is_none());
+}
+
+#[tokio::test]
+async fn empty_vec() {
+    let coll: Vec<u32> = stream::empty().collect().await;
+    assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_box_slice() {
+    let coll: Box<[u32]> = stream::empty().collect().await;
+    assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_string() {
+    let coll: String = stream::empty::<&str>().collect().await;
+    assert!(coll.is_empty());
+}
+
+#[tokio::test]
+async fn empty_result() {
+    let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
+    assert_eq!(Ok(vec![]), coll);
+}
+
+#[tokio::test]
+async fn collect_vec_items() {
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+    let mut fut = task::spawn(rx.collect::<Vec<i32>>());
+
+    assert_pending!(fut.poll());
+
+    tx.send(1).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    tx.send(2).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    drop(tx);
+    assert!(fut.is_woken());
+    let coll = assert_ready!(fut.poll());
+    assert_eq!(vec![1, 2], coll);
+}
+
+#[tokio::test]
+async fn collect_string_items() {
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+
+    let mut fut = task::spawn(rx.collect::<String>());
+
+    assert_pending!(fut.poll());
+
+    tx.send("hello ".to_string()).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    tx.send("world".to_string()).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    drop(tx);
+    assert!(fut.is_woken());
+    let coll = assert_ready!(fut.poll());
+    assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_str_items() {
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+
+    let mut fut = task::spawn(rx.collect::<String>());
+
+    assert_pending!(fut.poll());
+
+    tx.send("hello ").unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    tx.send("world").unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    drop(tx);
+    assert!(fut.is_woken());
+    let coll = assert_ready!(fut.poll());
+    assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_results_ok() {
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+
+    let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
+
+    assert_pending!(fut.poll());
+
+    tx.send(Ok("hello ")).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    tx.send(Ok("world")).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    drop(tx);
+    assert!(fut.is_woken());
+    let coll = assert_ready_ok!(fut.poll());
+    assert_eq!("hello world", coll);
+}
+
+#[tokio::test]
+async fn collect_results_err() {
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+
+    let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
+
+    assert_pending!(fut.poll());
+
+    tx.send(Ok("hello ")).unwrap();
+    assert!(fut.is_woken());
+    assert_pending!(fut.poll());
+
+    tx.send(Err("oh no")).unwrap();
+    assert!(fut.is_woken());
+    let err = assert_ready_err!(fut.poll());
+    assert_eq!("oh no", err);
+}
diff --git a/tests/stream_empty.rs b/tests/stream_empty.rs
new file mode 100644
index 0000000..c06f5c4
--- /dev/null
+++ b/tests/stream_empty.rs
@@ -0,0 +1,11 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+    let mut stream = stream::empty::<i32>();
+
+    for _ in 0..2 {
+        assert_eq!(stream.size_hint(), (0, Some(0)));
+        assert_eq!(None, stream.next().await);
+    }
+}
diff --git a/tests/stream_fuse.rs b/tests/stream_fuse.rs
new file mode 100644
index 0000000..9b6cf05
--- /dev/null
+++ b/tests/stream_fuse.rs
@@ -0,0 +1,50 @@
+use tokio_stream::{Stream, StreamExt};
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+// a stream which alternates between Some and None
+struct Alternate {
+    state: i32,
+}
+
+impl Stream for Alternate {
+    type Item = i32;
+
+    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
+        let val = self.state;
+        self.state += 1;
+
+        // if it's even, Some(i32), else None
+        if val % 2 == 0 {
+            Poll::Ready(Some(val))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+}
+
+#[tokio::test]
+async fn basic_usage() {
+    let mut stream = Alternate { state: 0 };
+
+    // the stream goes back and forth
+    assert_eq!(stream.next().await, Some(0));
+    assert_eq!(stream.next().await, None);
+    assert_eq!(stream.next().await, Some(2));
+    assert_eq!(stream.next().await, None);
+
+    // however, once it is fused
+    let mut stream = stream.fuse();
+
+    assert_eq!(stream.size_hint(), (0, None));
+    assert_eq!(stream.next().await, Some(4));
+
+    assert_eq!(stream.size_hint(), (0, None));
+    assert_eq!(stream.next().await, None);
+
+    // it will always return `None` after the first time.
+    assert_eq!(stream.size_hint(), (0, Some(0)));
+    assert_eq!(stream.next().await, None);
+    assert_eq!(stream.size_hint(), (0, Some(0)));
+}
diff --git a/tests/stream_iter.rs b/tests/stream_iter.rs
new file mode 100644
index 0000000..8b9ee3c
--- /dev/null
+++ b/tests/stream_iter.rs
@@ -0,0 +1,18 @@
+use tokio_stream as stream;
+use tokio_test::task;
+
+use std::iter;
+
+#[tokio::test]
+async fn coop() {
+    let mut stream = task::spawn(stream::iter(iter::repeat(1)));
+
+    for _ in 0..10_000 {
+        if stream.poll_next().is_pending() {
+            assert!(stream.is_woken());
+            return;
+        }
+    }
+
+    panic!("did not yield");
+}
diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs
new file mode 100644
index 0000000..69cd568
--- /dev/null
+++ b/tests/stream_merge.rs
@@ -0,0 +1,83 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::task;
+use tokio_test::{assert_pending, assert_ready};
+
+mod support {
+    pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+#[tokio::test]
+async fn merge_sync_streams() {
+    let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5]));
+
+    for i in 0..7 {
+        let rem = 7 - i;
+        assert_eq!(s.size_hint(), (rem, Some(rem)));
+        assert_eq!(Some(i), s.next().await);
+    }
+
+    assert!(s.next().await.is_none());
+}
+
+#[tokio::test]
+async fn merge_async_streams() {
+    let (tx1, rx1) = mpsc::unbounded_channel_stream();
+    let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+    let mut rx = task::spawn(rx1.merge(rx2));
+
+    assert_eq!(rx.size_hint(), (0, None));
+
+    assert_pending!(rx.poll_next());
+
+    tx1.send(1).unwrap();
+
+    assert!(rx.is_woken());
+    assert_eq!(Some(1), assert_ready!(rx.poll_next()));
+
+    assert_pending!(rx.poll_next());
+    tx2.send(2).unwrap();
+
+    assert!(rx.is_woken());
+    assert_eq!(Some(2), assert_ready!(rx.poll_next()));
+    assert_pending!(rx.poll_next());
+
+    drop(tx1);
+    assert!(rx.is_woken());
+    assert_pending!(rx.poll_next());
+
+    tx2.send(3).unwrap();
+    assert!(rx.is_woken());
+    assert_eq!(Some(3), assert_ready!(rx.poll_next()));
+    assert_pending!(rx.poll_next());
+
+    drop(tx2);
+    assert!(rx.is_woken());
+    assert_eq!(None, assert_ready!(rx.poll_next()));
+}
+
+#[test]
+fn size_overflow() {
+    struct Monster;
+
+    impl tokio_stream::Stream for Monster {
+        type Item = ();
+        fn poll_next(
+            self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Option<()>> {
+            panic!()
+        }
+
+        fn size_hint(&self) -> (usize, Option<usize>) {
+            (usize::max_value(), Some(usize::max_value()))
+        }
+    }
+
+    let m1 = Monster;
+    let m2 = Monster;
+    let m = m1.merge(m2);
+    assert_eq!(m.size_hint(), (usize::max_value(), None));
+}
diff --git a/tests/stream_once.rs b/tests/stream_once.rs
new file mode 100644
index 0000000..f32bad3
--- /dev/null
+++ b/tests/stream_once.rs
@@ -0,0 +1,12 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+
+#[tokio::test]
+async fn basic_usage() {
+    let mut one = stream::once(1);
+
+    assert_eq!(one.size_hint(), (1, Some(1)));
+    assert_eq!(Some(1), one.next().await);
+
+    assert_eq!(one.size_hint(), (0, Some(0)));
+    assert_eq!(None, one.next().await);
+}
diff --git a/tests/stream_pending.rs b/tests/stream_pending.rs
new file mode 100644
index 0000000..87b5d03
--- /dev/null
+++ b/tests/stream_pending.rs
@@ -0,0 +1,14 @@
+use tokio_stream::{self as stream, Stream, StreamExt};
+use tokio_test::{assert_pending, task};
+
+#[tokio::test]
+async fn basic_usage() {
+    let mut stream = stream::pending::<i32>();
+
+    for _ in 0..2 {
+        assert_eq!(stream.size_hint(), (0, None));
+
+        let mut next = task::spawn(async { stream.next().await });
+        assert_pending!(next.poll());
+    }
+}
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
new file mode 100644
index 0000000..53f3d86
--- /dev/null
+++ b/tests/stream_stream_map.rs
@@ -0,0 +1,386 @@
+use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
+use tokio_test::{assert_ok, assert_pending, assert_ready, task};
+
+mod support {
+    pub(crate) mod mpsc;
+}
+
+use support::mpsc;
+
+use std::pin::Pin;
+
+macro_rules! assert_ready_some {
+    ($($t:tt)*) => {
+        match assert_ready!($($t)*) {
+            Some(v) => v,
+            None => panic!("expected `Some`, got `None`"),
+        }
+    };
+}
+
+macro_rules! assert_ready_none {
+    ($($t:tt)*) => {
+        match assert_ready!($($t)*) {
+            None => {}
+            Some(v) => panic!("expected `None`, got `Some({:?})`", v),
+        }
+    };
+}
+
+#[tokio::test]
+async fn empty() {
+    let mut map = StreamMap::<&str, stream::Pending<()>>::new();
+
+    assert_eq!(map.len(), 0);
+    assert!(map.is_empty());
+
+    assert!(map.next().await.is_none());
+    assert!(map.next().await.is_none());
+
+    assert!(map.remove("foo").is_none());
+}
+
+#[tokio::test]
+async fn single_entry() {
+    let mut map = task::spawn(StreamMap::new());
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+    let rx = Box::pin(rx);
+
+    assert_ready_none!(map.poll_next());
+
+    assert!(map.insert("foo", rx).is_none());
+    assert!(map.contains_key("foo"));
+    assert!(!map.contains_key("bar"));
+
+    assert_eq!(map.len(), 1);
+    assert!(!map.is_empty());
+
+    assert_pending!(map.poll_next());
+
+    assert_ok!(tx.send(1));
+
+    assert!(map.is_woken());
+    let (k, v) = assert_ready_some!(map.poll_next());
+    assert_eq!(k, "foo");
+    assert_eq!(v, 1);
+
+    assert_pending!(map.poll_next());
+
+    assert_ok!(tx.send(2));
+
+    assert!(map.is_woken());
+    let (k, v) = assert_ready_some!(map.poll_next());
+    assert_eq!(k, "foo");
+    assert_eq!(v, 2);
+
+    assert_pending!(map.poll_next());
+    drop(tx);
+    assert!(map.is_woken());
+    assert_ready_none!(map.poll_next());
+}
+
+#[tokio::test]
+async fn multiple_entries() {
+    let mut map = task::spawn(StreamMap::new());
+    let (tx1, rx1) = mpsc::unbounded_channel_stream();
+    let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+    let rx1 = Box::pin(rx1);
+    let rx2 = Box::pin(rx2);
+
+    map.insert("foo", rx1);
+    map.insert("bar", rx2);
+
+    assert_pending!(map.poll_next());
+
+    assert_ok!(tx1.send(1));
+
+    assert!(map.is_woken());
+    let (k, v) = assert_ready_some!(map.poll_next());
+    assert_eq!(k, "foo");
+    assert_eq!(v, 1);
+
+    assert_pending!(map.poll_next());
+
+    assert_ok!(tx2.send(2));
+
+    assert!(map.is_woken());
+    let (k, v) = assert_ready_some!(map.poll_next());
+    assert_eq!(k, "bar");
+    assert_eq!(v, 2);
+
+    assert_pending!(map.poll_next());
+
+    assert_ok!(tx1.send(3));
+    assert_ok!(tx2.send(4));
+
+    assert!(map.is_woken());
+
+    // Given the randomization, there is no guarantee what order the values will
+    // be received in.
+    let mut v = (0..2)
+        .map(|_| assert_ready_some!(map.poll_next()))
+        .collect::<Vec<_>>();
+
+    assert_pending!(map.poll_next());
+
+    v.sort_unstable();
+    assert_eq!(v[0].0, "bar");
+    assert_eq!(v[0].1, 4);
+    assert_eq!(v[1].0, "foo");
+    assert_eq!(v[1].1, 3);
+
+    drop(tx1);
+    assert!(map.is_woken());
+    assert_pending!(map.poll_next());
+    drop(tx2);
+
+    assert_ready_none!(map.poll_next());
+}
+
+#[tokio::test]
+async fn insert_remove() {
+    let mut map = task::spawn(StreamMap::new());
+    let (tx, rx) = mpsc::unbounded_channel_stream();
+
+    let rx = Box::pin(rx);
+
+    assert_ready_none!(map.poll_next());
+
+    assert!(map.insert("foo", rx).is_none());
+    let rx = map.remove("foo").unwrap();
+
+    assert_ok!(tx.send(1));
+
+    assert!(!map.is_woken());
+    assert_ready_none!(map.poll_next());
+
+    assert!(map.insert("bar", rx).is_none());
+
+    let v = assert_ready_some!(map.poll_next());
+    assert_eq!(v.0, "bar");
+    assert_eq!(v.1, 1);
+
+    assert!(map.remove("bar").is_some());
+    assert_ready_none!(map.poll_next());
+
+    assert!(map.is_empty());
+    assert_eq!(0, map.len());
+}
+
+#[tokio::test]
+async fn replace() {
+    let mut map = task::spawn(StreamMap::new());
+    let (tx1, rx1) = mpsc::unbounded_channel_stream();
+    let (tx2, rx2) = mpsc::unbounded_channel_stream();
+
+    let rx1 = Box::pin(rx1);
+    let rx2 = Box::pin(rx2);
+
+    assert!(map.insert("foo", rx1).is_none());
+
+    assert_pending!(map.poll_next());
+
+    let _rx1 = map.insert("foo", rx2).unwrap();
+
+    assert_pending!(map.poll_next());
+
+    tx1.send(1).unwrap();
+    assert_pending!(map.poll_next());
+
+    tx2.send(2).unwrap();
+    assert!(map.is_woken());
+    let v = assert_ready_some!(map.poll_next());
+    assert_eq!(v.0, "foo");
+    assert_eq!(v.1, 2);
+}
+
+#[test]
+fn size_hint_with_upper() {
+    let mut map = StreamMap::new();
+
+    map.insert("a", stream::iter(vec![1]));
+    map.insert("b", stream::iter(vec![1, 2]));
+    map.insert("c", stream::iter(vec![1, 2, 3]));
+
+    assert_eq!(3, map.len());
+    assert!(!map.is_empty());
+
+    let size_hint = map.size_hint();
+    assert_eq!(size_hint, (6, Some(6)));
+}
+
+#[test]
+fn size_hint_without_upper() {
+    let mut map = StreamMap::new();
+
+    map.insert("a", pin_box(stream::iter(vec![1])));
+    map.insert("b", pin_box(stream::iter(vec![1, 2])));
+    map.insert("c", pin_box(pending()));
+
+    let size_hint = map.size_hint();
+    assert_eq!(size_hint, (3, None));
+}
+
+#[test]
+fn new_capacity_zero() {
+    let map = StreamMap::<&str, stream::Pending<()>>::new();
+    assert_eq!(0, map.capacity());
+
+    assert!(map.keys().next().is_none());
+}
+
+#[test]
+fn with_capacity() {
+    let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
+    assert!(10 <= map.capacity());
+
+    assert!(map.keys().next().is_none());
+}
+
+#[test]
+fn iter_keys() {
+    let mut map = StreamMap::new();
+
+    map.insert("a", pending::<i32>());
+    map.insert("b", pending());
+    map.insert("c", pending());
+
+    let mut keys = map.keys().collect::<Vec<_>>();
+    keys.sort_unstable();
+
+    assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
+}
+
+#[test]
+fn iter_values() {
+    let mut map = StreamMap::new();
+
+    map.insert("a", stream::iter(vec![1]));
+    map.insert("b", stream::iter(vec![1, 2]));
+    map.insert("c", stream::iter(vec![1, 2, 3]));
+
+    let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
+
+    size_hints.sort_unstable();
+
+    assert_eq!(&size_hints[..], &[1, 2, 3]);
+}
+
+#[test]
+fn iter_values_mut() {
+    let mut map = StreamMap::new();
+
+    map.insert("a", stream::iter(vec![1]));
+    map.insert("b", stream::iter(vec![1, 2]));
+    map.insert("c", stream::iter(vec![1, 2, 3]));
+
+    let mut size_hints = map
+        .values_mut()
+        .map(|s: &mut _| s.size_hint().0)
+        .collect::<Vec<_>>();
+
+    size_hints.sort_unstable();
+
+    assert_eq!(&size_hints[..], &[1, 2, 3]);
+}
+
+#[test]
+fn clear() {
+    let mut map = task::spawn(StreamMap::new());
+
+    map.insert("a", stream::iter(vec![1]));
+    map.insert("b", stream::iter(vec![1, 2]));
+    map.insert("c", stream::iter(vec![1, 2, 3]));
+
+    assert_ready_some!(map.poll_next());
+
+    map.clear();
+
+    assert_ready_none!(map.poll_next());
+    assert!(map.is_empty());
+}
+
+#[test]
+fn contains_key_borrow() {
+    let mut map = StreamMap::new();
+    map.insert("foo".to_string(), pending::<()>());
+
+    assert!(map.contains_key("foo"));
+}
+
+#[test]
+fn one_ready_many_none() {
+    // Run a few times because of randomness
+    for _ in 0..100 {
+        let mut map = task::spawn(StreamMap::new());
+
+        map.insert(0, pin_box(stream::empty()));
+        map.insert(1, pin_box(stream::empty()));
+        map.insert(2, pin_box(stream::once("hello")));
+        map.insert(3, pin_box(stream::pending()));
+
+        let v = assert_ready_some!(map.poll_next());
+        assert_eq!(v, (2, "hello"));
+    }
+}
+
+proptest::proptest! {
+    #[test]
+    fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
+        use std::task::{Context, Poll};
+
+        struct DidPoll<T> {
+            did_poll: bool,
+            inner: T,
+        }
+
+        impl<T: Stream + Unpin> Stream for DidPoll<T> {
+            type Item = T::Item;
+
+            fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+                -> Poll<Option<T::Item>>
+            {
+                self.did_poll = true;
+                Pin::new(&mut self.inner).poll_next(cx)
+            }
+        }
+
+        for _ in 0..10 {
+            let mut map = task::spawn(StreamMap::new());
+            let mut expect = 0;
+
+            for (i, &is_empty) in kinds.iter().enumerate() {
+                let inner = if is_empty {
+                    pin_box(stream::empty::<()>())
+                } else {
+                    expect += 1;
+                    pin_box(stream::pending::<()>())
+                };
+
+                let stream = DidPoll {
+                    did_poll: false,
+                    inner,
+                };
+
+                map.insert(i, stream);
+            }
+
+            if expect == 0 {
+                assert_ready_none!(map.poll_next());
+            } else {
+                assert_pending!(map.poll_next());
+
+                assert_eq!(expect, map.values().count());
+
+                for stream in map.values() {
+                    assert!(stream.did_poll);
+                }
+            }
+        }
+    }
+}
+
+fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
+    Box::pin(s)
+}
diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs
new file mode 100644
index 0000000..5697ace
--- /dev/null
+++ b/tests/stream_timeout.rs
@@ -0,0 +1,109 @@
+#![cfg(feature = "full")]
+
+use tokio::time::{self, sleep, Duration};
+use tokio_stream::{self, StreamExt};
+use tokio_test::*;
+
+use futures::StreamExt as _;
+
+async fn maybe_sleep(idx: i32) -> i32 {
+    if idx % 2 == 0 {
+        sleep(ms(200)).await;
+    }
+    idx
+}
+
+fn ms(n: u64) -> Duration {
+    Duration::from_millis(n)
+}
+
+#[tokio::test]
+async fn basic_usage() {
+    time::pause();
+
+    // Items 2 and 4 time out. If we run the stream until it completes,
+    // we end up with the following items:
+    //
+    // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
+
+    let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100));
+    let mut stream = task::spawn(stream);
+
+    // First item completes immediately
+    assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+
+    // Second item is delayed 200ms, times out after 100ms
+    assert_pending!(stream.poll_next());
+
+    time::advance(ms(150)).await;
+    let v = assert_ready!(stream.poll_next());
+    assert!(v.unwrap().is_err());
+
+    assert_pending!(stream.poll_next());
+
+    time::advance(ms(100)).await;
+    assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
+
+    // Third item is ready immediately
+    assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+
+    // Fourth item is delayed 200ms, times out after 100ms
+    assert_pending!(stream.poll_next());
+
+    time::advance(ms(60)).await;
+    assert_pending!(stream.poll_next()); // nothing ready yet
+
+    time::advance(ms(60)).await;
+    let v = assert_ready!(stream.poll_next());
+    assert!(v.unwrap().is_err()); // timeout!
+
+    time::advance(ms(120)).await;
+    assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
+
+    // Done.
+    assert_ready_eq!(stream.poll_next(), None);
+}
+
+#[tokio::test]
+async fn return_elapsed_errors_only_once() {
+    time::pause();
+
+    let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50));
+    let mut stream = task::spawn(stream);
+
+    // First item completes immediately
+    assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+
+    // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
+    // error is returned.
+    assert_pending!(stream.poll_next());
+    //
+    time::advance(ms(51)).await;
+    let v = assert_ready!(stream.poll_next());
+    assert!(v.unwrap().is_err()); // timeout!
+
+    // deadline elapses again, but no error is returned
+    time::advance(ms(50)).await;
+    assert_pending!(stream.poll_next());
+
+    time::advance(ms(100)).await;
+    assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
+    assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+
+    // Done
+    assert_ready_eq!(stream.poll_next(), None);
+}
+
+#[tokio::test]
+async fn no_timeouts() {
+    let stream = stream::iter(vec![1, 3, 5])
+        .then(maybe_sleep)
+        .timeout(ms(100));
+
+    let mut stream = task::spawn(stream);
+
+    assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
+    assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
+    assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
+    assert_ready_eq!(stream.poll_next(), None);
+}
diff --git a/tests/support/mpsc.rs b/tests/support/mpsc.rs
new file mode 100644
index 0000000..09dbe04
--- /dev/null
+++ b/tests/support/mpsc.rs
@@ -0,0 +1,15 @@
+use async_stream::stream;
+use tokio::sync::mpsc::{self, UnboundedSender};
+use tokio_stream::Stream;
+
+pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
+    let (tx, mut rx) = mpsc::unbounded_channel();
+
+    let stream = stream! {
+        while let Some(item) = rx.recv().await {
+            yield item;
+        }
+    };
+
+    (tx, stream)
+}
diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs
new file mode 100644
index 0000000..42a643b
--- /dev/null
+++ b/tests/time_throttle.rs
@@ -0,0 +1,28 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::time;
+use tokio_stream::StreamExt;
+use tokio_test::*;
+
+use std::time::Duration;
+
+#[tokio::test]
+async fn usage() {
+    time::pause();
+
+    let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100)));
+
+    assert_ready!(stream.poll_next());
+    assert_pending!(stream.poll_next());
+
+    time::advance(Duration::from_millis(90)).await;
+
+    assert_pending!(stream.poll_next());
+
+    time::advance(Duration::from_millis(101)).await;
+
+    assert!(stream.is_woken());
+
+    assert_ready!(stream.poll_next());
+}