Merge "Upgrade rust/crates/futures-util to 0.3.17"
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 99dc8b0..ffd4f55 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee"
+    "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
   }
 }
diff --git a/Android.bp b/Android.bp
index 859f9da..4a73e79 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@
     name: "futures-util_test_defaults",
     crate_name: "futures_util",
     srcs: ["src/lib.rs"],
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.17",
     test_suites: ["general-tests"],
     auto_gen_config: true,
     edition: "2018",
@@ -99,6 +101,8 @@
     name: "libfutures_util",
     host_supported: true,
     crate_name: "futures_util",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.17",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
@@ -143,63 +147,3 @@
     ],
     min_sdk_version: "29",
 }
-
-// dependent_library ["feature_list"]
-//   autocfg-1.0.1
-//   byteorder-1.4.3 "default,std"
-//   bytes-0.4.12
-//   cfg-if-0.1.10
-//   cfg-if-1.0.0
-//   crossbeam-deque-0.7.4
-//   crossbeam-epoch-0.8.2 "default,lazy_static,std"
-//   crossbeam-queue-0.2.3 "default,std"
-//   crossbeam-utils-0.7.2 "default,lazy_static,std"
-//   fnv-1.0.7 "default,std"
-//   futures-0.1.31 "default,use_std,with-deprecated"
-//   futures-channel-0.3.16 "alloc,std"
-//   futures-core-0.3.16 "alloc,std"
-//   futures-io-0.3.16 "std"
-//   futures-macro-0.3.16
-//   futures-sink-0.3.16
-//   futures-task-0.3.16 "alloc,std"
-//   iovec-0.1.4
-//   lazy_static-1.4.0
-//   libc-0.2.98 "default,std"
-//   lock_api-0.3.4
-//   log-0.4.14
-//   maybe-uninit-2.0.0
-//   memchr-2.4.0 "default,std"
-//   memoffset-0.5.6 "default"
-//   mio-0.6.23 "default,with-deprecated"
-//   mio-uds-0.6.8
-//   net2-0.2.37 "default,duration"
-//   num_cpus-1.13.0
-//   parking_lot-0.9.0 "default"
-//   parking_lot_core-0.6.2
-//   pin-project-lite-0.2.7
-//   pin-utils-0.1.0
-//   proc-macro-hack-0.5.19
-//   proc-macro-nested-0.1.7
-//   proc-macro2-1.0.28 "default,proc-macro"
-//   quote-1.0.9 "default,proc-macro"
-//   rustc_version-0.2.3
-//   scopeguard-1.1.0
-//   semver-0.9.0 "default"
-//   semver-parser-0.7.0
-//   slab-0.4.4 "default,std"
-//   smallvec-0.6.14 "default,std"
-//   syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote"
-//   tokio-0.1.22 "bytes,codec,default,fs,io,mio,num_cpus,reactor,rt-full,sync,tcp,timer,tokio-codec,tokio-current-thread,tokio-executor,tokio-fs,tokio-io,tokio-reactor,tokio-sync,tokio-tcp,tokio-threadpool,tokio-timer,tokio-udp,tokio-uds,udp,uds"
-//   tokio-codec-0.1.2
-//   tokio-current-thread-0.1.7
-//   tokio-executor-0.1.10
-//   tokio-fs-0.1.7
-//   tokio-io-0.1.13
-//   tokio-reactor-0.1.12
-//   tokio-sync-0.1.8
-//   tokio-tcp-0.1.4
-//   tokio-threadpool-0.1.18
-//   tokio-timer-0.2.13
-//   tokio-udp-0.1.6
-//   tokio-uds-0.2.7
-//   unicode-xid-0.2.2 "default"
diff --git a/Cargo.toml b/Cargo.toml
index aaaebac..90010e9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@
 [package]
 edition = "2018"
 name = "futures-util"
-version = "0.3.16"
+version = "0.3.17"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 description = "Common utilities and extension traits for the futures-rs library.\n"
 homepage = "https://rust-lang.github.io/futures-rs"
@@ -23,33 +23,33 @@
 all-features = true
 rustdoc-args = ["--cfg", "docsrs"]
 [dependencies.futures-channel]
-version = "0.3.16"
+version = "0.3.17"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.16"
+version = "0.3.17"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-macro]
-version = "=0.3.16"
+version = "=0.3.17"
 optional = true
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.16"
+version = "0.3.17"
 optional = true
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures_01]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 98cace6..a8e9362 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
 [package]
 name = "futures-util"
 edition = "2018"
-version = "0.3.16"
+version = "0.3.17"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
@@ -39,12 +39,12 @@
 autocfg = "1"
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.16", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.16", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.16", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "=0.3.16", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.17", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.17", default-features = false, optional = true }
 proc-macro-hack = { version = "0.5.19", optional = true }
 proc-macro-nested = { version = "0.1.2", optional = true }
 slab = { version = "0.4.2", optional = true }
diff --git a/METADATA b/METADATA
index ce359d5..0ace75a 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.16.crate"
+    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.17.crate"
   }
-  version: "0.3.16"
+  version: "0.3.17"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 8
-    day: 9
+    month: 9
+    day: 22
   }
 }
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs
index 5f5d4ac..7276da2 100644
--- a/src/async_await/mod.rs
+++ b/src/async_await/mod.rs
@@ -30,6 +30,13 @@
 #[cfg(feature = "async-await-macro")]
 pub use self::select_mod::*;
 
+// Primary export is a macro
+#[cfg(feature = "async-await-macro")]
+mod stream_select_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
+#[cfg(feature = "async-await-macro")]
+pub use self::stream_select_mod::*;
+
 #[cfg(feature = "std")]
 #[cfg(feature = "async-await-macro")]
 mod random;
diff --git a/src/async_await/stream_select_mod.rs b/src/async_await/stream_select_mod.rs
new file mode 100644
index 0000000..7743406
--- /dev/null
+++ b/src/async_await/stream_select_mod.rs
@@ -0,0 +1,45 @@
+//! The `stream_select` macro.
+
+#[cfg(feature = "std")]
+#[allow(unreachable_pub)]
+#[doc(hidden)]
+#[cfg_attr(not(fn_like_proc_macro), proc_macro_hack::proc_macro_hack(support_nested))]
+pub use futures_macro::stream_select_internal;
+
+/// Combines several streams, all producing the same `Item` type, into one stream.
+/// This is similar to `select_all` but does not require the streams to all be the same type.
+/// It also keeps the streams inline, and does not require `Box<dyn Stream>`s to be allocated.
+/// Streams passed to this macro must be `Unpin`.
+///
+/// If multiple streams are ready, one will be pseudo randomly selected at runtime.
+///
+/// This macro is gated behind the `async-await` feature of this library, which is activated by default.
+/// Note that `stream_select!` relies on `proc-macro-hack`, and may require to set the compiler's recursion
+/// limit very high, e.g. `#![recursion_limit="1024"]`.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{stream, StreamExt, stream_select};
+/// let endless_ints = |i| stream::iter(vec![i].into_iter().cycle()).fuse();
+///
+/// let mut endless_numbers = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+/// match endless_numbers.next().await {
+///     Some(1) => println!("Got a 1"),
+///     Some(2) => println!("Got a 2"),
+///     Some(3) => println!("Got a 3"),
+///     _ => unreachable!(),
+/// }
+/// # });
+/// ```
+#[cfg(feature = "std")]
+#[macro_export]
+macro_rules! stream_select {
+    ($($tokens:tt)*) => {{
+        use $crate::__private as __futures_crate;
+        $crate::stream_select_internal! {
+            $( $tokens )*
+        }
+    }}
+}
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index 427e71c..2e52ac1 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -12,6 +12,9 @@
 
 use super::{assert_future, MaybeDone};
 
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{Collect, FuturesOrdered, StreamExt};
+
 fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
     // Safety: `std` _could_ make this unsound if it were to decide Pin's
     // invariants aren't required to transmit through slices. Otherwise this has
@@ -19,13 +22,29 @@
     unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
 }
 
-/// Future for the [`join_all`] function.
 #[must_use = "futures do nothing unless you `.await` or poll them"]
+/// Future for the [`join_all`] function.
 pub struct JoinAll<F>
 where
     F: Future,
 {
-    elems: Pin<Box<[MaybeDone<F>]>>,
+    kind: JoinAllKind<F>,
+}
+
+#[cfg(not(futures_no_atomic_cas))]
+const SMALL: usize = 30;
+
+pub(crate) enum JoinAllKind<F>
+where
+    F: Future,
+{
+    Small {
+        elems: Pin<Box<[MaybeDone<F>]>>,
+    },
+    #[cfg(not(futures_no_atomic_cas))]
+    Big {
+        fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
+    },
 }
 
 impl<F> fmt::Debug for JoinAll<F>
@@ -34,7 +53,13 @@
     F::Output: fmt::Debug,
 {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("JoinAll").field("elems", &self.elems).finish()
+        match self.kind {
+            JoinAllKind::Small { ref elems } => {
+                f.debug_struct("JoinAll").field("elems", elems).finish()
+            }
+            #[cfg(not(futures_no_atomic_cas))]
+            JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+        }
     }
 }
 
@@ -50,10 +75,9 @@
 ///
 /// # See Also
 ///
-/// This is purposefully a very simple API for basic use-cases. In a lot of
-/// cases you will want to use the more powerful
-/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
-/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
+/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
 ///
 /// Some examples for additional functionality provided by these are:
 ///
@@ -75,13 +99,33 @@
 /// assert_eq!(join_all(futures).await, [1, 2, 3]);
 /// # });
 /// ```
-pub fn join_all<I>(i: I) -> JoinAll<I::Item>
+pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
 where
     I: IntoIterator,
     I::Item: Future,
 {
-    let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
-    assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
+    #[cfg(futures_no_atomic_cas)]
+    {
+        let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
+        let kind = JoinAllKind::Small { elems };
+        assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+    }
+    #[cfg(not(futures_no_atomic_cas))]
+    {
+        let iter = iter.into_iter();
+        let kind = match iter.size_hint().1 {
+            None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
+            Some(max) => {
+                if max <= SMALL {
+                    let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
+                    JoinAllKind::Small { elems }
+                } else {
+                    JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
+                }
+            }
+        };
+        assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
+    }
 }
 
 impl<F> Future for JoinAll<F>
@@ -91,20 +135,27 @@
     type Output = Vec<F::Output>;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let mut all_done = true;
+        match &mut self.kind {
+            JoinAllKind::Small { elems } => {
+                let mut all_done = true;
 
-        for elem in iter_pin_mut(self.elems.as_mut()) {
-            if elem.poll(cx).is_pending() {
-                all_done = false;
+                for elem in iter_pin_mut(elems.as_mut()) {
+                    if elem.poll(cx).is_pending() {
+                        all_done = false;
+                    }
+                }
+
+                if all_done {
+                    let mut elems = mem::replace(elems, Box::pin([]));
+                    let result =
+                        iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
+                    Poll::Ready(result)
+                } else {
+                    Poll::Pending
+                }
             }
-        }
-
-        if all_done {
-            let mut elems = mem::replace(&mut self.elems, Box::pin([]));
-            let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
-            Poll::Ready(result)
-        } else {
-            Poll::Pending
+            #[cfg(not(futures_no_atomic_cas))]
+            JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
         }
     }
 }
diff --git a/src/future/mod.rs b/src/future/mod.rs
index cd08264..374e365 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -68,6 +68,9 @@
 mod poll_fn;
 pub use self::poll_fn::{poll_fn, PollFn};
 
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
 mod ready;
 pub use self::ready::{err, ok, ready, Ready};
 
diff --git a/src/future/option.rs b/src/future/option.rs
index 426fe50..0bc3777 100644
--- a/src/future/option.rs
+++ b/src/future/option.rs
@@ -31,6 +31,12 @@
     }
 }
 
+impl<F> Default for OptionFuture<F> {
+    fn default() -> Self {
+        Self { inner: None }
+    }
+}
+
 impl<F: Future> Future for OptionFuture<F> {
     type Output = Option<F::Output>;
 
diff --git a/src/future/poll_immediate.rs b/src/future/poll_immediate.rs
new file mode 100644
index 0000000..5ae555c
--- /dev/null
+++ b/src/future/poll_immediate.rs
@@ -0,0 +1,126 @@
+use super::assert_future;
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::{FusedFuture, Future, Stream};
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Future for the [`poll_immediate`](poll_immediate()) function.
+    ///
+    /// It will never return [Poll::Pending](core::task::Poll::Pending)
+    #[derive(Debug, Clone)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct PollImmediate<T> {
+        #[pin]
+        future: Option<T>
+    }
+}
+
+impl<T, F> Future for PollImmediate<F>
+where
+    F: Future<Output = T>,
+{
+    type Output = Option<T>;
+
+    #[inline]
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
+        let mut this = self.project();
+        let inner =
+            this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
+        match inner.poll(cx) {
+            Poll::Ready(t) => {
+                this.future.set(None);
+                Poll::Ready(Some(t))
+            }
+            Poll::Pending => Poll::Ready(None),
+        }
+    }
+}
+
+impl<T: Future> FusedFuture for PollImmediate<T> {
+    fn is_terminated(&self) -> bool {
+        self.future.is_none()
+    }
+}
+
+/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
+/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
+/// so polling it in a tight loop is worse than using a blocking synchronous function.
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::task::Poll;
+/// use futures::{StreamExt, future, pin_mut};
+/// use future::FusedFuture;
+///
+/// let f = async { 1_u32 };
+/// pin_mut!(f);
+/// let mut r = future::poll_immediate(f);
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+///
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// let mut p = future::poll_immediate(f);
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert!(!p.is_terminated());
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert!(p.is_terminated());
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+impl<T, F> Stream for PollImmediate<F>
+where
+    F: Future<Output = T>,
+{
+    type Item = Poll<T>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        match this.future.as_mut().as_pin_mut() {
+            // inner is gone, so we can signal that the stream is closed.
+            None => Poll::Ready(None),
+            Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
+                this.future.set(None);
+                t
+            }))),
+        }
+    }
+}
+
+/// Creates a future that is immediately ready with an Option of a value.
+/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
+///
+/// # Caution
+///
+/// When consuming the future by this function, note the following:
+///
+/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
+/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future;
+///
+/// let r = future::poll_immediate(async { 1_u32 });
+/// assert_eq!(r.await, Some(1));
+///
+/// let p = future::poll_immediate(future::pending::<i32>());
+/// assert_eq!(p.await, None);
+/// # });
+/// ```
+///
+/// ### Reusing a future
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::{future, pin_mut};
+/// let f = async {futures::pending!(); 42_u8};
+/// pin_mut!(f);
+/// assert_eq!(None, future::poll_immediate(&mut f).await);
+/// assert_eq!(42, f.await);
+/// # });
+/// ```
+pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
+    assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
+}
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 5931edc..2d585a9 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -1,4 +1,5 @@
 use super::DEFAULT_BUF_SIZE;
+use futures_core::future::Future;
 use futures_core::ready;
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "read-initializer")]
@@ -73,6 +74,40 @@
     }
 }
 
+impl<R: AsyncRead + AsyncSeek> BufReader<R> {
+    /// Seeks relative to the current position. If the new position lies within the buffer,
+    /// the buffer will not be flushed, allowing for more efficient seeks.
+    /// This method does not return the location of the underlying reader, so the caller
+    /// must track this information themselves if it is required.
+    pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
+        SeeKRelative { inner: self, offset, first: true }
+    }
+
+    /// Attempts to seek relative to the current position. If the new position lies within the buffer,
+    /// the buffer will not be flushed, allowing for more efficient seeks.
+    /// This method does not return the location of the underlying reader, so the caller
+    /// must track this information themselves if it is required.
+    pub fn poll_seek_relative(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        offset: i64,
+    ) -> Poll<io::Result<()>> {
+        let pos = self.pos as u64;
+        if offset < 0 {
+            if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
+                *self.project().pos = new_pos as usize;
+                return Poll::Ready(Ok(()));
+            }
+        } else if let Some(new_pos) = pos.checked_add(offset as u64) {
+            if new_pos <= self.cap as u64 {
+                *self.project().pos = new_pos as usize;
+                return Poll::Ready(Ok(()));
+            }
+        }
+        self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
+    }
+}
+
 impl<R: AsyncRead> AsyncRead for BufReader<R> {
     fn poll_read(
         mut self: Pin<&mut Self>,
@@ -163,6 +198,10 @@
     /// `.into_inner()` immediately after a seek yields the underlying reader
     /// at the same position.
     ///
+    /// To seek without discarding the internal buffer, use
+    /// [`BufReader::seek_relative`](BufReader::seek_relative) or
+    /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
+    ///
     /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
     ///
     /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
@@ -200,3 +239,33 @@
         Poll::Ready(Ok(result))
     }
 }
+
+/// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless polled"]
+pub struct SeeKRelative<'a, R> {
+    inner: Pin<&'a mut BufReader<R>>,
+    offset: i64,
+    first: bool,
+}
+
+impl<R> Future for SeeKRelative<'_, R>
+where
+    R: AsyncRead + AsyncSeek,
+{
+    type Output = io::Result<()>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let offset = self.offset;
+        if self.first {
+            self.first = false;
+            self.inner.as_mut().poll_seek_relative(cx, offset)
+        } else {
+            self.inner
+                .as_mut()
+                .as_mut()
+                .poll_seek(cx, SeekFrom::Current(offset))
+                .map(|res| res.map(|_| ()))
+        }
+    }
+}
diff --git a/src/io/mod.rs b/src/io/mod.rs
index b96223d..16cf5a7 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -56,7 +56,7 @@
 pub use self::allow_std::AllowStdIo;
 
 mod buf_reader;
-pub use self::buf_reader::BufReader;
+pub use self::buf_reader::{BufReader, SeeKRelative};
 
 mod buf_writer;
 pub use self::buf_writer::BufWriter;
diff --git a/src/lib.rs b/src/lib.rs
index 5f803a7..76d3799 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -301,18 +301,18 @@
 }
 
 pub mod future;
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
 
 pub mod stream;
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
 
 #[cfg(feature = "sink")]
 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub mod sink;
 #[cfg(feature = "sink")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use crate::sink::{Sink, SinkExt};
 
 pub mod task;
@@ -329,7 +329,7 @@
 pub mod io;
 #[cfg(feature = "io")]
 #[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use crate::io::{
     AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
     AsyncWriteExt,
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 8cf9f80..ec685b9 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -19,8 +19,8 @@
 mod stream;
 pub use self::stream::{
     Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
-    Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip,
-    SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
+    Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
+    Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
 };
 
 #[cfg(feature = "std")]
@@ -88,6 +88,9 @@
 mod poll_fn;
 pub use self::poll_fn::{poll_fn, PollFn};
 
+mod poll_immediate;
+pub use self::poll_immediate::{poll_immediate, PollImmediate};
+
 mod select;
 pub use self::select::{select, Select};
 
diff --git a/src/stream/poll_immediate.rs b/src/stream/poll_immediate.rs
new file mode 100644
index 0000000..c7e8a5b
--- /dev/null
+++ b/src/stream/poll_immediate.rs
@@ -0,0 +1,80 @@
+use core::pin::Pin;
+use futures_core::task::{Context, Poll};
+use futures_core::Stream;
+use pin_project_lite::pin_project;
+
+pin_project! {
+    /// Stream for the [poll_immediate](poll_immediate()) function.
+    ///
+    /// It will never return [Poll::Pending](core::task::Poll::Pending)
+    #[derive(Debug, Clone)]
+    #[must_use = "futures do nothing unless you `.await` or poll them"]
+    pub struct PollImmediate<S> {
+        #[pin]
+        stream: Option<S>
+    }
+}
+
+impl<T, S> Stream for PollImmediate<S>
+where
+    S: Stream<Item = T>,
+{
+    type Item = Poll<T>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        let stream = match this.stream.as_mut().as_pin_mut() {
+            // inner is gone, so we can continue to signal that the stream is closed.
+            None => return Poll::Ready(None),
+            Some(inner) => inner,
+        };
+
+        match stream.poll_next(cx) {
+            Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
+            Poll::Ready(None) => {
+                this.stream.set(None);
+                Poll::Ready(None)
+            }
+            Poll::Pending => Poll::Ready(Some(Poll::Pending)),
+        }
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
+    }
+}
+
+impl<S: Stream> super::FusedStream for PollImmediate<S> {
+    fn is_terminated(&self) -> bool {
+        self.stream.is_none()
+    }
+}
+
+/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it.
+///
+/// This is useful when immediacy is more important than waiting for the next item to be ready.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::stream::{self, StreamExt};
+/// use futures::task::Poll;
+///
+/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3)));
+/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
+/// assert_eq!(r.next().await, Some(Poll::Ready(2)));
+/// assert_eq!(r.next().await, None);
+///
+/// let mut p = stream::poll_immediate(Box::pin(stream::once(async {
+///     futures::pending!();
+///     42_u8
+/// })));
+/// assert_eq!(p.next().await, Some(Poll::Pending));
+/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
+/// assert_eq!(p.next().await, None);
+/// # });
+/// ```
+pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
+    super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
+}
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index b3b0155..86997f4 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -131,7 +131,7 @@
 
 mod peek;
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
-pub use self::peek::{NextIf, NextIfEq, Peek, Peekable};
+pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
 
 mod skip;
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index 217faba..c72dfc3 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -33,7 +33,7 @@
 
     delegate_access_inner!(stream, St, (.));
 
-    /// Produces a `Peek` future which retrieves a reference to the next item
+    /// Produces a future which retrieves a reference to the next item
     /// in the stream, or `None` if the underlying stream terminates.
     pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
         Peek { inner: Some(self) }
@@ -57,6 +57,54 @@
         })
     }
 
+    /// Produces a future which retrieves a mutable reference to the next item
+    /// in the stream, or `None` if the underlying stream terminates.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # futures::executor::block_on(async {
+    /// use futures::stream::{self, StreamExt};
+    /// use futures::pin_mut;
+    ///
+    /// let stream = stream::iter(vec![1, 2, 3]).peekable();
+    /// pin_mut!(stream);
+    ///
+    /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
+    /// assert_eq!(stream.as_mut().next().await, Some(1));
+    ///
+    /// // Peek into the stream and modify the value which will be returned next
+    /// if let Some(p) = stream.as_mut().peek_mut().await {
+    ///     if *p == 2 {
+    ///         *p = 5;
+    ///     }
+    /// }
+    ///
+    /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
+    /// # });
+    /// ```
+    pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
+        PeekMut { inner: Some(self) }
+    }
+
+    /// Peek retrieves a mutable reference to the next item in the stream.
+    pub fn poll_peek_mut(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<&mut St::Item>> {
+        let mut this = self.project();
+
+        Poll::Ready(loop {
+            if this.peeked.is_some() {
+                break this.peeked.as_mut();
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                *this.peeked = Some(item);
+            } else {
+                break None;
+            }
+        })
+    }
+
     /// Creates a future which will consume and return the next value of this
     /// stream if a condition is true.
     ///
@@ -221,6 +269,48 @@
 }
 
 pin_project! {
+    /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
+    #[must_use = "futures do nothing unless polled"]
+    pub struct PeekMut<'a, St: Stream> {
+        inner: Option<Pin<&'a mut Peekable<St>>>,
+    }
+}
+
+impl<St> fmt::Debug for PeekMut<'_, St>
+where
+    St: Stream + fmt::Debug,
+    St::Item: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("PeekMut").field("inner", &self.inner).finish()
+    }
+}
+
+impl<St: Stream> FusedFuture for PeekMut<'_, St> {
+    fn is_terminated(&self) -> bool {
+        self.inner.is_none()
+    }
+}
+
+impl<'a, St> Future for PeekMut<'a, St>
+where
+    St: Stream,
+{
+    type Output = Option<&'a mut St::Item>;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let inner = self.project().inner;
+        if let Some(peekable) = inner {
+            ready!(peekable.as_mut().poll_peek_mut(cx));
+
+            inner.take().unwrap().poll_peek_mut(cx)
+        } else {
+            panic!("PeekMut polled after completion")
+        }
+    }
+}
+
+pin_project! {
     /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
     #[must_use = "futures do nothing unless polled"]
     pub struct NextIf<'a, St: Stream, F> {