Merge "Upgrade tokio-stream to 0.1.11"
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f2822a6..94e3fd4 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
 {
   "git": {
-    "sha1": "d1a400912e82505c18c6c0c1f05cda06f334e201"
-  }
-}
+    "sha1": "ae0d49d59c0c63efafde73306af5d0d94046b50d"
+  },
+  "path_in_vcs": "tokio-stream"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index c619fe9..4b00918 100644
--- a/Android.bp
+++ b/Android.bp
@@ -23,7 +23,7 @@
     host_supported: true,
     crate_name: "tokio_stream",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.1.8",
+    cargo_pkg_version: "0.1.11",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4ef469e..05c2b18 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,37 @@
+# 0.1.11 (October 11, 2022)
+
+- time: allow `StreamExt::chunks_timeout` outside of a runtime ([#5036])
+
+[#5036]: https://github.com/tokio-rs/tokio/pull/5036
+
+# 0.1.10 (Sept 18, 2022)
+
+- time: add `StreamExt::chunks_timeout` ([#4695])
+- stream: add track_caller to public APIs ([#4786])
+
+[#4695]: https://github.com/tokio-rs/tokio/pull/4695
+[#4786]: https://github.com/tokio-rs/tokio/pull/4786
+
+# 0.1.9 (June 4, 2022)
+
+- deps: upgrade `tokio-util` dependency to `0.7.x` ([#3762])
+- stream: add `StreamExt::map_while` ([#4351])
+- stream: add `StreamExt::then` ([#4355])
+- stream: add cancel-safety docs to `StreamExt::next` and `try_next` ([#4715])
+- stream: expose `Elapsed` error ([#4502])
+- stream: expose `Timeout` ([#4601])
+- stream: implement `Extend` for `StreamMap` ([#4272])
+- sync: add `Clone` to `RecvError` types ([#4560])
+
+[#3762]: https://github.com/tokio-rs/tokio/pull/3762
+[#4272]: https://github.com/tokio-rs/tokio/pull/4272
+[#4351]: https://github.com/tokio-rs/tokio/pull/4351
+[#4355]: https://github.com/tokio-rs/tokio/pull/4355
+[#4502]: https://github.com/tokio-rs/tokio/pull/4502
+[#4560]: https://github.com/tokio-rs/tokio/pull/4560
+[#4601]: https://github.com/tokio-rs/tokio/pull/4601
+[#4715]: https://github.com/tokio-rs/tokio/pull/4715
+
 # 0.1.8 (October 29, 2021)
 
 - stream: add `From<Receiver<T>>` impl for receiver streams ([#4080])
diff --git a/Cargo.toml b/Cargo.toml
index 699d94a..df1dc53 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,19 +11,29 @@
 
 [package]
 edition = "2018"
+rust-version = "1.49"
 name = "tokio-stream"
-version = "0.1.8"
+version = "0.1.11"
 authors = ["Tokio Contributors <team@tokio.rs>"]
-description = "Utilities to work with `Stream` and `tokio`.\n"
+description = """
+Utilities to work with `Stream` and `tokio`.
+"""
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
 categories = ["asynchronous"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
+
 [package.metadata.docs.rs]
 all-features = true
-rustc-args = ["--cfg", "docsrs"]
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+    "--cfg",
+    "docsrs",
+]
+rustc-args = [
+    "--cfg",
+    "docsrs",
+]
+
 [dependencies.futures-core]
 version = "0.3.0"
 
@@ -35,8 +45,9 @@
 features = ["sync"]
 
 [dependencies.tokio-util]
-version = "0.6.3"
+version = "0.7.0"
 optional = true
+
 [dev-dependencies.async-stream]
 version = "0.3"
 
@@ -44,12 +55,15 @@
 version = "0.3"
 default-features = false
 
-[dev-dependencies.proptest]
-version = "1"
+[dev-dependencies.parking_lot]
+version = "0.12.0"
 
 [dev-dependencies.tokio]
 version = "1.2.0"
-features = ["full", "test-util"]
+features = [
+    "full",
+    "test-util",
+]
 
 [features]
 default = ["time"]
@@ -57,5 +71,11 @@
 io-util = ["tokio/io-util"]
 net = ["tokio/net"]
 signal = ["tokio/signal"]
-sync = ["tokio/sync", "tokio-util"]
+sync = [
+    "tokio/sync",
+    "tokio-util",
+]
 time = ["tokio/time"]
+
+[target."cfg(not(target_arch = \"wasm32\"))".dev-dependencies.proptest]
+version = "1"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 83f8551..6dfa978 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -2,17 +2,15 @@
 name = "tokio-stream"
 # When releasing to crates.io:
 # - Remove path dependencies
-# - Update doc url
-#   - Cargo.toml
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.8"
+version = "0.1.11"
 edition = "2018"
+rust-version = "1.49"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.8/tokio_stream"
 description = """
 Utilities to work with `Stream` and `tokio`.
 """
@@ -31,14 +29,16 @@
 futures-core = { version = "0.3.0" }
 pin-project-lite = "0.2.0"
 tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
-tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
+tokio-util = { version = "0.7.0", path = "../tokio-util", optional = true }
 
 [dev-dependencies]
 tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
 async-stream = "0.3"
+parking_lot = "0.12.0"
 tokio-test = { path = "../tokio-test" }
 futures = { version = "0.3", default-features = false }
 
+[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
 proptest = "1"
 
 [package.metadata.docs.rs]
diff --git a/LICENSE b/LICENSE
index ffa38bb..8af5baf 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2021 Tokio Contributors
+Copyright (c) 2022 Tokio Contributors
 
 Permission is hereby granted, free of charge, to any
 person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 68e56e9..1bf6cf9 100644
--- a/METADATA
+++ b/METADATA
@@ -1,3 +1,7 @@
+# This project was upgraded with external_updater.
+# Usage: tools/external_updater/updater.sh update rust/crates/tokio-stream
+# For more info, check https://cs.android.com/android/platform/superproject/+/master:tools/external_updater/README.md
+
 name: "tokio-stream"
 description: "Utilities to work with `Stream` and `tokio`."
 third_party {
@@ -7,13 +11,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.8.crate"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.11.crate"
   }
-  version: "0.1.8"
+  version: "0.1.11"
   license_type: NOTICE
   last_upgrade_date {
     year: 2022
-    month: 3
-    day: 1
+    month: 12
+    day: 12
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index b7f232f..bbd4cef 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,6 @@
     unreachable_pub
 )]
 #![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
 #![doc(test(
     no_crate_inject,
     attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
@@ -78,6 +77,9 @@
 
 mod stream_ext;
 pub use stream_ext::{collect::FromStream, StreamExt};
+cfg_time! {
+    pub use stream_ext::timeout::{Elapsed, Timeout};
+}
 
 mod empty;
 pub use empty::{empty, Empty};
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 1157c9e..6cea7b5 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -1,3 +1,4 @@
+use core::future::Future;
 use futures_core::Stream;
 
 mod all;
@@ -27,6 +28,9 @@
 mod map;
 use map::Map;
 
+mod map_while;
+use map_while::MapWhile;
+
 mod merge;
 use merge::Merge;
 
@@ -39,21 +43,26 @@
 mod skip_while;
 use skip_while::SkipWhile;
 
-mod try_next;
-use try_next::TryNext;
-
 mod take;
 use take::Take;
 
 mod take_while;
 use take_while::TakeWhile;
 
+mod then;
+use then::Then;
+
+mod try_next;
+use try_next::TryNext;
+
 cfg_time! {
-    mod timeout;
+    pub(crate) mod timeout;
     use timeout::Timeout;
     use tokio::time::Duration;
     mod throttle;
     use throttle::{throttle, Throttle};
+    mod chunks_timeout;
+    use chunks_timeout::ChunksTimeout;
 }
 
 /// An extension trait for the [`Stream`] trait that provides a variety of
@@ -106,6 +115,12 @@
     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
     /// crate.
     ///
+    /// # Cancel safety
+    ///
+    /// This method is cancel safe. The returned future only
+    /// holds onto a reference to the underlying stream,
+    /// so dropping it will never lose a value.
+    ///
     /// # Examples
     ///
     /// ```
@@ -142,6 +157,12 @@
     /// an [`Option<Result<T, E>>`](Option), making for easy use
     /// with the [`?`](std::ops::Try) operator.
     ///
+    /// # Cancel safety
+    ///
+    /// This method is cancel safe. The returned future only
+    /// holds onto a reference to the underlying stream,
+    /// so dropping it will never lose a value.
+    ///
     /// # Examples
     ///
     /// ```
@@ -197,6 +218,93 @@
         Map::new(self, f)
     }
 
+    /// Map this stream's items to a different type for as long as determined by
+    /// the provided closure. A stream of the target type will be returned,
+    /// which will yield elements until the closure returns `None`.
+    ///
+    /// The provided closure is executed over all elements of this stream as
+    /// they are made available, until it returns `None`. It is executed inline
+    /// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
+    /// the underlying stream will not be polled again.
+    ///
+    /// Note that this function consumes the stream passed into it and returns a
+    /// wrapped version of it, similar to the [`Iterator::map_while`] method in the
+    /// standard library.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// let stream = stream::iter(1..=10);
+    /// let mut stream = stream.map_while(|x| {
+    ///     if x < 4 {
+    ///         Some(x + 3)
+    ///     } else {
+    ///         None
+    ///     }
+    /// });
+    /// assert_eq!(stream.next().await, Some(4));
+    /// assert_eq!(stream.next().await, Some(5));
+    /// assert_eq!(stream.next().await, Some(6));
+    /// assert_eq!(stream.next().await, None);
+    /// # }
+    /// ```
+    fn map_while<T, F>(self, f: F) -> MapWhile<Self, F>
+    where
+        F: FnMut(Self::Item) -> Option<T>,
+        Self: Sized,
+    {
+        MapWhile::new(self, f)
+    }
+
+    /// Maps this stream's items asynchronously to a different type, returning a
+    /// new stream of the resulting type.
+    ///
+    /// The provided closure is executed over all elements of this stream as
+    /// they are made available, and the returned future is executed. Only one
+    /// future is executed at a time.
+    ///
+    /// Note that this function consumes the stream passed into it and returns a
+    /// wrapped version of it, similar to the existing `then` methods in the
+    /// standard library.
+    ///
+    /// Be aware that if the future is not `Unpin`, then neither is the `Stream`
+    /// returned by this method. To handle this, you can use `tokio::pin!` as in
+    /// the example below or put the stream in a `Box` with `Box::pin(stream)`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # #[tokio::main]
+    /// # async fn main() {
+    /// use tokio_stream::{self as stream, StreamExt};
+    ///
+    /// async fn do_async_work(value: i32) -> i32 {
+    ///     value + 3
+    /// }
+    ///
+    /// let stream = stream::iter(1..=3);
+    /// let stream = stream.then(do_async_work);
+    ///
+    /// tokio::pin!(stream);
+    ///
+    /// assert_eq!(stream.next().await, Some(4));
+    /// assert_eq!(stream.next().await, Some(5));
+    /// assert_eq!(stream.next().await, Some(6));
+    /// # }
+    /// ```
+    fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
+    where
+        F: FnMut(Self::Item) -> Fut,
+        Fut: Future,
+        Self: Sized,
+    {
+        Then::new(self, f)
+    }
+
     /// Combine two streams into one by interleaving the output of both as it
     /// is produced.
     ///
@@ -899,6 +1007,63 @@
     {
         throttle(duration, self)
     }
+
+    /// Batches the items in the given stream using a maximum duration and size for each batch.
+    ///
+    /// This stream returns the next batch of items in the following situations:
+    ///  1. The inner stream has returned at least `max_size` many items since the last batch.
+    ///  2. The time since the first item of a batch is greater than the given duration.
+    ///  3. The end of the stream is reached.
+    ///
+    /// Each returned vector is non-empty and its length never exceeds the maximum size. Empty batches
+    /// will not be emitted if no items are received upstream.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `max_size` is zero.
+    ///
+    /// # Example
+    ///
+    /// ```rust
+    /// use std::time::Duration;
+    /// use tokio::time;
+    /// use tokio_stream::{self as stream, StreamExt};
+    /// use futures::FutureExt;
+    ///
+    /// #[tokio::main]
+    /// # async fn _unused() {}
+    /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+    /// async fn main() {
+    ///     let iter = vec![1, 2, 3, 4].into_iter();
+    ///     let stream0 = stream::iter(iter);
+    ///
+    ///     let iter = vec![5].into_iter();
+    ///     let stream1 = stream::iter(iter)
+    ///          .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+    ///
+    ///     let chunk_stream = stream0
+    ///         .chain(stream1)
+    ///         .chunks_timeout(3, Duration::from_secs(2));
+    ///     tokio::pin!(chunk_stream);
+    ///
+    ///     // a full batch was received
+    ///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
+    ///     // deadline was reached before max_size was reached
+    ///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
+    ///     // last element in the stream
+    ///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
+    /// }
+    /// ```
+    #[cfg(feature = "time")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
+    #[track_caller]
+    fn chunks_timeout(self, max_size: usize, duration: Duration) -> ChunksTimeout<Self>
+    where
+        Self: Sized,
+    {
+        assert!(max_size > 0, "`max_size` must be non-zero.");
+        ChunksTimeout::new(self, max_size, duration)
+    }
 }
 
 impl<St: ?Sized> StreamExt for St where St: Stream {}
@@ -906,10 +1071,10 @@
 /// Merge the size hints from two streams.
 fn merge_size_hints(
     (left_low, left_high): (usize, Option<usize>),
-    (right_low, right_hign): (usize, Option<usize>),
+    (right_low, right_high): (usize, Option<usize>),
 ) -> (usize, Option<usize>) {
     let low = left_low.saturating_add(right_low);
-    let high = match (left_high, right_hign) {
+    let high = match (left_high, right_high) {
         (Some(h1), Some(h2)) => h1.checked_add(h2),
         _ => None,
     };
diff --git a/src/stream_ext/chunks_timeout.rs b/src/stream_ext/chunks_timeout.rs
new file mode 100644
index 0000000..48acd93
--- /dev/null
+++ b/src/stream_ext/chunks_timeout.rs
@@ -0,0 +1,86 @@
+use crate::stream_ext::Fuse;
+use crate::Stream;
+use tokio::time::{sleep, Sleep};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+pin_project! {
+    /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+    #[must_use = "streams do nothing unless polled"]
+    #[derive(Debug)]
+    pub struct ChunksTimeout<S: Stream> {
+        #[pin]
+        stream: Fuse<S>,
+        #[pin]
+        deadline: Option<Sleep>,
+        duration: Duration,
+        items: Vec<S::Item>,
+        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+    }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+    pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+        ChunksTimeout {
+            stream: Fuse::new(stream),
+            deadline: None,
+            duration,
+            items: Vec::with_capacity(max_size),
+            cap: max_size,
+        }
+    }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+    type Item = Vec<S::Item>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut me = self.as_mut().project();
+        loop {
+            match me.stream.as_mut().poll_next(cx) {
+                Poll::Pending => break,
+                Poll::Ready(Some(item)) => {
+                    if me.items.is_empty() {
+                        me.deadline.set(Some(sleep(*me.duration)));
+                        me.items.reserve_exact(*me.cap);
+                    }
+                    me.items.push(item);
+                    if me.items.len() >= *me.cap {
+                        return Poll::Ready(Some(std::mem::take(me.items)));
+                    }
+                }
+                Poll::Ready(None) => {
+                    // Returning Some here is only correct because we fuse the inner stream.
+                    let last = if me.items.is_empty() {
+                        None
+                    } else {
+                        Some(std::mem::take(me.items))
+                    };
+
+                    return Poll::Ready(last);
+                }
+            }
+        }
+
+        if !me.items.is_empty() {
+            if let Some(deadline) = me.deadline.as_pin_mut() {
+                ready!(deadline.poll(cx));
+            }
+            return Poll::Ready(Some(std::mem::take(me.items)));
+        }
+
+        Poll::Pending
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+        let (lower, upper) = self.stream.size_hint();
+        let lower = (lower / self.cap).saturating_add(chunk_len);
+        let upper = upper.and_then(|x| x.checked_add(chunk_len));
+        (lower, upper)
+    }
+}
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index a33a6d6..4b157a9 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -66,17 +66,17 @@
         use Poll::Ready;
 
         loop {
-            let mut me = self.as_mut().project();
+            let me = self.as_mut().project();
 
             let item = match ready!(me.stream.poll_next(cx)) {
                 Some(item) => item,
                 None => {
-                    return Ready(U::finalize(sealed::Internal, &mut me.collection));
+                    return Ready(U::finalize(sealed::Internal, me.collection));
                 }
             };
 
-            if !U::extend(sealed::Internal, &mut me.collection, item) {
-                return Ready(U::finalize(sealed::Internal, &mut me.collection));
+            if !U::extend(sealed::Internal, me.collection, item) {
+                return Ready(U::finalize(sealed::Internal, me.collection));
             }
         }
     }
diff --git a/src/stream_ext/map_while.rs b/src/stream_ext/map_while.rs
new file mode 100644
index 0000000..d4fd825
--- /dev/null
+++ b/src/stream_ext/map_while.rs
@@ -0,0 +1,52 @@
+use crate::Stream;
+
+use core::fmt;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`map_while`](super::StreamExt::map_while) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct MapWhile<St, F> {
+        #[pin]
+        stream: St,
+        f: F,
+    }
+}
+
+impl<St, F> fmt::Debug for MapWhile<St, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MapWhile")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St, F> MapWhile<St, F> {
+    pub(super) fn new(stream: St, f: F) -> Self {
+        MapWhile { stream, f }
+    }
+}
+
+impl<St, F, T> Stream for MapWhile<St, F>
+where
+    St: Stream,
+    F: FnMut(St::Item) -> Option<T>,
+{
+    type Item = T;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        let me = self.project();
+        let f = me.f;
+        me.stream.poll_next(cx).map(|opt| opt.and_then(f))
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let (_, upper) = self.stream.size_hint();
+        (0, upper)
+    }
+}
diff --git a/src/stream_ext/next.rs b/src/stream_ext/next.rs
index 175490c..706069f 100644
--- a/src/stream_ext/next.rs
+++ b/src/stream_ext/next.rs
@@ -8,6 +8,13 @@
 
 pin_project! {
     /// Future for the [`next`](super::StreamExt::next) method.
+    ///
+    /// # Cancel safety
+    ///
+    /// This future is cancel safe. It only
+    /// holds onto a reference to the underlying stream,
+    /// so dropping it will never lose a value.
+    ///
     #[derive(Debug)]
     #[must_use = "futures do nothing unless you `.await` or poll them"]
     pub struct Next<'a, St: ?Sized> {
diff --git a/src/stream_ext/then.rs b/src/stream_ext/then.rs
new file mode 100644
index 0000000..7f6b5a2
--- /dev/null
+++ b/src/stream_ext/then.rs
@@ -0,0 +1,83 @@
+use crate::Stream;
+
+use core::fmt;
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [`then`](super::StreamExt::then) method.
+    #[must_use = "streams do nothing unless polled"]
+    pub struct Then<St, Fut, F> {
+        #[pin]
+        stream: St,
+        #[pin]
+        future: Option<Fut>,
+        f: F,
+    }
+}
+
+impl<St, Fut, F> fmt::Debug for Then<St, Fut, F>
+where
+    St: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Then")
+            .field("stream", &self.stream)
+            .finish()
+    }
+}
+
+impl<St, Fut, F> Then<St, Fut, F> {
+    pub(super) fn new(stream: St, f: F) -> Self {
+        Then {
+            stream,
+            future: None,
+            f,
+        }
+    }
+}
+
+impl<St, F, Fut> Stream for Then<St, Fut, F>
+where
+    St: Stream,
+    Fut: Future,
+    F: FnMut(St::Item) -> Fut,
+{
+    type Item = Fut::Output;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Fut::Output>> {
+        let mut me = self.project();
+
+        loop {
+            if let Some(future) = me.future.as_mut().as_pin_mut() {
+                match future.poll(cx) {
+                    Poll::Ready(item) => {
+                        me.future.set(None);
+                        return Poll::Ready(Some(item));
+                    }
+                    Poll::Pending => return Poll::Pending,
+                }
+            }
+
+            match me.stream.as_mut().poll_next(cx) {
+                Poll::Ready(Some(item)) => {
+                    me.future.set(Some((me.f)(item)));
+                }
+                Poll::Ready(None) => return Poll::Ready(None),
+                Poll::Pending => return Poll::Pending,
+            }
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let future_len = if self.future.is_some() { 1 } else { 0 };
+        let (lower, upper) = self.stream.size_hint();
+
+        let lower = lower.saturating_add(future_len);
+        let upper = upper.and_then(|upper| upper.checked_add(future_len));
+
+        (lower, upper)
+    }
+}
diff --git a/src/stream_ext/try_next.rs b/src/stream_ext/try_next.rs
index af27d87..93aa3bc 100644
--- a/src/stream_ext/try_next.rs
+++ b/src/stream_ext/try_next.rs
@@ -9,6 +9,12 @@
 
 pin_project! {
     /// Future for the [`try_next`](super::StreamExt::try_next) method.
+    ///
+    /// # Cancel safety
+    ///
+    /// This future is cancel safe. It only
+    /// holds onto a reference to the underlying stream,
+    /// so dropping it will never lose a value.
     #[derive(Debug)]
     #[must_use = "futures do nothing unless you `.await` or poll them"]
     pub struct TryNext<'a, St: ?Sized> {
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 80a521e..2159804 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -585,6 +585,15 @@
     }
 }
 
+impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
+    fn extend<T>(&mut self, iter: T)
+    where
+        T: IntoIterator<Item = (K, V)>,
+    {
+        self.entries.extend(iter);
+    }
+}
+
 mod rand {
     use std::cell::Cell;
 
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index c8346a6..10184bf 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -14,11 +14,11 @@
 /// [`Stream`]: trait@crate::Stream
 #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
 pub struct BroadcastStream<T> {
-    inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
+    inner: ReusableBoxFuture<'static, (Result<T, RecvError>, Receiver<T>)>,
 }
 
 /// An error returned from the inner stream of a [`BroadcastStream`].
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 pub enum BroadcastStreamRecvError {
     /// The receiver lagged too far behind. Attempting to receive again will
     /// return the oldest message still retained by the channel.
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index bd3a18a..c682c9c 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -49,7 +49,7 @@
 /// [`Stream`]: trait@crate::Stream
 #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
 pub struct WatchStream<T> {
-    inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
+    inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<T>)>,
 }
 
 async fn make_future<T: Clone + Send + Sync>(
diff --git a/tests/chunks_timeout.rs b/tests/chunks_timeout.rs
new file mode 100644
index 0000000..ffc7dea
--- /dev/null
+++ b/tests/chunks_timeout.rs
@@ -0,0 +1,84 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
+
+use tokio::time;
+use tokio_stream::{self as stream, StreamExt};
+use tokio_test::assert_pending;
+use tokio_test::task;
+
+use futures::FutureExt;
+use std::time::Duration;
+
+#[tokio::test(start_paused = true)]
+async fn usage() {
+    let iter = vec![1, 2, 3].into_iter();
+    let stream0 = stream::iter(iter);
+
+    let iter = vec![4].into_iter();
+    let stream1 =
+        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+    let chunk_stream = stream0
+        .chain(stream1)
+        .chunks_timeout(4, Duration::from_secs(2));
+
+    let mut chunk_stream = task::spawn(chunk_stream);
+
+    assert_pending!(chunk_stream.poll_next());
+    time::advance(Duration::from_secs(2)).await;
+    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+    assert_pending!(chunk_stream.poll_next());
+    time::advance(Duration::from_secs(2)).await;
+    assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test(start_paused = true)]
+async fn full_chunk_with_timeout() {
+    let iter = vec![1, 2].into_iter();
+    let stream0 = stream::iter(iter);
+
+    let iter = vec![3].into_iter();
+    let stream1 =
+        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
+
+    let iter = vec![4].into_iter();
+    let stream2 =
+        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
+
+    let chunk_stream = stream0
+        .chain(stream1)
+        .chain(stream2)
+        .chunks_timeout(3, Duration::from_secs(2));
+
+    let mut chunk_stream = task::spawn(chunk_stream);
+
+    assert_pending!(chunk_stream.poll_next());
+    time::advance(Duration::from_secs(2)).await;
+    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+
+    assert_pending!(chunk_stream.poll_next());
+    time::advance(Duration::from_secs(2)).await;
+    assert_eq!(chunk_stream.next().await, Some(vec![4]));
+}
+
+#[tokio::test]
+#[ignore]
+async fn real_time() {
+    let iter = vec![1, 2, 3, 4].into_iter();
+    let stream0 = stream::iter(iter);
+
+    let iter = vec![5].into_iter();
+    let stream1 =
+        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
+
+    let chunk_stream = stream0
+        .chain(stream1)
+        .chunks_timeout(3, Duration::from_secs(2));
+
+    let mut chunk_stream = task::spawn(chunk_stream);
+
+    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
+    assert_eq!(chunk_stream.next().await, Some(vec![4]));
+    assert_eq!(chunk_stream.next().await, Some(vec![5]));
+}
diff --git a/tests/stream_panic.rs b/tests/stream_panic.rs
new file mode 100644
index 0000000..22c1c20
--- /dev/null
+++ b/tests/stream_panic.rs
@@ -0,0 +1,55 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
+
+use parking_lot::{const_mutex, Mutex};
+use std::error::Error;
+use std::panic;
+use std::sync::Arc;
+use tokio::time::Duration;
+use tokio_stream::{self as stream, StreamExt};
+
+fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
+    static PANIC_MUTEX: Mutex<()> = const_mutex(());
+
+    {
+        let _guard = PANIC_MUTEX.lock();
+        let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
+
+        let prev_hook = panic::take_hook();
+        {
+            let panic_file = panic_file.clone();
+            panic::set_hook(Box::new(move |panic_info| {
+                let panic_location = panic_info.location().unwrap();
+                panic_file
+                    .lock()
+                    .clone_from(&Some(panic_location.file().to_string()));
+            }));
+        }
+
+        let result = panic::catch_unwind(func);
+        // Return to the previously set panic hook (maybe default) so that we get nice error
+        // messages in the tests.
+        panic::set_hook(prev_hook);
+
+        if result.is_err() {
+            panic_file.lock().clone()
+        } else {
+            None
+        }
+    }
+}
+
+#[test]
+fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> {
+    let panic_location_file = test_panic(|| {
+        let iter = vec![1, 2, 3].into_iter();
+        let stream0 = stream::iter(iter);
+
+        let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2));
+    });
+
+    // The panic location should be in this file
+    assert_eq!(&panic_location_file.unwrap(), file!());
+
+    Ok(())
+}
diff --git a/tests/stream_stream_map.rs b/tests/stream_stream_map.rs
index 53f3d86..ffc489b 100644
--- a/tests/stream_stream_map.rs
+++ b/tests/stream_stream_map.rs
@@ -325,6 +325,7 @@
     }
 }
 
+#[cfg(not(target_os = "wasi"))]
 proptest::proptest! {
     #[test]
     fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
diff --git a/tests/stream_timeout.rs b/tests/stream_timeout.rs
index 5697ace..2338f83 100644
--- a/tests/stream_timeout.rs
+++ b/tests/stream_timeout.rs
@@ -1,10 +1,10 @@
-#![cfg(feature = "full")]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
 
 use tokio::time::{self, sleep, Duration};
 use tokio_stream::{self, StreamExt};
 use tokio_test::*;
 
-use futures::StreamExt as _;
+use futures::stream;
 
 async fn maybe_sleep(idx: i32) -> i32 {
     if idx % 2 == 0 {
diff --git a/tests/time_throttle.rs b/tests/time_throttle.rs
index 42a643b..e6c9917 100644
--- a/tests/time_throttle.rs
+++ b/tests/time_throttle.rs
@@ -1,5 +1,5 @@
 #![warn(rust_2018_idioms)]
-#![cfg(feature = "full")]
+#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
 
 use tokio::time;
 use tokio_stream::StreamExt;