Merge "Upgrade rust/crates/futures to 0.3.17" am: 25df33f235 am: 4203cedb6b

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures/+/1833575

Change-Id: I014ca2c221997ea509c709019b21264bc2a0de10
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 99dc8b0..ffd4f55 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "ab38fd29d3f84f8fc028fa7883e53dba423da0ee"
+    "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
   }
 }
diff --git a/Android.bp b/Android.bp
index f6e9a98..9053466 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@
     name: "libfutures",
     host_supported: true,
     crate_name: "futures",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.17",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
@@ -67,24 +69,3 @@
     ],
     min_sdk_version: "29",
 }
-
-// dependent_library ["feature_list"]
-//   autocfg-1.0.1
-//   futures-channel-0.3.16 "alloc,futures-sink,sink,std"
-//   futures-core-0.3.16 "alloc,std"
-//   futures-executor-0.3.16 "std"
-//   futures-io-0.3.16 "std"
-//   futures-macro-0.3.16
-//   futures-sink-0.3.16 "alloc,std"
-//   futures-task-0.3.16 "alloc,std"
-//   futures-util-0.3.16 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
-//   memchr-2.4.0 "default,std"
-//   pin-project-lite-0.2.7
-//   pin-utils-0.1.0
-//   proc-macro-hack-0.5.19
-//   proc-macro-nested-0.1.7
-//   proc-macro2-1.0.28 "default,proc-macro"
-//   quote-1.0.9 "default,proc-macro"
-//   slab-0.4.4 "default,std"
-//   syn-1.0.74 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
-//   unicode-xid-0.2.2 "default"
diff --git a/Cargo.toml b/Cargo.toml
index c934741..17c4d5f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@
 [package]
 edition = "2018"
 name = "futures"
-version = "0.3.16"
+version = "0.3.17"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
 homepage = "https://rust-lang.github.io/futures-rs"
@@ -29,33 +29,33 @@
 [package.metadata.playground]
 features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
 [dependencies.futures-channel]
-version = "0.3.16"
+version = "0.3.17"
 features = ["sink"]
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures-executor]
-version = "0.3.16"
+version = "0.3.17"
 optional = true
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.16"
+version = "0.3.17"
 default-features = false
 
 [dependencies.futures-util]
-version = "0.3.16"
+version = "0.3.17"
 features = ["sink"]
 default-features = false
 [dev-dependencies.assert_matches]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 04cea8a..b01b12e 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
 [package]
 name = "futures"
 edition = "2018"
-version = "0.3.16"
+version = "0.3.17"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 license = "MIT OR Apache-2.0"
 readme = "../README.md"
@@ -16,13 +16,13 @@
 categories = ["asynchronous"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.16", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.16", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.16", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.16", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.16", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.16", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.16", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.17", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] }
 
 [dev-dependencies]
 futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
diff --git a/METADATA b/METADATA
index 60344d2..ba70d3e 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures/futures-0.3.16.crate"
+    value: "https://static.crates.io/crates/futures/futures-0.3.17.crate"
   }
-  version: "0.3.16"
+  version: "0.3.17"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 8
-    day: 9
+    month: 9
+    day: 22
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 287696f..362aa3c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -102,26 +102,26 @@
 #[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
 compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
 
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_core::future::{Future, TryFuture};
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::future::{FutureExt, TryFutureExt};
 
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_core::stream::{Stream, TryStream};
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::stream::{StreamExt, TryStreamExt};
 
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_sink::Sink;
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::sink::SinkExt;
 
 #[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
 #[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 
 // Macro reexports
@@ -137,6 +137,10 @@
 #[doc(inline)]
 pub use futures_util::{future, never, sink, stream, task};
 
+#[cfg(feature = "std")]
+#[cfg(feature = "async-await")]
+pub use futures_util::stream_select;
+
 #[cfg(feature = "alloc")]
 #[doc(inline)]
 pub use futures_channel as channel;
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index 19833d0..ce1f3a3 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -4,7 +4,9 @@
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
 use futures::task::{Context, Poll};
-use futures::{join, pending, pin_mut, poll, select, select_biased, try_join};
+use futures::{
+    join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
+};
 use std::mem;
 
 #[test]
@@ -309,6 +311,42 @@
 }
 
 #[test]
+#[allow(unused_assignments)]
+fn stream_select() {
+    // stream_select! macro
+    block_on(async {
+        let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
+
+        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
+        assert_eq!(endless_ones.next().await, Some(1));
+        assert_eq!(endless_ones.next().await, Some(1));
+
+        let mut finite_list =
+            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
+        assert_eq!(finite_list.next().await, Some(1));
+        assert_eq!(finite_list.next().await, Some(1));
+        assert_eq!(finite_list.next().await, None);
+
+        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+        // Take 1000, and assert a somewhat even distribution of values.
+        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
+        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
+        let mut count = 0;
+        let results = endless_mixed
+            .take_while(move |_| {
+                count += 1;
+                let ret = count < 1000;
+                async move { ret }
+            })
+            .collect::<Vec<_>>()
+            .await;
+        assert!(results.iter().filter(|x| **x == 1).count() >= 299);
+        assert!(results.iter().filter(|x| **x == 2).count() >= 299);
+        assert!(results.iter().filter(|x| **x == 3).count() >= 299);
+    });
+}
+
+#[test]
 fn join_size() {
     let fut = async {
         let ready = future::ready(0i32);
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index e0192a1..b3d8b00 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -470,6 +470,13 @@
     assert_not_impl!(PollFn<*const ()>: Sync);
     assert_impl!(PollFn<PhantomPinned>: Unpin);
 
+    assert_impl!(PollImmediate<SendStream>: Send);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+    assert_impl!(PollImmediate<SyncStream>: Sync);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+    assert_impl!(PollImmediate<UnpinStream>: Unpin);
+    assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
     assert_impl!(Ready<()>: Send);
     assert_not_impl!(Ready<*const ()>: Send);
     assert_impl!(Ready<()>: Sync);
@@ -810,6 +817,12 @@
     assert_impl!(Seek<'_, ()>: Unpin);
     assert_not_impl!(Seek<'_, PhantomPinned>: Unpin);
 
+    assert_impl!(SeeKRelative<'_, ()>: Send);
+    assert_not_impl!(SeeKRelative<'_, *const ()>: Send);
+    assert_impl!(SeeKRelative<'_, ()>: Sync);
+    assert_not_impl!(SeeKRelative<'_, *const ()>: Sync);
+    assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin);
+
     assert_impl!(Sink: Send);
     assert_impl!(Sink: Sync);
     assert_impl!(Sink: Unpin);
@@ -1430,6 +1443,14 @@
     assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
     assert_impl!(Peek<'_, PinnedStream>: Unpin);
 
+    assert_impl!(PeekMut<'_, SendStream<()>>: Send);
+    assert_not_impl!(PeekMut<'_, SendStream>: Send);
+    assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
+    assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
+    assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
+    assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
+    assert_impl!(PeekMut<'_, PinnedStream>: Unpin);
+
     assert_impl!(Peekable<SendStream<()>>: Send);
     assert_not_impl!(Peekable<SendStream>: Send);
     assert_not_impl!(Peekable<LocalStream>: Send);
@@ -1451,6 +1472,13 @@
     assert_not_impl!(PollFn<*const ()>: Sync);
     assert_impl!(PollFn<PhantomPinned>: Unpin);
 
+    assert_impl!(PollImmediate<SendStream>: Send);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+    assert_impl!(PollImmediate<SyncStream>: Sync);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+    assert_impl!(PollImmediate<UnpinStream>: Unpin);
+    assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
     assert_impl!(ReadyChunks<SendStream<()>>: Send);
     assert_not_impl!(ReadyChunks<SendStream>: Send);
     assert_not_impl!(ReadyChunks<LocalStream>: Send);
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index d60df87..717297c 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -2,25 +2,17 @@
 use futures::future::{Future, FutureExt};
 use futures::io::{
     AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
-    BufReader, Cursor, SeekFrom,
+    BufReader, SeekFrom,
 };
+use futures::pin_mut;
 use futures::task::{Context, Poll};
 use futures_test::task::noop_context;
+use pin_project::pin_project;
 use std::cmp;
 use std::io;
 use std::pin::Pin;
 
-macro_rules! run_fill_buf {
-    ($reader:expr) => {{
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
-                break x;
-            }
-        }
-    }};
-}
-
+// helper for maybe_pending_* tests
 fn run<F: Future + Unpin>(mut f: F) -> F::Output {
     let mut cx = noop_context();
     loop {
@@ -30,6 +22,49 @@
     }
 }
 
+// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
+#[pin_project(!Unpin)]
+struct Cursor<T> {
+    #[pin]
+    inner: futures::io::Cursor<T>,
+}
+
+impl<T> Cursor<T> {
+    fn new(inner: T) -> Self {
+        Self { inner: futures::io::Cursor::new(inner) }
+    }
+}
+
+impl AsyncRead for Cursor<&[u8]> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        self.project().inner.poll_read(cx, buf)
+    }
+}
+
+impl AsyncBufRead for Cursor<&[u8]> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+        self.project().inner.poll_fill_buf(cx)
+    }
+
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        self.project().inner.consume(amt)
+    }
+}
+
+impl AsyncSeek for Cursor<&[u8]> {
+    fn poll_seek(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<io::Result<u64>> {
+        self.project().inner.poll_seek(cx, pos)
+    }
+}
+
 struct MaybePending<'a> {
     inner: &'a [u8],
     ready_read: bool,
@@ -80,54 +115,119 @@
 
 #[test]
 fn test_buffered_reader() {
-    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, inner);
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let mut reader = BufReader::with_capacity(2, inner);
 
-    let mut buf = [0, 0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 3);
-    assert_eq!(buf, [5, 6, 7]);
-    assert_eq!(reader.buffer(), []);
+        let mut buf = [0, 0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 3);
+        assert_eq!(buf, [5, 6, 7]);
+        assert_eq!(reader.buffer(), []);
 
-    let mut buf = [0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 2);
-    assert_eq!(buf, [0, 1]);
-    assert_eq!(reader.buffer(), []);
+        let mut buf = [0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 2);
+        assert_eq!(buf, [0, 1]);
+        assert_eq!(reader.buffer(), []);
 
-    let mut buf = [0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [2]);
-    assert_eq!(reader.buffer(), [3]);
+        let mut buf = [0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [2]);
+        assert_eq!(reader.buffer(), [3]);
 
-    let mut buf = [0, 0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [3, 0, 0]);
-    assert_eq!(reader.buffer(), []);
+        let mut buf = [0, 0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [3, 0, 0]);
+        assert_eq!(reader.buffer(), []);
 
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [4, 0, 0]);
-    assert_eq!(reader.buffer(), []);
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [4, 0, 0]);
+        assert_eq!(reader.buffer(), []);
 
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+    });
 }
 
 #[test]
 fn test_buffered_reader_seek() {
-    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(2, Cursor::new(inner));
+        pin_mut!(reader);
 
-    assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
-    Pin::new(&mut reader).consume(1);
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+        assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]);
+        reader.as_mut().consume(1);
+        assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+    });
+}
+
+#[test]
+fn test_buffered_reader_seek_relative() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(2, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert!(reader.as_mut().seek_relative(3).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(0).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(1).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
+        assert!(reader.as_mut().seek_relative(-1).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(2).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
+    });
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_read() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(3, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+        reader.as_mut().consume(3);
+
+        let mut buffer = [0, 0, 0, 0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 5);
+        assert_eq!(buffer, [0, 1, 2, 3, 4]);
+
+        assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+        let mut buffer = [0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+        assert_eq!(buffer, [3, 4]);
+    });
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_seek() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(3, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+        reader.as_mut().consume(3);
+
+        assert!(reader.seek(SeekFrom::Current(5)).await.is_ok());
+
+        assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+        let mut buffer = [0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+        assert_eq!(buffer, [3, 4]);
+    });
 }
 
 #[test]
@@ -156,24 +256,27 @@
                     self.pos = self.pos.wrapping_add(n as u64);
                 }
                 SeekFrom::End(n) => {
-                    self.pos = u64::max_value().wrapping_add(n as u64);
+                    self.pos = u64::MAX.wrapping_add(n as u64);
                 }
             }
             Ok(self.pos)
         }
     }
 
-    let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
-    assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value() - 5));
-    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
-    // the following seek will require two underlying seeks
-    let expected = 9_223_372_036_854_775_802;
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
-    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
-    // seeking to 0 should empty the buffer.
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
-    assert_eq!(reader.get_ref().get_ref().pos, expected);
+    block_on(async {
+        let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+        pin_mut!(reader);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]);
+        assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+        // the following seek will require two underlying seeks
+        let expected = 9_223_372_036_854_775_802;
+        assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+        // seeking to 0 should empty the buffer.
+        assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
+        assert_eq!(reader.get_ref().get_ref().pos, expected);
+    });
 }
 
 #[test]
@@ -193,16 +296,18 @@
         }
     }
 
-    let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
-    let mut reader = BufReader::new(AllowStdIo::new(inner));
-    let mut buf = [0, 0];
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+    block_on(async {
+        let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+        let mut reader = BufReader::new(AllowStdIo::new(inner));
+        let mut buf = [0, 0];
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+    });
 }
 
 #[test]
@@ -263,7 +368,9 @@
 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
 #[test]
 fn maybe_pending_seek() {
+    #[pin_project]
     struct MaybePendingSeek<'a> {
+        #[pin]
         inner: Cursor<&'a [u8]>,
         ready: bool,
     }
@@ -276,25 +383,21 @@
 
     impl AsyncRead for MaybePendingSeek<'_> {
         fn poll_read(
-            mut self: Pin<&mut Self>,
+            self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             buf: &mut [u8],
         ) -> Poll<io::Result<usize>> {
-            Pin::new(&mut self.inner).poll_read(cx, buf)
+            self.project().inner.poll_read(cx, buf)
         }
     }
 
     impl AsyncBufRead for MaybePendingSeek<'_> {
-        fn poll_fill_buf(
-            mut self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<io::Result<&[u8]>> {
-            let this: *mut Self = &mut *self as *mut _;
-            Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+            self.project().inner.poll_fill_buf(cx)
         }
 
-        fn consume(mut self: Pin<&mut Self>, amt: usize) {
-            Pin::new(&mut self.inner).consume(amt)
+        fn consume(self: Pin<&mut Self>, amt: usize) {
+            self.project().inner.consume(amt)
         }
     }
 
@@ -305,24 +408,25 @@
             pos: SeekFrom,
         ) -> Poll<io::Result<u64>> {
             if self.ready {
-                self.ready = false;
-                Pin::new(&mut self.inner).poll_seek(cx, pos)
+                *self.as_mut().project().ready = false;
+                self.project().inner.poll_seek(cx, pos)
             } else {
-                self.ready = true;
+                *self.project().ready = true;
                 Poll::Pending
             }
         }
     }
 
     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+    let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+    pin_mut!(reader);
 
     assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
+    assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
     assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..]));
     Pin::new(&mut reader).consume(1);
     assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
 }
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
index 2fa7f3a..153fcc2 100644
--- a/tests/stream_peekable.rs
+++ b/tests/stream_peekable.rs
@@ -9,6 +9,25 @@
         pin_mut!(peekable);
         assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
         assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+
+        let s = stream::once(async { 1 }).peekable();
+        pin_mut!(s);
+        assert_eq!(s.as_mut().peek().await, Some(&1u8));
+        assert_eq!(s.collect::<Vec<u8>>().await, vec![1]);
+    });
+}
+
+#[test]
+fn peekable_mut() {
+    block_on(async {
+        let s = stream::iter(vec![1u8, 2, 3]).peekable();
+        pin_mut!(s);
+        if let Some(p) = s.as_mut().peek_mut().await {
+            if *p == 1 {
+                *p = 5;
+            }
+        }
+        assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
     });
 }