Mark ab/6881855 as merged

Bug: 172690556
Change-Id: I836982f948d75d6c96947ea342bb19c7a54cbdd0
diff --git a/Android.bp b/Android.bp
index f0705d9..0a32741 100644
--- a/Android.bp
+++ b/Android.bp
@@ -3,7 +3,6 @@
 rust_defaults {
     name: "futures-util_defaults",
     crate_name: "futures_util",
-    // has rustc warnings
     srcs: ["src/lib.rs"],
     test_suites: ["general-tests"],
     auto_gen_config: true,
@@ -56,7 +55,6 @@
 
 rust_library {
     name: "libfutures_util",
-    // has rustc warnings
     host_supported: true,
     crate_name: "futures_util",
     srcs: ["src/lib.rs"],
@@ -98,21 +96,21 @@
 }
 
 // dependent_library ["feature_list"]
-//   futures-channel-0.3.5 "alloc,std"
-//   futures-core-0.3.5 "alloc,std"
-//   futures-io-0.3.5 "std"
-//   futures-macro-0.3.5
-//   futures-sink-0.3.5
-//   futures-task-0.3.5 "alloc,once_cell,std"
+//   futures-channel-0.3.7 "alloc,std"
+//   futures-core-0.3.7 "alloc,std"
+//   futures-io-0.3.7 "std"
+//   futures-macro-0.3.7
+//   futures-sink-0.3.7
+//   futures-task-0.3.7 "alloc,once_cell,std"
 //   memchr-2.3.3 "default,std"
-//   once_cell-1.4.0 "std"
-//   pin-project-0.4.22
-//   pin-project-internal-0.4.22
+//   once_cell-1.4.1 "std"
+//   pin-project-1.0.1
+//   pin-project-internal-1.0.1
 //   pin-utils-0.1.0
-//   proc-macro-hack-0.5.16
+//   proc-macro-hack-0.5.18
 //   proc-macro-nested-0.1.6
-//   proc-macro2-1.0.18 "default,proc-macro"
+//   proc-macro2-1.0.24 "default,proc-macro"
 //   quote-1.0.7 "default,proc-macro"
 //   slab-0.4.2
-//   syn-1.0.34 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
+//   syn-1.0.48 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
 //   unicode-xid-0.2.1 "default"
diff --git a/Cargo.toml b/Cargo.toml
index d433de5..356f96f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,43 +13,44 @@
 [package]
 edition = "2018"
 name = "futures-util"
-version = "0.3.5"
+version = "0.3.7"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 description = "Common utilities and extension traits for the futures-rs library.\n"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3.5"
+documentation = "https://docs.rs/futures-util/0.3.7"
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
 [package.metadata.docs.rs]
 all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
 [dependencies.futures-channel]
-version = "0.3.5"
+version = "0.3.7"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.5"
+version = "0.3.7"
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.5"
+version = "0.3.7"
 features = ["std"]
 optional = true
 default-features = false
 
 [dependencies.futures-macro]
-version = "0.3.5"
+version = "=0.3.7"
 optional = true
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.5"
+version = "0.3.7"
 optional = true
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.5"
+version = "0.3.7"
 default-features = false
 
 [dependencies.futures_01]
@@ -62,7 +63,7 @@
 optional = true
 
 [dependencies.pin-project]
-version = "0.4.8"
+version = "1.0.1"
 
 [dependencies.pin-utils]
 version = "0.1.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 891e38f..c768e59 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,12 +1,12 @@
 [package]
 name = "futures-util"
 edition = "2018"
-version = "0.3.5"
+version = "0.3.7"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures-util/0.3.5"
+documentation = "https://docs.rs/futures-util/0.3.7"
 description = """
 Common utilities and extension traits for the futures-rs library.
 """
@@ -33,12 +33,12 @@
 write-all-vectored = ["io"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.5", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.5", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.5", default-features = false, features = ["std"], optional = true }
-futures-io = { path = "../futures-io", version = "0.3.5", default-features = false, features = ["std"], optional = true }
-futures-sink = { path = "../futures-sink", version = "0.3.5", default-features = false, optional = true }
-futures-macro = { path = "../futures-macro", version = "0.3.5", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.7", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.7", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.7", default-features = false, features = ["std"], optional = true }
+futures-io = { path = "../futures-io", version = "0.3.7", default-features = false, features = ["std"], optional = true }
+futures-sink = { path = "../futures-sink", version = "0.3.7", default-features = false, optional = true }
+futures-macro = { path = "../futures-macro", version = "=0.3.7", default-features = false, optional = true }
 proc-macro-hack = { version = "0.5.9", optional = true }
 proc-macro-nested = { version = "0.1.2", optional = true }
 slab = { version = "0.4.2", optional = true }
@@ -46,7 +46,8 @@
 futures_01 = { version = "0.1.25", optional = true, package = "futures" }
 tokio-io = { version = "0.1.9", optional = true }
 pin-utils = "0.1.0"
-pin-project = "0.4.8"
+pin-project = "1.0.1"
 
 [package.metadata.docs.rs]
 all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
diff --git a/METADATA b/METADATA
index 7f728a8..179eb51 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.5.crate"
+    value: "https://static.crates.io/crates/futures-util/futures-util-0.3.7.crate"
   }
-  version: "0.3.5"
+  version: "0.3.7"
   license_type: NOTICE
   last_upgrade_date {
     year: 2020
-    month: 5
-    day: 8
+    month: 10
+    day: 25
   }
 }
diff --git a/TEST_MAPPING b/TEST_MAPPING
index cd99d5d..7d1919a 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -2,8 +2,8 @@
 {
   "presubmit": [
     {
-      "name": "futures-util_host_test_src_lib",
-      "host": true
+      "host": true,
+      "name": "futures-util_host_test_src_lib"
     },
     {
       "name": "futures-util_device_test_src_lib"
diff --git a/src/async_await/join_mod.rs b/src/async_await/join_mod.rs
index 909cd3b..4200c08 100644
--- a/src/async_await/join_mod.rs
+++ b/src/async_await/join_mod.rs
@@ -22,8 +22,13 @@
         ///
         /// let a = async { 1 };
         /// let b = async { 2 };
-        ///
         /// assert_eq!(join!(a, b), (1, 2));
+        ///
+        /// // `join!` is variadic, so you can pass any number of futures
+        /// let c = async { 3 };
+        /// let d = async { 4 };
+        /// let e = async { 5 };
+        /// assert_eq!(join!(c, d, e), (3, 4, 5));
         /// # });
         /// ```
         $join
@@ -48,9 +53,14 @@
         /// use futures::try_join;
         ///
         /// let a = async { Ok::<i32, i32>(1) };
-        /// let b = async { Ok::<u64, i32>(2) };
-        ///
+        /// let b = async { Ok::<i32, i32>(2) };
         /// assert_eq!(try_join!(a, b), Ok((1, 2)));
+        ///
+        /// // `try_join!` is variadic, so you can pass any number of futures
+        /// let c = async { Ok::<i32, i32>(3) };
+        /// let d = async { Ok::<i32, i32>(4) };
+        /// let e = async { Ok::<i32, i32>(5) };
+        /// assert_eq!(try_join!(c, d, e), Ok((3, 4, 5)));
         /// # });
         /// ```
         ///
@@ -83,7 +93,7 @@
     #[macro_export]
     macro_rules! join {
         ($($tokens:tt)*) => {{
-            use $crate::__reexport as __futures_crate;
+            use $crate::__private as __futures_crate;
             $crate::join_internal! {
                 $( $tokens )*
             }
@@ -93,7 +103,7 @@
     #[macro_export]
     macro_rules! try_join {
         ($($tokens:tt)*) => {{
-            use $crate::__reexport as __futures_crate;
+            use $crate::__private as __futures_crate;
             $crate::try_join_internal! {
                 $( $tokens )*
             }
diff --git a/src/async_await/mod.rs b/src/async_await/mod.rs
index 69cae13..bdaed95 100644
--- a/src/async_await/mod.rs
+++ b/src/async_await/mod.rs
@@ -3,37 +3,37 @@
 //! This module contains a number of functions and combinators for working
 //! with `async`/`await` code.
 
-use futures_core::future::Future;
-use futures_core::stream::Stream;
-
-#[doc(hidden)]
-pub use futures_core::future::FusedFuture;
-#[doc(hidden)]
-pub use futures_core::stream::FusedStream;
+use futures_core::future::{Future, FusedFuture};
+use futures_core::stream::{Stream, FusedStream};
 
 #[macro_use]
 mod poll;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
 pub use self::poll::*;
 
 #[macro_use]
 mod pending;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
 pub use self::pending::*;
 
 // Primary export is a macro
 #[cfg(feature = "async-await-macro")]
 mod join_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
 #[cfg(feature = "async-await-macro")]
 pub use self::join_mod::*;
 
 // Primary export is a macro
 #[cfg(feature = "async-await-macro")]
 mod select_mod;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
 #[cfg(feature = "async-await-macro")]
 pub use self::select_mod::*;
 
 #[cfg(feature = "std")]
 #[cfg(feature = "async-await-macro")]
 mod random;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/64762
 #[cfg(feature = "std")]
 #[cfg(feature = "async-await-macro")]
 pub use self::random::*;
diff --git a/src/async_await/pending.rs b/src/async_await/pending.rs
index b143869..e0cf341 100644
--- a/src/async_await/pending.rs
+++ b/src/async_await/pending.rs
@@ -15,7 +15,7 @@
 #[macro_export]
 macro_rules! pending {
     () => {
-        $crate::async_await::pending_once().await
+        $crate::__private::async_await::pending_once().await
     }
 }
 
diff --git a/src/async_await/poll.rs b/src/async_await/poll.rs
index dffa94b..ac70a53 100644
--- a/src/async_await/poll.rs
+++ b/src/async_await/poll.rs
@@ -12,7 +12,7 @@
 #[macro_export]
 macro_rules! poll {
     ($x:expr $(,)?) => {
-        $crate::async_await::poll($x).await
+        $crate::__private::async_await::poll($x).await
     }
 }
 
diff --git a/src/async_await/select_mod.rs b/src/async_await/select_mod.rs
index 0471f09..47eca4d 100644
--- a/src/async_await/select_mod.rs
+++ b/src/async_await/select_mod.rs
@@ -14,7 +14,7 @@
         /// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin`
         /// requirement is relaxed, since the macro will pin the resulting `Future`
         /// on the stack. However the `Future` returned by the expression must
-        /// still implement `FusedFuture`. This difference is presented
+        /// still implement `FusedFuture`.
         ///
         /// Futures and streams which are not already fused can be fused using the
         /// `.fuse()` method. Note, though, that fusing a future or stream directly
@@ -84,7 +84,7 @@
         ///     a_res = async_identity_fn(62).fuse() => a_res + 1,
         ///     b_res = async_identity_fn(13).fuse() => b_res,
         /// };
-        /// assert!(res == 63 || res == 12);
+        /// assert!(res == 63 || res == 13);
         /// # });
         /// ```
         ///
@@ -167,7 +167,7 @@
         /// (e.g. an `async fn` call) instead of a `Future` by name the `Unpin`
         /// requirement is relaxed, since the macro will pin the resulting `Future`
         /// on the stack. However the `Future` returned by the expression must
-        /// still implement `FusedFuture`. This difference is presented
+        /// still implement `FusedFuture`.
         ///
         /// Futures and streams which are not already fused can be fused using the
         /// `.fuse()` method. Note, though, that fusing a future or stream directly
@@ -322,7 +322,7 @@
     #[macro_export]
     macro_rules! select {
         ($($tokens:tt)*) => {{
-            use $crate::__reexport as __futures_crate;
+            use $crate::__private as __futures_crate;
             $crate::select_internal! {
                 $( $tokens )*
             }
@@ -332,7 +332,7 @@
     #[macro_export]
     macro_rules! select_biased {
         ($($tokens:tt)*) => {{
-            use $crate::__reexport as __futures_crate;
+            use $crate::__private as __futures_crate;
             $crate::select_biased_internal! {
                 $( $tokens )*
             }
diff --git a/src/compat/compat01as03.rs b/src/compat/compat01as03.rs
index 562a7ad..95025d2 100644
--- a/src/compat/compat01as03.rs
+++ b/src/compat/compat01as03.rs
@@ -15,6 +15,7 @@
 use futures_sink::Sink as Sink03;
 
 #[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use io::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
 
@@ -115,6 +116,7 @@
 
 /// Extension trait for futures 0.1 [`Sink`](futures_01::sink::Sink)
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub trait Sink01CompatExt: Sink01 {
     /// Converts a futures 0.1
     /// [`Sink<SinkItem = T, SinkError = E>`](futures_01::sink::Sink)
@@ -180,6 +182,7 @@
 
 /// Converts a futures 0.1 Sink object to a futures 0.3-compatible version
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 #[derive(Debug)]
 #[must_use = "sinks do nothing unless polled"]
 pub struct Compat01As03Sink<S, SinkItem> {
@@ -366,6 +369,7 @@
 }
 
 #[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 mod io {
     use super::*;
     #[cfg(feature = "read-initializer")]
@@ -375,6 +379,7 @@
     use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};
 
     /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead)
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     pub trait AsyncRead01CompatExt: AsyncRead01 {
         /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3
         /// [`AsyncRead`](futures_io::AsyncRead).
@@ -403,6 +408,7 @@
     impl<R: AsyncRead01> AsyncRead01CompatExt for R {}
 
     /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite)
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     pub trait AsyncWrite01CompatExt: AsyncWrite01 {
         /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3
         /// [`AsyncWrite`](futures_io::AsyncWrite).
diff --git a/src/compat/compat03as01.rs b/src/compat/compat03as01.rs
index 3fd2ae0..4841c5e 100644
--- a/src/compat/compat03as01.rs
+++ b/src/compat/compat03as01.rs
@@ -40,6 +40,7 @@
 /// Converts a futures 0.3 [`Sink`](futures_sink::Sink) into a futures 0.1
 /// [`Sink`](futures_01::sink::Sink).
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 #[derive(Debug)]
 #[must_use = "sinks do nothing unless polled"]
 pub struct CompatSink<T, Item> {
@@ -236,6 +237,7 @@
 }
 
 #[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 mod io {
     use super::*;
     use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
diff --git a/src/compat/mod.rs b/src/compat/mod.rs
index 1826836..897a50a 100644
--- a/src/compat/mod.rs
+++ b/src/compat/mod.rs
@@ -9,11 +9,14 @@
 mod compat01as03;
 pub use self::compat01as03::{Compat01As03, Future01CompatExt, Stream01CompatExt};
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub use self::compat01as03::{Compat01As03Sink, Sink01CompatExt};
 #[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 pub use self::compat01as03::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
 
 mod compat03as01;
 pub use self::compat03as01::Compat;
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub use self::compat03as01::CompatSink;
diff --git a/src/fns.rs b/src/fns.rs
index 2b4e3c6..6908bff 100644
--- a/src/fns.rs
+++ b/src/fns.rs
@@ -140,6 +140,7 @@
 #[derive(Debug, Copy, Clone, Default)]
 pub struct InspectFn<F>(F);
 
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
 impl<F, A> FnOnce1<A> for InspectFn<F>
 where
     F: for<'a> FnOnce1<&'a A, Output=()>,
@@ -150,6 +151,7 @@
         arg
     }
 }
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
 impl<F, A> FnMut1<A> for InspectFn<F>
 where
     F: for<'a> FnMut1<&'a A, Output=()>,
@@ -159,6 +161,7 @@
         arg
     }
 }
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
 impl<F, A> Fn1<A> for InspectFn<F>
 where
     F: for<'a> Fn1<&'a A, Output=()>,
diff --git a/src/future/either.rs b/src/future/either.rs
index be28829..aa17fa7 100644
--- a/src/future/either.rs
+++ b/src/future/either.rs
@@ -4,11 +4,11 @@
 use futures_core::stream::{FusedStream, Stream};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Combines two different futures, streams, or sinks having the same associated types into a single
 /// type.
-#[pin_project]
+#[pin_project(project = EitherProj)]
 #[derive(Debug, Clone)]
 pub enum Either<A, B> {
     /// First branch of the type
@@ -58,12 +58,10 @@
 {
     type Output = A::Output;
 
-    #[project]
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<A::Output> {
-        #[project]
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         match self.project() {
-            Either::Left(x) => x.poll(cx),
-            Either::Right(x) => x.poll(cx),
+            EitherProj::Left(x) => x.poll(cx),
+            EitherProj::Right(x) => x.poll(cx),
         }
     }
 }
@@ -88,12 +86,10 @@
 {
     type Item = A::Item;
 
-    #[project]
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<A::Item>> {
-        #[project]
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         match self.project() {
-            Either::Left(x) => x.poll_next(cx),
-            Either::Right(x) => x.poll_next(cx),
+            EitherProj::Left(x) => x.poll_next(cx),
+            EitherProj::Right(x) => x.poll_next(cx),
         }
     }
 }
@@ -119,39 +115,31 @@
 {
     type Error = A::Error;
 
-    #[project]
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        #[project]
         match self.project() {
-            Either::Left(x) => x.poll_ready(cx),
-            Either::Right(x) => x.poll_ready(cx),
+            EitherProj::Left(x) => x.poll_ready(cx),
+            EitherProj::Right(x) => x.poll_ready(cx),
         }
     }
 
-    #[project]
     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
-        #[project]
         match self.project() {
-            Either::Left(x) => x.start_send(item),
-            Either::Right(x) => x.start_send(item),
+            EitherProj::Left(x) => x.start_send(item),
+            EitherProj::Right(x) => x.start_send(item),
         }
     }
 
-    #[project]
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        #[project]
         match self.project() {
-            Either::Left(x) => x.poll_flush(cx),
-            Either::Right(x) => x.poll_flush(cx),
+            EitherProj::Left(x) => x.poll_flush(cx),
+            EitherProj::Right(x) => x.poll_flush(cx),
         }
     }
 
-    #[project]
     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        #[project]
         match self.project() {
-            Either::Left(x) => x.poll_close(cx),
-            Either::Right(x) => x.poll_close(cx),
+            EitherProj::Left(x) => x.poll_close(cx),
+            EitherProj::Right(x) => x.poll_close(cx),
         }
     }
 }
@@ -182,29 +170,25 @@
             }
         }
 
-        #[project]
         fn poll_read(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             buf: &mut [u8],
         ) -> Poll<Result<usize>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_read(cx, buf),
-                Either::Right(x) => x.poll_read(cx, buf),
+                EitherProj::Left(x) => x.poll_read(cx, buf),
+                EitherProj::Right(x) => x.poll_read(cx, buf),
             }
         }
 
-        #[project]
         fn poll_read_vectored(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             bufs: &mut [IoSliceMut<'_>],
         ) -> Poll<Result<usize>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_read_vectored(cx, bufs),
-                Either::Right(x) => x.poll_read_vectored(cx, bufs),
+                EitherProj::Left(x) => x.poll_read_vectored(cx, bufs),
+                EitherProj::Right(x) => x.poll_read_vectored(cx, bufs),
             }
         }
     }
@@ -214,47 +198,39 @@
         A: AsyncWrite,
         B: AsyncWrite,
     {
-        #[project]
         fn poll_write(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             buf: &[u8],
         ) -> Poll<Result<usize>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_write(cx, buf),
-                Either::Right(x) => x.poll_write(cx, buf),
+                EitherProj::Left(x) => x.poll_write(cx, buf),
+                EitherProj::Right(x) => x.poll_write(cx, buf),
             }
         }
 
-        #[project]
         fn poll_write_vectored(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             bufs: &[IoSlice<'_>],
         ) -> Poll<Result<usize>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_write_vectored(cx, bufs),
-                Either::Right(x) => x.poll_write_vectored(cx, bufs),
+                EitherProj::Left(x) => x.poll_write_vectored(cx, bufs),
+                EitherProj::Right(x) => x.poll_write_vectored(cx, bufs),
             }
         }
 
-        #[project]
         fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_flush(cx),
-                Either::Right(x) => x.poll_flush(cx),
+                EitherProj::Left(x) => x.poll_flush(cx),
+                EitherProj::Right(x) => x.poll_flush(cx),
             }
         }
 
-        #[project]
         fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_close(cx),
-                Either::Right(x) => x.poll_close(cx),
+                EitherProj::Left(x) => x.poll_close(cx),
+                EitherProj::Right(x) => x.poll_close(cx),
             }
         }
     }
@@ -264,16 +240,14 @@
         A: AsyncSeek,
         B: AsyncSeek,
     {
-        #[project]
         fn poll_seek(
             self: Pin<&mut Self>,
             cx: &mut Context<'_>,
             pos: SeekFrom,
         ) -> Poll<Result<u64>> {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.poll_seek(cx, pos),
-                Either::Right(x) => x.poll_seek(cx, pos),
+                EitherProj::Left(x) => x.poll_seek(cx, pos),
+                EitherProj::Right(x) => x.poll_seek(cx, pos),
             }
         }
     }
@@ -283,24 +257,17 @@
         A: AsyncBufRead,
         B: AsyncBufRead,
     {
-        #[project]
-        fn poll_fill_buf(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Result<&[u8]>> {
-            #[project]
+        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
             match self.project() {
-                Either::Left(x) => x.poll_fill_buf(cx),
-                Either::Right(x) => x.poll_fill_buf(cx),
+                EitherProj::Left(x) => x.poll_fill_buf(cx),
+                EitherProj::Right(x) => x.poll_fill_buf(cx),
             }
         }
 
-        #[project]
         fn consume(self: Pin<&mut Self>, amt: usize) {
-            #[project]
             match self.project() {
-                Either::Left(x) => x.consume(amt),
-                Either::Right(x) => x.consume(amt),
+                EitherProj::Left(x) => x.consume(amt),
+                EitherProj::Right(x) => x.consume(amt),
             }
         }
     }
diff --git a/src/future/future/flatten.rs b/src/future/future/flatten.rs
index f59464c..53f75e2 100644
--- a/src/future/future/flatten.rs
+++ b/src/future/future/flatten.rs
@@ -4,9 +4,9 @@
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
-#[pin_project]
+#[pin_project(project = FlattenProj)]
 #[derive(Debug)]
 pub enum Flatten<Fut1, Fut2> {
     First(#[pin] Fut1),
@@ -38,21 +38,19 @@
 {
     type Output = <Fut::Output as Future>::Output;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                Flatten::First(f) => {
+                FlattenProj::First(f) => {
                     let f = ready!(f.poll(cx));
                     self.set(Flatten::Second(f));
                 },
-                Flatten::Second(f) => {
+                FlattenProj::Second(f) => {
                     let output = ready!(f.poll(cx));
                     self.set(Flatten::Empty);
                     break output;
                 },
-                Flatten::Empty => panic!("Flatten polled after completion"),
+                FlattenProj::Empty => panic!("Flatten polled after completion"),
             }
         })
     }
@@ -76,23 +74,21 @@
 {
     type Item = <Fut::Output as Stream>::Item;
 
-    #[project]
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                Flatten::First(f) => {
+                FlattenProj::First(f) => {
                     let f = ready!(f.poll(cx));
                     self.set(Flatten::Second(f));
                 },
-                Flatten::Second(f) => {
+                FlattenProj::Second(f) => {
                     let output = ready!(f.poll_next(cx));
                     if output.is_none() {
                         self.set(Flatten::Empty);
                     }
                     break output;
                 },
-                Flatten::Empty => break None,
+                FlattenProj::Empty => break None,
             }
         })
     }
@@ -107,54 +103,46 @@
 {
     type Error = <Fut::Output as Sink<Item>>::Error;
 
-    #[project]
     fn poll_ready(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                Flatten::First(f) => {
+                FlattenProj::First(f) => {
                     let f = ready!(f.poll(cx));
                     self.set(Flatten::Second(f));
                 },
-                Flatten::Second(f) => {
+                FlattenProj::Second(f) => {
                     break ready!(f.poll_ready(cx));
                 },
-                Flatten::Empty => panic!("poll_ready called after eof"),
+                FlattenProj::Empty => panic!("poll_ready called after eof"),
             }
         })
     }
 
-    #[project]
     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
-        #[project]
         match self.project() {
-            Flatten::First(_) => panic!("poll_ready not called first"),
-            Flatten::Second(f) => f.start_send(item),
-            Flatten::Empty => panic!("start_send called after eof"),
+            FlattenProj::First(_) => panic!("poll_ready not called first"),
+            FlattenProj::Second(f) => f.start_send(item),
+            FlattenProj::Empty => panic!("start_send called after eof"),
         }
     }
 
-    #[project]
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        #[project]
         match self.project() {
-            Flatten::First(_) => Poll::Ready(Ok(())),
-            Flatten::Second(f) => f.poll_flush(cx),
-            Flatten::Empty => panic!("poll_flush called after eof"),
+            FlattenProj::First(_) => Poll::Ready(Ok(())),
+            FlattenProj::Second(f) => f.poll_flush(cx),
+            FlattenProj::Empty => panic!("poll_flush called after eof"),
         }
     }
 
-    #[project]
     fn poll_close(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
-        #[project]
         let res = match self.as_mut().project() {
-            Flatten::Second(f) => f.poll_close(cx),
+            FlattenProj::Second(f) => f.poll_close(cx),
             _ => Poll::Ready(Ok(())),
         };
         if res.is_ready() {
diff --git a/src/future/future/fuse.rs b/src/future/future/fuse.rs
index 69a8a69..9f2e1ca 100644
--- a/src/future/future/fuse.rs
+++ b/src/future/future/fuse.rs
@@ -38,7 +38,7 @@
     /// sender.unbounded_send(()).unwrap();
     /// drop(sender);
     ///
-    /// // Use `Fuse::termianted()` to create an already-terminated future
+    /// // Use `Fuse::terminated()` to create an already-terminated future
     /// // which may be instantiated later.
     /// let foo_printer = Fuse::terminated();
     /// pin_mut!(foo_printer);
diff --git a/src/future/future/map.rs b/src/future/future/map.rs
index 080f871..8e7f636 100644
--- a/src/future/future/map.rs
+++ b/src/future/future/map.rs
@@ -1,13 +1,12 @@
 use core::pin::Pin;
-use core::ptr;
 use futures_core::future::{FusedFuture, Future};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 use crate::fns::FnOnce1;
 
 /// Internal Map future
-#[pin_project]
+#[pin_project(project = MapProj, project_replace = MapProjOwn)]
 #[derive(Debug)]
 #[must_use = "futures do nothing unless you `.await` or poll them"]
 pub enum Map<Fut, F> {
@@ -19,17 +18,6 @@
     Complete,
 }
 
-// Helper type to mark a `Map` as complete without running its destructor.
-struct UnsafeMarkAsComplete<Fut, F>(*mut Map<Fut, F>);
-
-impl<Fut, F> Drop for UnsafeMarkAsComplete<Fut, F> {
-    fn drop(&mut self) {
-        unsafe {
-            ptr::write(self.0, Map::Complete);
-        }
-    }
-}
-
 impl<Fut, F> Map<Fut, F> {
     /// Creates a new Map.
     pub(crate) fn new(future: Fut, f: F) -> Map<Fut, F> {
@@ -55,35 +43,16 @@
 {
     type Output = T;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
-        unsafe {
-            // Store this pointer for later...
-            let self_ptr: *mut Self = self.as_mut().get_unchecked_mut();
-            
-            match &mut *self_ptr {
-                Map::Incomplete { future, f } => {
-                    let mut future = Pin::new_unchecked(future);
-                    let output = match future.as_mut().poll(cx) {
-                        Poll::Ready(x) => x,
-                        Poll::Pending => return Poll::Pending,
-                    };
-    
-                    // Here be dragons
-                    let f = ptr::read(f);
-                    {
-                        // The ordering here is important, the call to `drop_in_place` must be
-                        // last as it may panic. Other lines must not panic.
-                        let _cleanup = UnsafeMarkAsComplete(self_ptr);
-                        ptr::drop_in_place(future.get_unchecked_mut());
-                    };
-
-                    // Phew, everything is back to normal, and we should be in the
-                    // `Complete` state!
-                    Poll::Ready(f.call_once(output))
-                },
-                Map::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
-            }
+        match self.as_mut().project() {
+            MapProj::Incomplete { future, .. } => {
+                let output = ready!(future.poll(cx));
+                match self.project_replace(Map::Complete) {
+                    MapProjOwn::Incomplete { f, .. } => Poll::Ready(f.call_once(output)),
+                    MapProjOwn::Complete => unreachable!(),
+                }
+            },
+            MapProj::Complete => panic!("Map must not be polled after it returned `Poll::Ready`"),
         }
     }
 }
diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs
index c3e035b..f5d5dd2 100644
--- a/src/future/future/mod.rs
+++ b/src/future/future/mod.rs
@@ -3,11 +3,14 @@
 //! This module contains a number of functions for working with `Future`s,
 //! including the `FutureExt` trait which adds methods to `Future` types.
 
-use super::{assert_future, Either};
 #[cfg(feature = "alloc")]
 use alloc::boxed::Box;
 use core::pin::Pin;
 
+use crate::future::{assert_future, Either};
+use crate::stream::assert_stream;
+use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn};
+use crate::never::Never;
 #[cfg(feature = "alloc")]
 use futures_core::future::{BoxFuture, LocalBoxFuture};
 use futures_core::{
@@ -15,8 +18,7 @@
     stream::Stream,
     task::{Context, Poll},
 };
-use crate::never::Never;
-use crate::fns::{OkFn, ok_fn, IntoFn, into_fn, InspectFn, inspect_fn};
+use pin_utils::pin_mut;
 
 // Combinators
 
@@ -44,7 +46,7 @@
 pub use fuse::Fuse;
 
 delegate_all!(
-    /// Future for the [`flatten`](super::FutureExt::flatten) method.
+    /// Future for the [`map`](super::FutureExt::map) method.
     Map<Fut, F>(
         map::Map<Fut, F>
     ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)]
@@ -99,9 +101,11 @@
 pub use self::catch_unwind::CatchUnwind;
 
 #[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
 #[cfg(feature = "std")]
 mod remote_handle;
 #[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
 #[cfg(feature = "std")]
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::remote_handle::{Remote, RemoteHandle};
@@ -220,7 +224,7 @@
         B: Future<Output = Self::Output>,
         Self: Sized,
     {
-        Either::Left(self)
+        assert_future::<Self::Output, _>(Either::Left(self))
     }
 
     /// Wrap this future in an `Either` future, making it the right-hand variant
@@ -250,7 +254,7 @@
         A: Future<Output = Self::Output>,
         Self: Sized,
     {
-        Either::Right(self)
+        assert_future::<Self::Output, _>(Either::Right(self))
     }
 
     /// Convert this future into a single element stream.
@@ -275,7 +279,7 @@
     where
         Self: Sized,
     {
-        IntoStream::new(self)
+        assert_stream::<Self::Output, _>(IntoStream::new(self))
     }
 
     /// Flatten the execution of this future when the output of this
@@ -339,7 +343,7 @@
         Self::Output: Stream,
         Self: Sized,
     {
-        FlattenStream::new(self)
+        assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self))
     }
 
     /// Fuse a future such that `poll` will never again be called once it has
@@ -428,7 +432,9 @@
     where
         Self: Sized + ::std::panic::UnwindSafe,
     {
-        CatchUnwind::new(self)
+        assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new(
+            self,
+        ))
     }
 
     /// Create a cloneable handle to this future where all handles will resolve
@@ -482,7 +488,7 @@
         Self: Sized,
         Self::Output: Clone,
     {
-        Shared::new(self)
+        assert_future::<Self::Output, _>(Shared::new(self))
     }
 
     /// Turn this future into a future that yields `()` on completion and sends
@@ -494,6 +500,7 @@
     /// This method is only available when the `std` feature of this
     /// library is activated, and it is activated by default.
     #[cfg(feature = "channel")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
     #[cfg(feature = "std")]
     fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)
     where
@@ -511,7 +518,7 @@
     where
         Self: Sized + Send + 'a,
     {
-        Box::pin(self)
+        assert_future::<Self::Output, _>(Box::pin(self))
     }
 
     /// Wrap the future in a Box, pinning it.
@@ -525,7 +532,7 @@
     where
         Self: Sized + 'a,
     {
-        Box::pin(self)
+        assert_future::<Self::Output, _>(Box::pin(self))
     }
 
     /// Turns a [`Future<Output = T>`](Future) into a
@@ -534,7 +541,7 @@
     where
         Self: Sized,
     {
-        UnitError::new(self)
+        assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self))
     }
 
     /// Turns a [`Future<Output = T>`](Future) into a
@@ -543,7 +550,7 @@
     where
         Self: Sized,
     {
-        NeverError::new(self)
+        assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self))
     }
 
     /// A convenience for calling `Future::poll` on `Unpin` future types.
@@ -585,19 +592,16 @@
     ///
     /// assert_eq!(future_ready.now_or_never().expect("Future not ready"), "foobar");
     /// ```
-    fn now_or_never(mut self) -> Option<Self::Output>
+    fn now_or_never(self) -> Option<Self::Output>
     where
         Self: Sized,
     {
         let noop_waker = crate::task::noop_waker();
         let mut cx = Context::from_waker(&noop_waker);
 
-        // SAFETY: This is safe because this method consumes the future, so `poll` is
-        //         only going to be called once. Thus it doesn't matter to us if the
-        //         future is `Unpin` or not.
-        let pinned = unsafe { Pin::new_unchecked(&mut self) };
-
-        match pinned.poll(&mut cx) {
+        let this = self;
+        pin_mut!(this);
+        match this.poll(&mut cx) {
             Poll::Ready(x) => Some(x),
             _ => None,
         }
diff --git a/src/future/future/remote_handle.rs b/src/future/future/remote_handle.rs
index 9495bec..598f63c 100644
--- a/src/future/future/remote_handle.rs
+++ b/src/future/future/remote_handle.rs
@@ -16,7 +16,7 @@
         },
         thread,
     },
-    pin_project::{pin_project, project},
+    pin_project::pin_project,
 };
 
 /// The handle to a remote future returned by
@@ -37,6 +37,7 @@
 /// will unwind.
 #[must_use = "futures do nothing unless you `.await` or poll them"]
 #[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
 pub struct RemoteHandle<T> {
     rx: Receiver<thread::Result<T>>,
     keep_running: Arc<AtomicBool>,
@@ -72,6 +73,7 @@
 /// Created by [`remote_handle`](crate::future::FutureExt::remote_handle).
 #[pin_project]
 #[must_use = "futures do nothing unless you `.await` or poll them"]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
 pub struct Remote<Fut: Future> {
     tx: Option<Sender<SendMsg<Fut>>>,
     keep_running: Arc<AtomicBool>,
@@ -90,23 +92,21 @@
 impl<Fut: Future> Future for Remote<Fut> {
     type Output = ();
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
-        #[project]
-        let Remote { tx, keep_running, future } = self.project();
+        let this = self.project();
 
-        if let Poll::Ready(_) = tx.as_mut().unwrap().poll_canceled(cx) {
-            if !keep_running.load(Ordering::SeqCst) {
+        if let Poll::Ready(_) = this.tx.as_mut().unwrap().poll_canceled(cx) {
+            if !this.keep_running.load(Ordering::SeqCst) {
                 // Cancelled, bail out
                 return Poll::Ready(())
             }
         }
 
-        let output = ready!(future.poll(cx));
+        let output = ready!(this.future.poll(cx));
 
         // if the receiving end has gone away then that's ok, we just ignore the
         // send error here.
-        drop(tx.take().unwrap().send(output));
+        drop(this.tx.take().unwrap().send(output));
         Poll::Ready(())
     }
 }
diff --git a/src/future/join_all.rs b/src/future/join_all.rs
index df62f3a..0c8357c 100644
--- a/src/future/join_all.rs
+++ b/src/future/join_all.rs
@@ -56,8 +56,10 @@
 ///
 /// This is purposefully a very simple API for basic use-cases. In a lot of
 /// cases you will want to use the more powerful
-/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some
-/// examples of additional functionality that provides:
+/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
+/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
+///
+/// Some examples for additional functionality provided by these are:
 ///
 ///  * Adding new futures to the set even after it has been started.
 ///
diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs
index 71cb6fa..5120a9b 100644
--- a/src/future/maybe_done.rs
+++ b/src/future/maybe_done.rs
@@ -1,15 +1,14 @@
 //! Definition of the MaybeDone combinator
 
-use core::mem;
 use core::pin::Pin;
 use futures_core::future::{FusedFuture, Future};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// A future that may have completed.
 ///
 /// This is created by the [`maybe_done()`] function.
-#[pin_project]
+#[pin_project(project = MaybeDoneProj, project_replace = MaybeDoneProjOwn)]
 #[derive(Debug)]
 pub enum MaybeDone<Fut: Future> {
     /// A not-yet-completed future
@@ -47,12 +46,10 @@
     /// The output of this method will be [`Some`] if and only if the inner
     /// future has been completed and [`take_output`](MaybeDone::take_output)
     /// has not yet been called.
-    #[project]
     #[inline]
     pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Output> {
-        #[project]
         match self.project() {
-            MaybeDone::Done(res) => Some(res),
+            MaybeDoneProj::Done(res) => Some(res),
             _ => None,
         }
     }
@@ -61,22 +58,13 @@
     /// towards completion.
     #[inline]
     pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
-        // Safety: we return immediately unless we are in the `Done`
-        // state, which does not have any pinning guarantees to uphold.
-        //
-        // Hopefully `pin_project` will support this safely soon:
-        // https://github.com/taiki-e/pin-project/issues/184
-        unsafe {
-            let this = self.get_unchecked_mut();
-            match this {
-                MaybeDone::Done(_) => {},
-                MaybeDone::Future(_) | MaybeDone::Gone => return None,
-            };
-            if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
-                Some(output)
-            } else {
-                unreachable!()
-            }
+        match &*self {
+            MaybeDone::Done(_) => {}
+            MaybeDone::Future(_) | MaybeDone::Gone => return None,
+        }
+        match self.project_replace(MaybeDone::Gone) {
+            MaybeDoneProjOwn::Done(output) => Some(output),
+            _ => unreachable!(),
         }
     }
 }
@@ -93,16 +81,14 @@
 impl<Fut: Future> Future for MaybeDone<Fut> {
     type Output = ();
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
         match self.as_mut().project() {
-            MaybeDone::Future(f) => {
+            MaybeDoneProj::Future(f) => {
                 let res = ready!(f.poll(cx));
                 self.set(MaybeDone::Done(res));
-            },
-            MaybeDone::Done(_) => {},
-            MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
+            }
+            MaybeDoneProj::Done(_) => {}
+            MaybeDoneProj::Gone => panic!("MaybeDone polled after value taken"),
         }
         Poll::Ready(())
     }
diff --git a/src/future/mod.rs b/src/future/mod.rs
index 7962894..3f19c19 100644
--- a/src/future/mod.rs
+++ b/src/future/mod.rs
@@ -23,6 +23,7 @@
 pub use self::future::CatchUnwind;
 
 #[cfg(feature = "channel")]
+#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
 #[cfg(feature = "std")]
 pub use self::future::{Remote, RemoteHandle};
 
@@ -36,6 +37,7 @@
 };
 
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub use self::try_future::FlattenSink;
 
 // Primitive futures
@@ -107,7 +109,7 @@
 
 // Just a helper function to ensure the futures we're returning all have the
 // right implementations.
-fn assert_future<T, F>(future: F) -> F
+pub(crate) fn assert_future<T, F>(future: F) -> F
 where
     F: Future<Output = T>,
 {
diff --git a/src/future/select.rs b/src/future/select.rs
index 3e0a447..bc24779 100644
--- a/src/future/select.rs
+++ b/src/future/select.rs
@@ -27,6 +27,34 @@
 ///
 /// # Examples
 ///
+/// A simple example
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::future::{self, Either};
+/// use futures::pin_mut;
+///
+/// // These two futures have different types even though their outputs have the same type
+/// let future1 = async { 1 };
+/// let future2 = async { 2 };
+///
+/// // 'select' requires Future + Unpin bounds
+/// pin_mut!(future1);
+/// pin_mut!(future2);
+///
+/// let value = match future::select(future1, future2).await {
+///     Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
+///                                          // `_` represents `future2`
+///     Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
+///                                           // `_` represents `future1`
+/// };
+///
+/// assert!(value == 1 || value == 2);
+/// # });
+/// ```
+///
+/// A more complex example
+///
 /// ```
 /// use futures::future::{self, Either, Future, FutureExt};
 ///
diff --git a/src/future/try_future/mod.rs b/src/future/try_future/mod.rs
index bd1ab33..1ce01d2 100644
--- a/src/future/try_future/mod.rs
+++ b/src/future/try_future/mod.rs
@@ -14,13 +14,13 @@
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
 
-use super::assert_future;
-use crate::future::{Map, Inspect};
 use crate::fns::{
-    MapOkFn, map_ok_fn, MapErrFn, map_err_fn, MapOkOrElseFn,
-    map_ok_or_else_fn, IntoFn, UnwrapOrElseFn, unwrap_or_else_fn, InspectOkFn, inspect_ok_fn, InspectErrFn,
-    inspect_err_fn, into_fn
+    inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, map_ok_or_else_fn,
+    unwrap_or_else_fn, InspectErrFn, InspectOkFn, IntoFn, MapErrFn, MapOkFn, MapOkOrElseFn,
+    UnwrapOrElseFn,
 };
+use crate::future::{assert_future, Inspect, Map};
+use crate::stream::assert_stream;
 
 // Combinators
 mod into_future;
@@ -52,6 +52,7 @@
 #[cfg(feature = "sink")]
 delegate_all!(
     /// Sink for the [`flatten_sink`](TryFutureExt::flatten_sink) method.
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     FlattenSink<Fut, Si>(
         try_flatten::TryFlatten<Fut, Si>
     ): Debug + Sink + Stream + FusedStream + New[|x: Fut| try_flatten::TryFlatten::new(x)]
@@ -166,6 +167,7 @@
     /// take_sink(fut.flatten_sink())
     /// ```
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     fn flatten_sink<Item>(self) -> FlattenSink<Self, Self::Ok>
     where
         Self::Ok: Sink<Item, Error = Self::Error>,
@@ -228,7 +230,7 @@
     /// The provided closure `f` will only be called if this future is resolved
     /// to an [`Ok`]. If it resolves to an [`Err`], panics, or is dropped, then
     /// the provided closure will never be invoked.
-    /// 
+    ///
     /// The provided closure `e` will only be called if this future is resolved
     /// to an [`Err`]. If it resolves to an [`Ok`], panics, or is dropped, then
     /// the provided closure will never be invoked.
@@ -245,13 +247,13 @@
     /// let future = async { Ok::<i32, i32>(5) };
     /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
     /// assert_eq!(future.await, 8);
-    /// 
+    ///
     /// let future = async { Err::<i32, i32>(5) };
     /// let future = future.map_ok_or_else(|x| x * 2, |x| x + 3);
     /// assert_eq!(future.await, 10);
     /// # });
     /// ```
-    /// 
+    ///
     fn map_ok_or_else<T, E, F>(self, e: E, f: F) -> MapOkOrElse<Self, F, E>
     where
         F: FnOnce(Self::Ok) -> T,
@@ -532,7 +534,9 @@
         Self::Ok: TryStream<Error = Self::Error>,
         Self: Sized,
     {
-        TryFlattenStream::new(self)
+        assert_stream::<Result<<Self::Ok as TryStream>::Ok, Self::Error>, _>(TryFlattenStream::new(
+            self,
+        ))
     }
 
     /// Unwraps this future's ouput, producing a future with this future's
@@ -568,6 +572,7 @@
     /// Wraps a [`TryFuture`] into a future compatable with libraries using
     /// futures 0.1 future definitons. Requires the `compat` feature to enable.
     #[cfg(feature = "compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
     fn compat(self) -> Compat<Self>
     where
         Self: Sized + Unpin,
@@ -600,7 +605,7 @@
     where
         Self: Sized,
     {
-        IntoFuture::new(self)
+        assert_future::<Result<Self::Ok, Self::Error>, _>(IntoFuture::new(self))
     }
 
     /// A convenience method for calling [`TryFuture::try_poll`] on [`Unpin`]
diff --git a/src/future/try_future/try_flatten.rs b/src/future/try_future/try_flatten.rs
index 661d3ad..2bcadc5 100644
--- a/src/future/try_future/try_flatten.rs
+++ b/src/future/try_future/try_flatten.rs
@@ -4,9 +4,9 @@
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
-#[pin_project]
+#[pin_project(project = TryFlattenProj)]
 #[derive(Debug)]
 pub enum TryFlatten<Fut1, Fut2> {
     First(#[pin] Fut1),
@@ -38,12 +38,10 @@
 {
     type Output = Result<<Fut::Ok as TryFuture>::Ok, Fut::Error>;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                TryFlatten::First(f) => {
+                TryFlattenProj::First(f) => {
                     match ready!(f.try_poll(cx)) {
                         Ok(f) => self.set(TryFlatten::Second(f)),
                         Err(e) => {
@@ -52,12 +50,12 @@
                         }
                     }
                 },
-                TryFlatten::Second(f) => {
+                TryFlattenProj::Second(f) => {
                     let output = ready!(f.try_poll(cx));
                     self.set(TryFlatten::Empty);
                     break output;
                 },
-                TryFlatten::Empty => panic!("TryFlatten polled after completion"),
+                TryFlattenProj::Empty => panic!("TryFlatten polled after completion"),
             }
         })
     }
@@ -81,12 +79,10 @@
 {
     type Item = Result<<Fut::Ok as TryStream>::Ok, Fut::Error>;
 
-    #[project]
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                TryFlatten::First(f) => {
+                TryFlattenProj::First(f) => {
                     match ready!(f.try_poll(cx)) {
                         Ok(f) => self.set(TryFlatten::Second(f)),
                         Err(e) => {
@@ -95,14 +91,14 @@
                         }
                     }
                 },
-                TryFlatten::Second(f) => {
+                TryFlattenProj::Second(f) => {
                     let output = ready!(f.try_poll_next(cx));
                     if output.is_none() {
                         self.set(TryFlatten::Empty);
                     }
                     break output;
                 },
-                TryFlatten::Empty => break None,
+                TryFlattenProj::Empty => break None,
             }
         })
     }
@@ -117,15 +113,13 @@
 {
     type Error = Fut::Error;
 
-    #[project]
     fn poll_ready(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                TryFlatten::First(f) => {
+                TryFlattenProj::First(f) => {
                     match ready!(f.try_poll(cx)) {
                         Ok(f) => self.set(TryFlatten::Second(f)),
                         Err(e) => {
@@ -134,42 +128,36 @@
                         }
                     }
                 },
-                TryFlatten::Second(f) => {
+                TryFlattenProj::Second(f) => {
                     break ready!(f.poll_ready(cx));
                 },
-                TryFlatten::Empty => panic!("poll_ready called after eof"),
+                TryFlattenProj::Empty => panic!("poll_ready called after eof"),
             }
         })
     }
 
-    #[project]
     fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
-        #[project]
         match self.project() {
-            TryFlatten::First(_) => panic!("poll_ready not called first"),
-            TryFlatten::Second(f) => f.start_send(item),
-            TryFlatten::Empty => panic!("start_send called after eof"),
+            TryFlattenProj::First(_) => panic!("poll_ready not called first"),
+            TryFlattenProj::Second(f) => f.start_send(item),
+            TryFlattenProj::Empty => panic!("start_send called after eof"),
         }
     }
 
-    #[project]
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        #[project]
         match self.project() {
-            TryFlatten::First(_) => Poll::Ready(Ok(())),
-            TryFlatten::Second(f) => f.poll_flush(cx),
-            TryFlatten::Empty => panic!("poll_flush called after eof"),
+            TryFlattenProj::First(_) => Poll::Ready(Ok(())),
+            TryFlattenProj::Second(f) => f.poll_flush(cx),
+            TryFlattenProj::Empty => panic!("poll_flush called after eof"),
         }
     }
 
-    #[project]
     fn poll_close(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
-        #[project]
         let res = match self.as_mut().project() {
-            TryFlatten::Second(f) => f.poll_close(cx),
+            TryFlattenProj::Second(f) => f.poll_close(cx),
             _ => Poll::Ready(Ok(())),
         };
         if res.is_ready() {
diff --git a/src/future/try_future/try_flatten_err.rs b/src/future/try_future/try_flatten_err.rs
index fbb413d..480f8c3 100644
--- a/src/future/try_future/try_flatten_err.rs
+++ b/src/future/try_future/try_flatten_err.rs
@@ -1,9 +1,9 @@
 use core::pin::Pin;
 use futures_core::future::{FusedFuture, Future, TryFuture};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
-#[pin_project]
+#[pin_project(project = TryFlattenErrProj)]
 #[derive(Debug)]
 pub enum TryFlattenErr<Fut1, Fut2> {
     First(#[pin] Fut1),
@@ -35,12 +35,10 @@
 {
     type Output = Result<Fut::Ok, <Fut::Error as TryFuture>::Error>;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         Poll::Ready(loop {
-            #[project]
             match self.as_mut().project() {
-                TryFlattenErr::First(f) => {
+                TryFlattenErrProj::First(f) => {
                     match ready!(f.try_poll(cx)) {
                         Err(f) => self.set(TryFlattenErr::Second(f)),
                         Ok(e) => {
@@ -49,12 +47,12 @@
                         }
                     }
                 },
-                TryFlattenErr::Second(f) => {
+                TryFlattenErrProj::Second(f) => {
                     let output = ready!(f.try_poll(cx));
                     self.set(TryFlattenErr::Empty);
                     break output;
                 },
-                TryFlattenErr::Empty => panic!("TryFlattenErr polled after completion"),
+                TryFlattenErrProj::Empty => panic!("TryFlattenErr polled after completion"),
             }
         })
     }
diff --git a/src/future/try_maybe_done.rs b/src/future/try_maybe_done.rs
index a249c5c..b38b038 100644
--- a/src/future/try_maybe_done.rs
+++ b/src/future/try_maybe_done.rs
@@ -1,15 +1,14 @@
 //! Definition of the TryMaybeDone combinator
 
-use core::mem;
 use core::pin::Pin;
 use futures_core::future::{FusedFuture, Future, TryFuture};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// A future that may have completed with an error.
 ///
 /// This is created by the [`try_maybe_done()`] function.
-#[pin_project]
+#[pin_project(project = TryMaybeDoneProj, project_replace = TryMaybeDoneProjOwn)]
 #[derive(Debug)]
 pub enum TryMaybeDone<Fut: TryFuture> {
     /// A not-yet-completed future
@@ -32,12 +31,10 @@
     /// The output of this method will be [`Some`] if and only if the inner
     /// future has completed successfully and [`take_output`](TryMaybeDone::take_output)
     /// has not yet been called.
-    #[project]
     #[inline]
     pub fn output_mut(self: Pin<&mut Self>) -> Option<&mut Fut::Ok> {
-        #[project]
         match self.project() {
-            TryMaybeDone::Done(res) => Some(res),
+            TryMaybeDoneProj::Done(res) => Some(res),
             _ => None,
         }
     }
@@ -46,22 +43,13 @@
     /// towards completion.
     #[inline]
     pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Ok> {
-        // Safety: we return immediately unless we are in the `Done`
-        // state, which does not have any pinning guarantees to uphold.
-        //
-        // Hopefully `pin_project` will support this safely soon:
-        // https://github.com/taiki-e/pin-project/issues/184
-        unsafe {
-            let this = self.get_unchecked_mut();
-            match this {
-                TryMaybeDone::Done(_) => {},
-                TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None,
-            };
-            if let TryMaybeDone::Done(output) = mem::replace(this, TryMaybeDone::Gone) {
-                Some(output)
-            } else {
-                unreachable!()
-            }
+        match &*self {
+            TryMaybeDone::Done(_) => {},
+            TryMaybeDone::Future(_) | TryMaybeDone::Gone => return None,
+        }
+        match self.project_replace(TryMaybeDone::Gone) {
+            TryMaybeDoneProjOwn::Done(output) => Some(output),
+            _ => unreachable!()
         }
     }
 }
@@ -78,11 +66,9 @@
 impl<Fut: TryFuture> Future for TryMaybeDone<Fut> {
     type Output = Result<(), Fut::Error>;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
         match self.as_mut().project() {
-            TryMaybeDone::Future(f) => {
+            TryMaybeDoneProj::Future(f) => {
                 match ready!(f.try_poll(cx)) {
                     Ok(res) => self.set(TryMaybeDone::Done(res)),
                     Err(e) => {
@@ -91,8 +77,8 @@
                     }
                 }
             },
-            TryMaybeDone::Done(_) => {},
-            TryMaybeDone::Gone => panic!("TryMaybeDone polled after value taken"),
+            TryMaybeDoneProj::Done(_) => {},
+            TryMaybeDoneProj::Gone => panic!("TryMaybeDone polled after value taken"),
         }
         Poll::Ready(Ok(()))
     }
diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs
index 94b3d2a..2755667 100644
--- a/src/io/buf_reader.rs
+++ b/src/io/buf_reader.rs
@@ -2,7 +2,7 @@
 #[cfg(feature = "read-initializer")]
 use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use std::io::{self, Read};
 use std::pin::Pin;
 use std::{cmp, fmt};
@@ -68,13 +68,11 @@
     }
 
     /// Invalidates all data in the internal buffer.
-    #[project]
     #[inline]
     fn discard_buffer(self: Pin<&mut Self>) {
-        #[project]
-        let BufReader { pos, cap, .. } = self.project();
-        *pos = 0;
-        *cap = 0;
+        let this = self.project();
+        *this.pos = 0;
+        *this.cap = 0;
     }
 }
 
@@ -123,24 +121,22 @@
 }
 
 impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
-    #[project]
     fn poll_fill_buf(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<io::Result<&[u8]>> {
-        #[project]
-        let BufReader { inner, buffer, cap, pos } = self.project();
+        let this = self.project();
 
         // If we've reached the end of our internal buffer then we need to fetch
         // some more data from the underlying reader.
         // Branch using `>=` instead of the more correct `==`
         // to tell the compiler that the pos..cap slice is always valid.
-        if *pos >= *cap {
-            debug_assert!(*pos == *cap);
-            *cap = ready!(inner.poll_read(cx, buffer))?;
-            *pos = 0;
+        if *this.pos >= *this.cap {
+            debug_assert!(*this.pos == *this.cap);
+            *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
+            *this.pos = 0;
         }
-        Poll::Ready(Ok(&buffer[*pos..*cap]))
+        Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
     }
 
     fn consume(self: Pin<&mut Self>, amt: usize) {
diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs
index 66df81f..ed9196d 100644
--- a/src/io/buf_writer.rs
+++ b/src/io/buf_writer.rs
@@ -1,6 +1,6 @@
 use futures_core::task::{Context, Poll};
 use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use std::fmt;
 use std::io::{self, Write};
 use std::pin::Pin;
@@ -51,15 +51,13 @@
         }
     }
 
-    #[project]
     fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-        #[project]
-        let BufWriter { mut inner, buf, written } = self.project();
+        let mut this = self.project();
 
-        let len = buf.len();
+        let len = this.buf.len();
         let mut ret = Ok(());
-        while *written < len {
-            match ready!(inner.as_mut().poll_write(cx, &buf[*written..])) {
+        while *this.written < len {
+            match ready!(this.inner.as_mut().poll_write(cx, &this.buf[*this.written..])) {
                 Ok(0) => {
                     ret = Err(io::Error::new(
                         io::ErrorKind::WriteZero,
@@ -67,17 +65,17 @@
                     ));
                     break;
                 }
-                Ok(n) => *written += n,
+                Ok(n) => *this.written += n,
                 Err(e) => {
                     ret = Err(e);
                     break;
                 }
             }
         }
-        if *written > 0 {
-            buf.drain(..*written);
+        if *this.written > 0 {
+            this.buf.drain(..*this.written);
         }
-        *written = 0;
+        *this.written = 0;
         Poll::Ready(ret)
     }
 
diff --git a/src/io/chain.rs b/src/io/chain.rs
index 4e85854..336307f 100644
--- a/src/io/chain.rs
+++ b/src/io/chain.rs
@@ -2,7 +2,7 @@
 #[cfg(feature = "read-initializer")]
 use futures_io::Initializer;
 use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use std::fmt;
 use std::io;
 use std::pin::Pin;
@@ -51,10 +51,8 @@
     /// underlying readers as doing so may corrupt the internal state of this
     /// `Chain`.
     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) {
-        unsafe {
-            let Self { first, second, .. } = self.get_unchecked_mut();
-            (Pin::new_unchecked(first), Pin::new_unchecked(second))
-        }
+        let this = self.project();
+        (this.first, this.second)
     }
 
     /// Consumes the `Chain`, returning the wrapped readers.
@@ -82,42 +80,38 @@
     T: AsyncRead,
     U: AsyncRead,
 {
-    #[project]
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
         buf: &mut [u8],
     ) -> Poll<io::Result<usize>> {
-        #[project]
-        let Chain { first, second, done_first } = self.project();
+        let this = self.project();
 
-        if !*done_first {
-            match ready!(first.poll_read(cx, buf)?) {
-                0 if !buf.is_empty() => *done_first = true,
+        if !*this.done_first {
+            match ready!(this.first.poll_read(cx, buf)?) {
+                0 if !buf.is_empty() => *this.done_first = true,
                 n => return Poll::Ready(Ok(n)),
             }
         }
-        second.poll_read(cx, buf)
+        this.second.poll_read(cx, buf)
     }
 
-    #[project]
     fn poll_read_vectored(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
         bufs: &mut [IoSliceMut<'_>],
     ) -> Poll<io::Result<usize>> {
-        #[project]
-        let Chain { first, second, done_first } = self.project();
+        let this = self.project();
 
-        if !*done_first {
-            let n = ready!(first.poll_read_vectored(cx, bufs)?);
+        if !*this.done_first {
+            let n = ready!(this.first.poll_read_vectored(cx, bufs)?);
             if n == 0 && bufs.iter().any(|b| !b.is_empty()) {
-                *done_first = true
+                *this.done_first = true
             } else {
                 return Poll::Ready(Ok(n));
             }
         }
-        second.poll_read_vectored(cx, bufs)
+        this.second.poll_read_vectored(cx, bufs)
     }
 
     #[cfg(feature = "read-initializer")]
@@ -136,31 +130,27 @@
     T: AsyncBufRead,
     U: AsyncBufRead,
 {
-    #[project]
     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
-        #[project]
-        let Chain { first, second, done_first } = self.project();
+        let this = self.project();
 
-        if !*done_first {
-            match ready!(first.poll_fill_buf(cx)?) {
+        if !*this.done_first {
+            match ready!(this.first.poll_fill_buf(cx)?) {
                 buf if buf.is_empty() => {
-                    *done_first = true;
+                    *this.done_first = true;
                 }
                 buf => return Poll::Ready(Ok(buf)),
             }
         }
-        second.poll_fill_buf(cx)
+        this.second.poll_fill_buf(cx)
     }
 
-    #[project]
     fn consume(self: Pin<&mut Self>, amt: usize) {
-        #[project]
-        let Chain { first, second, done_first } = self.project();
+        let this = self.project();
 
-        if !*done_first {
-            first.consume(amt)
+        if !*this.done_first {
+            this.first.consume(amt)
         } else {
-            second.consume(amt)
+            this.second.consume(amt)
         }
     }
 }
diff --git a/src/io/copy_buf.rs b/src/io/copy_buf.rs
index f8bb49e..f47144a 100644
--- a/src/io/copy_buf.rs
+++ b/src/io/copy_buf.rs
@@ -3,7 +3,7 @@
 use futures_io::{AsyncBufRead, AsyncWrite};
 use std::io;
 use std::pin::Pin;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Creates a future which copies all the bytes from one object to another.
 ///
@@ -59,23 +59,21 @@
 {
     type Output = io::Result<u64>;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
-        let CopyBuf { mut reader, mut writer, amt } = self.project();
+        let mut this = self.project();
         loop {
-            let buffer = ready!(reader.as_mut().poll_fill_buf(cx))?;
+            let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
             if buffer.is_empty() {
-                ready!(Pin::new(&mut writer).poll_flush(cx))?;
-                return Poll::Ready(Ok(*amt));
+                ready!(Pin::new(&mut this.writer).poll_flush(cx))?;
+                return Poll::Ready(Ok(*this.amt));
             }
 
-            let i = ready!(Pin::new(&mut writer).poll_write(cx, buffer))?;
+            let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
             if i == 0 {
                 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
             }
-            *amt += i as u64;
-            reader.as_mut().consume(i);
+            *this.amt += i as u64;
+            this.reader.as_mut().consume(i);
         }
     }
 }
diff --git a/src/io/fill_buf.rs b/src/io/fill_buf.rs
new file mode 100644
index 0000000..015547e
--- /dev/null
+++ b/src/io/fill_buf.rs
@@ -0,0 +1,50 @@
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::AsyncBufRead;
+use std::io;
+use std::pin::Pin;
+
+/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.
+#[derive(Debug)]
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct FillBuf<'a, R: ?Sized> {
+    reader: Option<&'a mut R>,
+}
+
+impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
+
+impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
+    pub(super) fn new(reader: &'a mut R) -> Self {
+        FillBuf { reader: Some(reader) }
+    }
+}
+
+impl<'a, R> Future for FillBuf<'a, R>
+    where R: AsyncBufRead + ?Sized + Unpin,
+{
+    type Output = io::Result<&'a [u8]>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = &mut *self;
+        let reader = this.reader.take().expect("Polled FillBuf after completion");
+
+        match Pin::new(&mut *reader).poll_fill_buf(cx) {
+            // With polinius it is possible to remove this inner match and just have the correct
+            // lifetime of the reference inferred based on which branch is taken
+            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
+                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
+                Poll::Ready(Err(err)) => {
+                    unreachable!("reader indicated readiness but then returned an error: {:?}", err)
+                }
+                Poll::Pending => {
+                    unreachable!("reader indicated readiness but then returned pending")
+                }
+            },
+            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
+            Poll::Pending => {
+                this.reader = Some(reader);
+                Poll::Pending
+            }
+        }
+    }
+}
diff --git a/src/io/into_sink.rs b/src/io/into_sink.rs
index 0589f3a..082c581 100644
--- a/src/io/into_sink.rs
+++ b/src/io/into_sink.rs
@@ -3,7 +3,7 @@
 use futures_sink::Sink;
 use std::io;
 use std::pin::Pin;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 #[derive(Debug)]
 struct Block<Item> {
@@ -15,6 +15,7 @@
 #[pin_project]
 #[must_use = "sinks do nothing unless polled"]
 #[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub struct IntoSink<W, Item> {
     #[pin]
     writer: W,
@@ -30,26 +31,24 @@
 
     /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
     /// flush the writer after it succeeds in pushing the block into it.
-    #[project]
     fn poll_flush_buffer(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), io::Error>>
     {
-        #[project]
-        let IntoSink { mut writer, buffer } = self.project();
+        let mut this = self.project();
 
-        if let Some(buffer) = buffer {
+        if let Some(buffer) = this.buffer {
             loop {
                 let bytes = buffer.bytes.as_ref();
-                let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
+                let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
                 buffer.offset += written;
                 if buffer.offset == bytes.len() {
                     break;
                 }
             }
         }
-        *buffer = None;
+        *this.buffer = None;
         Poll::Ready(Ok(()))
     }
 
diff --git a/src/io/lines.rs b/src/io/lines.rs
index af0b491..90b993d 100644
--- a/src/io/lines.rs
+++ b/src/io/lines.rs
@@ -5,7 +5,7 @@
 use std::mem;
 use std::pin::Pin;
 use super::read_line::read_line_internal;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`lines`](super::AsyncBufReadExt::lines) method.
 
@@ -34,20 +34,18 @@
 impl<R: AsyncBufRead> Stream for Lines<R> {
     type Item = io::Result<String>;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Lines { reader, buf, bytes, read } = self.project();
-        let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
-        if n == 0 && buf.is_empty() {
+        let this = self.project();
+        let n = ready!(read_line_internal(this.reader, cx, this.buf, this.bytes, this.read))?;
+        if n == 0 && this.buf.is_empty() {
             return Poll::Ready(None)
         }
-        if buf.ends_with('\n') {
-            buf.pop();
-            if buf.ends_with('\r') {
-                buf.pop();
+        if this.buf.ends_with('\n') {
+            this.buf.pop();
+            if this.buf.ends_with('\r') {
+                this.buf.pop();
             }
         }
-        Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
+        Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
     }
 }
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 29b6418..51ee995 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -10,14 +10,16 @@
 //! library is activated, and it is activated by default.
 
 #[cfg(feature = "io-compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
 use crate::compat::Compat;
-use std::ptr;
+use std::{ptr, pin::Pin};
 
 pub use futures_io::{
     AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
     IoSlice, IoSliceMut, Result, SeekFrom,
 };
 #[cfg(feature = "read-initializer")]
+#[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
 pub use futures_io::Initializer;
 
 // used by `BufReader` and `BufWriter`
@@ -65,12 +67,17 @@
 mod empty;
 pub use self::empty::{empty, Empty};
 
+mod fill_buf;
+pub use self::fill_buf::FillBuf;
+
 mod flush;
 pub use self::flush::Flush;
 
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 mod into_sink;
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub use self::into_sink::IntoSink;
 
 mod lines;
@@ -124,9 +131,9 @@
 mod write_all;
 pub use self::write_all::WriteAll;
 
-#[cfg(feature = "write_all_vectored")]
+#[cfg(feature = "write-all-vectored")]
 mod write_all_vectored;
-#[cfg(feature = "write_all_vectored")]
+#[cfg(feature = "write-all-vectored")]
 pub use self::write_all_vectored::WriteAllVectored;
 
 /// An extension trait which adds utility methods to `AsyncRead` types.
@@ -372,6 +379,7 @@
     ///
     /// Requires the `io-compat` feature to enable.
     #[cfg(feature = "io-compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     fn compat(self) -> Compat<Self>
         where Self: Sized + Unpin,
     {
@@ -493,9 +501,10 @@
     /// ```
     /// # futures::executor::block_on(async {
     /// use futures::io::AsyncWriteExt;
-    /// use std::io::{Cursor, IoSlice};
+    /// use futures_util::io::Cursor;
+    /// use std::io::IoSlice;
     ///
-    /// let mut writer = Cursor::new([0u8; 7]);
+    /// let mut writer = Cursor::new(Vec::new());
     /// let bufs = &mut [
     ///     IoSlice::new(&[1]),
     ///     IoSlice::new(&[2, 3]),
@@ -503,12 +512,12 @@
     /// ];
     ///
     /// writer.write_all_vectored(bufs).await?;
-    /// // Note: the contents of `bufs` is now undefined, see the Notes section.
+    /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
     ///
-    /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 5, 6, 0]);
+    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
     /// ```
-    #[cfg(feature = "write_all_vectored")]
+    #[cfg(feature = "write-all-vectored")]
     fn write_all_vectored<'a>(
         &'a mut self,
         bufs: &'a mut [IoSlice<'a>],
@@ -523,6 +532,7 @@
     /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
     /// Requires the `io-compat` feature to enable.
     #[cfg(feature = "io-compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
     fn compat_write(self) -> Compat<Self>
         where Self: Sized + Unpin,
     {
@@ -556,6 +566,7 @@
     /// # Ok::<(), Box<dyn std::error::Error>>(())
     /// ```
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
         where Self: Sized,
     {
@@ -583,6 +594,58 @@
 
 /// An extension trait which adds utility methods to `AsyncBufRead` types.
 pub trait AsyncBufReadExt: AsyncBufRead {
+    /// Creates a future which will wait for a non-empty buffer to be available from this I/O
+    /// object or EOF to be reached.
+    ///
+    /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
+    ///
+    /// ```rust
+    /// # futures::executor::block_on(async {
+    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+    ///
+    /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+    /// stream.consume_unpin(2);
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![3]);
+    /// stream.consume_unpin(1);
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
+    /// stream.consume_unpin(3);
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![]);
+    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+    /// ```
+    fn fill_buf(&mut self) -> FillBuf<'_, Self>
+        where Self: Unpin,
+    {
+        FillBuf::new(self)
+    }
+
+    /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
+    ///
+    /// ```rust
+    /// # futures::executor::block_on(async {
+    /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
+    ///
+    /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
+    /// stream.consume_unpin(2);
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![3]);
+    /// stream.consume_unpin(1);
+    ///
+    /// assert_eq!(stream.fill_buf().await?, vec![]);
+    /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
+    /// ```
+    fn consume_unpin(&mut self, amt: usize)
+        where Self: Unpin,
+    {
+        Pin::new(self).consume(amt)
+    }
+
     /// Creates a future which will read all the bytes associated with this I/O
     /// object into `buf` until the delimiter `byte` or EOF is reached.
     /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
diff --git a/src/io/take.rs b/src/io/take.rs
index 39088b7..6179486 100644
--- a/src/io/take.rs
+++ b/src/io/take.rs
@@ -2,7 +2,7 @@
 #[cfg(feature = "read-initializer")]
 use futures_io::Initializer;
 use futures_io::{AsyncRead, AsyncBufRead};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use std::{cmp, io};
 use std::pin::Pin;
 
@@ -83,22 +83,20 @@
 }
 
 impl<R: AsyncRead> AsyncRead for Take<R> {
-    #[project]
     fn poll_read(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
         buf: &mut [u8],
     ) -> Poll<Result<usize, io::Error>> {
-        #[project]
-        let Take { inner, limit_ } = self.project();
+        let this = self.project();
 
-        if *limit_ == 0 {
+        if *this.limit_ == 0 {
             return Poll::Ready(Ok(0));
         }
 
-        let max = std::cmp::min(buf.len() as u64, *limit_) as usize;
-        let n = ready!(inner.poll_read(cx, &mut buf[..max]))?;
-        *limit_ -= n as u64;
+        let max = cmp::min(buf.len() as u64, *this.limit_) as usize;
+        let n = ready!(this.inner.poll_read(cx, &mut buf[..max]))?;
+        *this.limit_ -= n as u64;
         Poll::Ready(Ok(n))
     }
 
@@ -109,29 +107,25 @@
 }
 
 impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
-    #[project]
     fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
-        #[project]
-        let Take { inner, limit_ } = self.project();
+        let this = self.project();
 
         // Don't call into inner reader at all at EOF because it may still block
-        if *limit_ == 0 {
+        if *this.limit_ == 0 {
             return Poll::Ready(Ok(&[]));
         }
 
-        let buf = ready!(inner.poll_fill_buf(cx)?);
-        let cap = cmp::min(buf.len() as u64, *limit_) as usize;
+        let buf = ready!(this.inner.poll_fill_buf(cx)?);
+        let cap = cmp::min(buf.len() as u64, *this.limit_) as usize;
         Poll::Ready(Ok(&buf[..cap]))
     }
 
-    #[project]
     fn consume(self: Pin<&mut Self>, amt: usize) {
-        #[project]
-        let Take { inner, limit_ } = self.project();
+        let this = self.project();
 
         // Don't let callers reset the limit by passing an overlarge value
-        let amt = cmp::min(amt as u64, *limit_) as usize;
-        *limit_ -= amt as u64;
-        inner.consume(amt);
+        let amt = cmp::min(amt as u64, *this.limit_) as usize;
+        *this.limit_ -= amt as u64;
+        this.inner.consume(amt);
     }
 }
diff --git a/src/io/write_all_vectored.rs b/src/io/write_all_vectored.rs
index fbe7e73..ec28798 100644
--- a/src/io/write_all_vectored.rs
+++ b/src/io/write_all_vectored.rs
@@ -19,7 +19,7 @@
 
 impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAllVectored<'a, W> {
     pub(super) fn new(writer: &'a mut W, bufs: &'a mut [IoSlice<'a>]) -> Self {
-        WriteAllVectored { writer, bufs }
+        WriteAllVectored { writer, bufs: IoSlice::advance(bufs, 0) }
     }
 }
 
@@ -171,6 +171,7 @@
         #[rustfmt::skip] // Becomes unreadable otherwise.
         let tests: Vec<(_, &'static [u8])> = vec![
             (vec![], &[]),
+            (vec![IoSlice::new(&[]), IoSlice::new(&[])], &[]),
             (vec![IoSlice::new(&[1])], &[1]),
             (vec![IoSlice::new(&[1, 2])], &[1, 2]),
             (vec![IoSlice::new(&[1, 2, 3])], &[1, 2, 3]),
diff --git a/src/lib.rs b/src/lib.rs
index 68b954a..1cf165c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,16 +7,20 @@
 
 #![cfg_attr(not(feature = "std"), no_std)]
 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+// It cannot be included in the published code because this lints have false positives in the minimum required version.
+#![cfg_attr(test, warn(single_use_lifetimes))]
 #![warn(clippy::all)]
 
-// The solution for this lint is not available on 1.39 which is the current minimum supported version.
-// Can be removed as of minimum supported 1.40 or if https://github.com/rust-lang/rust-clippy/issues/3941
+// mem::take requires Rust 1.40, matches! requires Rust 1.42
+// Can be removed if the minimum supported version increased or if https://github.com/rust-lang/rust-clippy/issues/3941
 // get's implemented.
-#![allow(clippy::mem_replace_with_default)]
+#![allow(clippy::mem_replace_with_default, clippy::match_like_matches_macro)]
 
 #![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
 
-#![doc(html_root_url = "https://docs.rs/futures-util/0.3.5")]
+#![doc(html_root_url = "https://docs.rs/futures-util/0.3.7")]
+
+#![cfg_attr(docsrs, feature(doc_cfg))]
 
 #[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
 compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
@@ -37,25 +41,27 @@
 pub use futures_core::ready;
 pub use pin_utils::pin_mut;
 
-// Not public API.
 #[cfg(feature = "async-await")]
 #[macro_use]
-#[doc(hidden)]
-pub mod async_await;
+mod async_await;
 #[cfg(feature = "async-await")]
 #[doc(hidden)]
 pub use self::async_await::*;
 
 // Not public API.
-#[doc(hidden)]
-pub use futures_core::core_reexport;
-
-// Not public API.
 #[cfg(feature = "async-await")]
 #[doc(hidden)]
-pub mod __reexport {
-    #[doc(hidden)]
+pub mod __private {
     pub use crate::*;
+    pub use core::{
+        option::Option::{self, Some, None},
+        pin::Pin,
+        result::Result::{Err, Ok},
+    };
+
+    pub mod async_await {
+        pub use crate::async_await::*;
+    }
 }
 
 macro_rules! cfg_target_has_atomic {
@@ -70,8 +76,8 @@
     ($field:ident, $item:ty) => {
         fn poll_ready(
             self: core::pin::Pin<&mut Self>,
-            cx: &mut $crate::core_reexport::task::Context<'_>,
-        ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+            cx: &mut core::task::Context<'_>,
+        ) -> core::task::Poll<Result<(), Self::Error>> {
             self.project().$field.poll_ready(cx)
         }
 
@@ -84,15 +90,15 @@
 
         fn poll_flush(
             self: core::pin::Pin<&mut Self>,
-            cx: &mut $crate::core_reexport::task::Context<'_>,
-        ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+            cx: &mut core::task::Context<'_>,
+        ) -> core::task::Poll<Result<(), Self::Error>> {
             self.project().$field.poll_flush(cx)
         }
 
         fn poll_close(
             self: core::pin::Pin<&mut Self>,
-            cx: &mut $crate::core_reexport::task::Context<'_>,
-        ) -> $crate::core_reexport::task::Poll<Result<(), Self::Error>> {
+            cx: &mut core::task::Context<'_>,
+        ) -> core::task::Poll<Result<(), Self::Error>> {
             self.project().$field.poll_close(cx)
         }
     }
@@ -102,8 +108,8 @@
     ($field:ident) => {
         fn poll(
             self: core::pin::Pin<&mut Self>,
-            cx: &mut $crate::core_reexport::task::Context<'_>,
-        ) -> $crate::core_reexport::task::Poll<Self::Output> {
+            cx: &mut core::task::Context<'_>,
+        ) -> core::task::Poll<Self::Output> {
             self.project().$field.poll(cx)
         }
     }
@@ -113,8 +119,8 @@
     ($field:ident) => {
         fn poll_next(
             self: core::pin::Pin<&mut Self>,
-            cx: &mut $crate::core_reexport::task::Context<'_>,
-        ) -> $crate::core_reexport::task::Poll<Option<Self::Item>> {
+            cx: &mut core::task::Context<'_>,
+        ) -> core::task::Poll<Option<Self::Item>> {
             self.project().$field.poll_next(cx)
         }
         fn size_hint(&self) -> (usize, Option<usize>) {
@@ -183,7 +189,7 @@
         ) -> core::task::Poll<std::io::Result<&[u8]>> {
             self.project().$field.poll_fill_buf(cx)
         }
-    
+
         fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
             self.project().$field.consume(amt)
         }
@@ -308,6 +314,7 @@
 #[doc(hidden)] pub use crate::stream::{StreamExt, TryStreamExt};
 
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub mod sink;
 #[cfg(feature = "sink")]
 #[doc(hidden)] pub use crate::sink::SinkExt;
@@ -317,9 +324,11 @@
 pub mod never;
 
 #[cfg(feature = "compat")]
+#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
 pub mod compat;
 
 #[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
 #[cfg(feature = "std")]
 pub mod io;
 #[cfg(feature = "io")]
diff --git a/src/lock/bilock.rs b/src/lock/bilock.rs
index cccd0be..3698406 100644
--- a/src/lock/bilock.rs
+++ b/src/lock/bilock.rs
@@ -34,6 +34,7 @@
 /// This type is only available when the `bilock` feature of this
 /// library is activated.
 #[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 pub struct BiLock<T> {
     arc: Arc<Inner<T>>,
 }
@@ -142,6 +143,7 @@
     ///
     /// Note that the returned future will never resolve to an error.
     #[cfg(feature = "bilock")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
     pub fn lock(&self) -> BiLockAcquire<'_, T> {
         BiLockAcquire {
             bilock: self,
@@ -198,6 +200,7 @@
 
 /// Error indicating two `BiLock<T>`s were not two halves of a whole, and
 /// thus could not be `reunite`d.
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 pub struct ReuniteError<T>(pub BiLock<T>, pub BiLock<T>);
 
 impl<T> fmt::Debug for ReuniteError<T> {
@@ -223,6 +226,7 @@
 /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
 /// unlocked.
 #[derive(Debug)]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 pub struct BiLockGuard<'a, T> {
     bilock: &'a BiLock<T>,
 }
@@ -258,6 +262,7 @@
 /// Future returned by `BiLock::lock` which will resolve when the lock is
 /// acquired.
 #[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 #[must_use = "futures do nothing unless you `.await` or poll them"]
 #[derive(Debug)]
 pub struct BiLockAcquire<'a, T> {
diff --git a/src/lock/mod.rs b/src/lock/mod.rs
index 3db5e5b..b252613 100644
--- a/src/lock/mod.rs
+++ b/src/lock/mod.rs
@@ -9,9 +9,11 @@
 pub use self::mutex::{MappedMutexGuard, Mutex, MutexLockFuture, MutexGuard};
 
 #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
 mod bilock;
 #[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
 pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
 #[cfg(any(feature = "sink", feature = "io"))]
 #[cfg(not(feature = "bilock"))]
diff --git a/src/lock/mutex.rs b/src/lock/mutex.rs
index 96549ea..84aeeda 100644
--- a/src/lock/mutex.rs
+++ b/src/lock/mutex.rs
@@ -3,15 +3,16 @@
 use slab::Slab;
 use std::{fmt, mem};
 use std::cell::UnsafeCell;
+use std::marker::PhantomData;
 use std::ops::{Deref, DerefMut};
 use std::pin::Pin;
 use std::sync::Mutex as StdMutex;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
 /// A futures-aware mutex.
-/// 
+///
 /// # Fairness
-/// 
+///
 /// This mutex provides no fairness guarantees. Tasks may not acquire the mutex
 /// in the order that they requested the lock, and it's possible for a single task
 /// which repeatedly takes the lock to starve other tasks, which may be left waiting
@@ -288,7 +289,7 @@
         // Don't run the `drop` method for MutexGuard. The ownership of the underlying
         // locked state is being moved to the returned MappedMutexGuard.
         mem::forget(this);
-        MappedMutexGuard { mutex, value }
+        MappedMutexGuard { mutex, value, _marker: PhantomData }
     }
 }
 
@@ -325,6 +326,7 @@
 pub struct MappedMutexGuard<'a, T: ?Sized, U: ?Sized> {
     mutex: &'a Mutex<T>,
     value: *mut U,
+    _marker: PhantomData<&'a mut U>,
 }
 
 impl<'a, T: ?Sized, U: ?Sized> MappedMutexGuard<'a, T, U> {
@@ -354,7 +356,7 @@
         // Don't run the `drop` method for MappedMutexGuard. The ownership of the underlying
         // locked state is being moved to the returned MappedMutexGuard.
         mem::forget(this);
-        MappedMutexGuard { mutex, value }
+        MappedMutexGuard { mutex, value, _marker: PhantomData }
     }
 }
 
@@ -401,8 +403,8 @@
 // lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
 unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
 unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
-unsafe impl<T: ?Sized + Send, U: ?Sized> Send for MappedMutexGuard<'_, T, U> {}
-unsafe impl<T: ?Sized + Sync, U: ?Sized> Sync for MappedMutexGuard<'_, T, U> {}
+unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
+unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
 
 #[test]
 fn test_mutex_guard_debug_not_recurse() {
diff --git a/src/sink/buffer.rs b/src/sink/buffer.rs
index c3df3b9..8176abd 100644
--- a/src/sink/buffer.rs
+++ b/src/sink/buffer.rs
@@ -1,7 +1,7 @@
 use futures_core::stream::{Stream, FusedStream};
 use futures_core::task::{Context, Poll};
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::pin::Pin;
 use alloc::collections::VecDeque;
 
@@ -29,18 +29,16 @@
 
     delegate_access_inner!(sink, Si, ());
 
-    #[project]
     fn try_empty_buffer(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Si::Error>> {
-        #[project]
-        let Buffer { mut sink, buf, .. } = self.project();
-        ready!(sink.as_mut().poll_ready(cx))?;
-        while let Some(item) = buf.pop_front() {
-            sink.as_mut().start_send(item)?;
-            if !buf.is_empty() {
-                ready!(sink.as_mut().poll_ready(cx))?;
+        let mut this = self.project();
+        ready!(this.sink.as_mut().poll_ready(cx))?;
+        while let Some(item) = this.buf.pop_front() {
+            this.sink.as_mut().start_send(item)?;
+            if !this.buf.is_empty() {
+                ready!(this.sink.as_mut().poll_ready(cx))?;
             }
         }
         Poll::Ready(Ok(()))
diff --git a/src/sink/err_into.rs b/src/sink/err_into.rs
index 530e1cc..b23ada7 100644
--- a/src/sink/err_into.rs
+++ b/src/sink/err_into.rs
@@ -1,7 +1,7 @@
 use crate::sink::{SinkExt, SinkMapErr};
 use futures_core::stream::{Stream, FusedStream};
 use futures_sink::{Sink};
-use pin_project::{pin_project};
+use pin_project::pin_project;
 
 /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
 #[pin_project]
diff --git a/src/sink/fanout.rs b/src/sink/fanout.rs
index 7066e21..d71d793 100644
--- a/src/sink/fanout.rs
+++ b/src/sink/fanout.rs
@@ -2,7 +2,7 @@
 use core::pin::Pin;
 use futures_core::task::{Context, Poll};
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Sink that clones incoming items and forwards them to two sinks at the same time.
 ///
@@ -33,11 +33,9 @@
     }
 
     /// Get a pinned mutable reference to the inner sinks.
-    #[project]
     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) {
-        #[project]
-        let Fanout { sink1, sink2 } = self.project();
-        (sink1, sink2)
+        let this = self.project();
+        (this.sink1, this.sink2)
     }
 
     /// Consumes this combinator, returning the underlying sinks.
@@ -65,57 +63,49 @@
 {
     type Error = Si1::Error;
 
-    #[project]
     fn poll_ready(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
-        #[project]
-        let Fanout { sink1, sink2 } = self.project();
+        let this = self.project();
 
-        let sink1_ready = sink1.poll_ready(cx)?.is_ready();
-        let sink2_ready = sink2.poll_ready(cx)?.is_ready();
+        let sink1_ready = this.sink1.poll_ready(cx)?.is_ready();
+        let sink2_ready = this.sink2.poll_ready(cx)?.is_ready();
         let ready = sink1_ready && sink2_ready;
         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
     }
 
-    #[project]
     fn start_send(
         self: Pin<&mut Self>,
         item: Item,
     ) -> Result<(), Self::Error> {
-        #[project]
-        let Fanout { sink1, sink2 } = self.project();
+        let this = self.project();
 
-        sink1.start_send(item.clone())?;
-        sink2.start_send(item)?;
+        this.sink1.start_send(item.clone())?;
+        this.sink2.start_send(item)?;
         Ok(())
     }
 
-    #[project]
     fn poll_flush(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
-        #[project]
-        let Fanout { sink1, sink2 } = self.project();
+        let this = self.project();
 
-        let sink1_ready = sink1.poll_flush(cx)?.is_ready();
-        let sink2_ready = sink2.poll_flush(cx)?.is_ready();
+        let sink1_ready = this.sink1.poll_flush(cx)?.is_ready();
+        let sink2_ready = this.sink2.poll_flush(cx)?.is_ready();
         let ready = sink1_ready && sink2_ready;
         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
     }
 
-    #[project]
     fn poll_close(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Self::Error>> {
-        #[project]
-        let Fanout { sink1, sink2 } = self.project();
+        let this = self.project();
 
-        let sink1_ready = sink1.poll_close(cx)?.is_ready();
-        let sink2_ready = sink2.poll_close(cx)?.is_ready();
+        let sink1_ready = this.sink1.poll_close(cx)?.is_ready();
+        let sink2_ready = this.sink2.poll_close(cx)?.is_ready();
         let ready = sink1_ready && sink2_ready;
         if ready { Poll::Ready(Ok(())) } else { Poll::Pending }
     }
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index ebdb999..b0e2c83 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -258,6 +258,7 @@
     /// Wraps a [`Sink`] into a sink compatible with libraries using
     /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
     #[cfg(feature = "compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
     fn compat(self) -> CompatSink<Self, Item>
         where Self: Sized + Unpin,
     {
diff --git a/src/sink/with.rs b/src/sink/with.rs
index 802123f..6329a0c 100644
--- a/src/sink/with.rs
+++ b/src/sink/with.rs
@@ -5,7 +5,7 @@
 use futures_core::stream::Stream;
 use futures_core::task::{Context, Poll};
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Sink for the [`with`](super::SinkExt::with) method.
 #[pin_project]
@@ -71,20 +71,18 @@
     delegate_access_inner!(sink, Si, ());
 
     /// Completes the processing of previous item if any.
-    #[project]
     fn poll(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), E>> {
-        #[project]
-        let With { mut state, sink, .. } = self.project();
+        let mut this = self.project();
 
-        let item = match state.as_mut().as_pin_mut() {
+        let item = match this.state.as_mut().as_pin_mut() {
             None => return Poll::Ready(Ok(())),
             Some(fut) => ready!(fut.poll(cx))?,
         };
-        state.set(None);
-        sink.start_send(item)?;
+        this.state.set(None);
+        this.sink.start_send(item)?;
         Poll::Ready(Ok(()))
     }
 }
@@ -106,16 +104,14 @@
         Poll::Ready(Ok(()))
     }
 
-    #[project]
     fn start_send(
         self: Pin<&mut Self>,
         item: U,
     ) -> Result<(), Self::Error> {
-        #[project]
-        let With { mut state, f, .. } = self.project();
+        let mut this = self.project();
 
-        assert!(state.is_none());
-        state.set(Some(f(item)));
+        assert!(this.state.is_none());
+        this.state.set(Some((this.f)(item)));
         Ok(())
     }
 
diff --git a/src/sink/with_flat_map.rs b/src/sink/with_flat_map.rs
index f260a0b..cf213e6 100644
--- a/src/sink/with_flat_map.rs
+++ b/src/sink/with_flat_map.rs
@@ -4,7 +4,7 @@
 use futures_core::stream::{Stream, FusedStream};
 use futures_core::task::{Context, Poll};
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method.
 #[pin_project]
@@ -52,31 +52,29 @@
 
     delegate_access_inner!(sink, Si, ());
 
-    #[project]
     fn try_empty_stream(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Result<(), Si::Error>> {
-        #[project]
-        let WithFlatMap { mut sink, mut stream, buffer, .. } = self.project();
+        let mut this = self.project();
 
-        if buffer.is_some() {
-            ready!(sink.as_mut().poll_ready(cx))?;
-            let item = buffer.take().unwrap();
-            sink.as_mut().start_send(item)?;
+        if this.buffer.is_some() {
+            ready!(this.sink.as_mut().poll_ready(cx))?;
+            let item = this.buffer.take().unwrap();
+            this.sink.as_mut().start_send(item)?;
         }
-        if let Some(mut some_stream) = stream.as_mut().as_pin_mut() {
+        if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() {
             while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) {
-                match sink.as_mut().poll_ready(cx)? {
-                    Poll::Ready(()) => sink.as_mut().start_send(item)?,
+                match this.sink.as_mut().poll_ready(cx)? {
+                    Poll::Ready(()) => this.sink.as_mut().start_send(item)?,
                     Poll::Pending => {
-                        *buffer = Some(item);
+                        *this.buffer = Some(item);
                         return Poll::Pending;
                     }
                 };
             }
         }
-        stream.set(None);
+        this.stream.set(None);
         Poll::Ready(Ok(()))
     }
 }
@@ -119,16 +117,14 @@
         self.try_empty_stream(cx)
     }
 
-    #[project]
     fn start_send(
         self: Pin<&mut Self>,
         item: U,
     ) -> Result<(), Self::Error> {
-        #[project]
-        let WithFlatMap { mut stream, f, .. } = self.project();
+        let mut this = self.project();
 
-        assert!(stream.is_none());
-        stream.set(Some(f(item)));
+        assert!(this.stream.is_none());
+        this.stream.set(Some((this.f)(item)));
         Ok(())
     }
 
diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs
index 6dc07ad..5dbd4ae 100644
--- a/src/stream/futures_ordered.rs
+++ b/src/stream/futures_ordered.rs
@@ -1,7 +1,7 @@
 use crate::stream::{FuturesUnordered, StreamExt};
 use futures_core::future::Future;
 use futures_core::stream::Stream;
-use futures_core::task::{Context, Poll};
+use futures_core::{FusedStream, task::{Context, Poll}};
 use pin_project::pin_project;
 use core::cmp::Ordering;
 use core::fmt::{self, Debug};
@@ -203,6 +203,12 @@
     }
 }
 
+impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
+    fn is_terminated(&self) -> bool {
+        self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
+    }
+}
+
 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
     fn extend<I>(&mut self, iter: I)
     where
diff --git a/src/stream/futures_unordered/task.rs b/src/stream/futures_unordered/task.rs
index 9277229..abb0264 100644
--- a/src/stream/futures_unordered/task.rs
+++ b/src/stream/futures_unordered/task.rs
@@ -70,7 +70,7 @@
 
 impl<Fut> Task<Fut> {
     /// Returns a waker reference for this task without cloning the Arc.
-    pub(super) fn waker_ref<'a>(this: &'a Arc<Task<Fut>>) -> WakerRef<'a> {
+    pub(super) fn waker_ref(this: &Arc<Task<Fut>>) -> WakerRef<'_> {
         waker_ref(this)
     }
 
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
index 2a5ecf9..ca9bc89 100644
--- a/src/stream/mod.rs
+++ b/src/stream/mod.rs
@@ -28,6 +28,7 @@
 pub use self::stream::ReadyChunks;
 
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub use self::stream::Forward;
 
 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
@@ -36,6 +37,7 @@
 
 #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
 #[cfg(feature = "sink")]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 #[cfg(feature = "alloc")]
 pub use self::stream::{ReuniteError, SplitSink, SplitStream};
 
@@ -43,10 +45,11 @@
 pub use self::try_stream::{
     try_unfold, AndThen, ErrInto, InspectErr, InspectOk, IntoStream, MapErr, MapOk, OrElse,
     TryCollect, TryConcat, TryFilter, TryFilterMap, TryFlatten, TryFold, TryForEach, TryNext,
-    TrySkipWhile, TryStreamExt, TryUnfold,
+    TrySkipWhile, TryStreamExt, TryTakeWhile, TryUnfold,
 };
 
 #[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
 #[cfg(feature = "std")]
 pub use self::try_stream::IntoAsyncRead;
 
@@ -97,3 +100,12 @@
     #[cfg(feature = "alloc")]
     pub use self::select_all::{select_all, SelectAll};
 }
+
+// Just a helper function to ensure the futures we're returning all have the
+// right implementations.
+pub(crate) fn assert_stream<T, S>(stream: S) -> S
+    where
+        S: Stream<Item = T>,
+{
+    stream
+}
diff --git a/src/stream/once.rs b/src/stream/once.rs
index 21cd14b..3a8fef6 100644
--- a/src/stream/once.rs
+++ b/src/stream/once.rs
@@ -2,7 +2,7 @@
 use futures_core::future::Future;
 use futures_core::stream::{Stream, FusedStream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Creates a stream of a single element.
 ///
@@ -20,8 +20,6 @@
 }
 
 /// A stream which emits single element and then EOF.
-///
-/// This stream will never block and is always ready.
 #[pin_project]
 #[derive(Debug)]
 #[must_use = "streams do nothing unless polled"]
@@ -39,16 +37,14 @@
 impl<Fut: Future> Stream for Once<Fut> {
     type Item = Fut::Output;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Once { mut future } = self.project();
-        let v = match future.as_mut().as_pin_mut() {
+        let mut this = self.project();
+        let v = match this.future.as_mut().as_pin_mut() {
             Some(fut) => ready!(fut.poll(cx)),
             None => return Poll::Ready(None),
         };
 
-        future.set(None);
+        this.future.set(None);
         Poll::Ready(Some(v))
     }
 
diff --git a/src/stream/select.rs b/src/stream/select.rs
index 36503e4..7666386 100644
--- a/src/stream/select.rs
+++ b/src/stream/select.rs
@@ -2,7 +2,7 @@
 use core::pin::Pin;
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`select()`] function.
 #[pin_project]
@@ -58,11 +58,9 @@
     ///
     /// Note that care must be taken to avoid tampering with the state of the
     /// stream which may otherwise confuse this combinator.
-    #[project]
     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
-        #[project]
-        let Select { stream1, stream2, .. } = self.project();
-        (stream1.get_pin_mut(), stream2.get_pin_mut())
+        let this = self.project();
+        (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
     }
 
     /// Consumes this combinator, returning the underlying streams.
@@ -89,18 +87,15 @@
 {
     type Item = St1::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<St1::Item>> {
-        #[project]
-        let Select { flag, stream1, stream2 } = self.project();
-
-        if !*flag {
-            poll_inner(flag, stream1, stream2, cx)
+        let this = self.project();
+        if !*this.flag {
+            poll_inner(this.flag, this.stream1, this.stream2, cx)
         } else {
-            poll_inner(flag, stream2, stream1, cx)
+            poll_inner(this.flag, this.stream2, this.stream1, cx)
         }
     }
 }
diff --git a/src/stream/stream/buffer_unordered.rs b/src/stream/stream/buffer_unordered.rs
index a822576..24f4853 100644
--- a/src/stream/stream/buffer_unordered.rs
+++ b/src/stream/stream/buffer_unordered.rs
@@ -4,7 +4,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::fmt;
 use core::pin::Pin;
 
@@ -62,31 +62,29 @@
 {
     type Item = <St::Item as Future>::Output;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let BufferUnordered { mut stream, in_progress_queue, max } = self.project();
+        let mut this = self.project();
 
         // First up, try to spawn off as many futures as possible by filling up
         // our queue of futures.
-        while in_progress_queue.len() < *max {
-            match stream.as_mut().poll_next(cx) {
-                Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
+        while this.in_progress_queue.len() < *this.max {
+            match this.stream.as_mut().poll_next(cx) {
+                Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
                 Poll::Ready(None) | Poll::Pending => break,
             }
         }
 
         // Attempt to pull the next value from the in_progress_queue
-        match in_progress_queue.poll_next_unpin(cx) {
+        match this.in_progress_queue.poll_next_unpin(cx) {
             x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
             Poll::Ready(None) => {}
         }
 
         // If more values are still coming from the stream, we're not done yet
-        if stream.is_done() {
+        if this.stream.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs
index 9dff01f..626ead1 100644
--- a/src/stream/stream/buffered.rs
+++ b/src/stream/stream/buffered.rs
@@ -4,7 +4,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::fmt;
 use core::pin::Pin;
 
@@ -59,31 +59,29 @@
 {
     type Item = <St::Item as Future>::Output;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Buffered { mut stream, in_progress_queue, max } = self.project();
+        let mut this = self.project();
 
         // First up, try to spawn off as many futures as possible by filling up
         // our queue of futures.
-        while in_progress_queue.len() < *max {
-            match stream.as_mut().poll_next(cx) {
-                Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
+        while this.in_progress_queue.len() < *this.max {
+            match this.stream.as_mut().poll_next(cx) {
+                Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
                 Poll::Ready(None) | Poll::Pending => break,
             }
         }
 
         // Attempt to pull the next value from the in_progress_queue
-        let res = in_progress_queue.poll_next_unpin(cx);
+        let res = this.in_progress_queue.poll_next_unpin(cx);
         if let Some(val) = ready!(res) {
             return Poll::Ready(Some(val))
         }
 
         // If more values are still coming from the stream, we're not done yet
-        if stream.is_done() {
+        if this.stream.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
diff --git a/src/stream/stream/catch_unwind.rs b/src/stream/stream/catch_unwind.rs
index 1bb43b2..b9eb4ba 100644
--- a/src/stream/stream/catch_unwind.rs
+++ b/src/stream/stream/catch_unwind.rs
@@ -1,6 +1,6 @@
 use futures_core::stream::{Stream, FusedStream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use std::any::Any;
 use std::pin::Pin;
 use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
@@ -26,25 +26,23 @@
 impl<St: Stream + UnwindSafe> Stream for CatchUnwind<St> {
     type Item = Result<St::Item, Box<dyn Any + Send>>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let CatchUnwind { stream, caught_unwind } = self.project();
+        let mut this = self.project();
 
-        if *caught_unwind {
+        if *this.caught_unwind {
             Poll::Ready(None)
         } else {
             let res = catch_unwind(AssertUnwindSafe(|| {
-                stream.poll_next(cx)
+                this.stream.as_mut().poll_next(cx)
             }));
 
             match res {
                 Ok(poll) => poll.map(|opt| opt.map(Ok)),
                 Err(e) => {
-                    *caught_unwind = true;
+                    *this.caught_unwind = true;
                     Poll::Ready(Some(Err(e)))
                 },
             }
diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs
index 720903c..c7fbd5f 100644
--- a/src/stream/stream/chain.rs
+++ b/src/stream/stream/chain.rs
@@ -1,7 +1,7 @@
 use core::pin::Pin;
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`chain`](super::StreamExt::chain) method.
 #[pin_project]
@@ -42,20 +42,18 @@
 {
     type Item = St1::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Chain { mut first, second } = self.project();
-        if let Some(first) = first.as_mut().as_pin_mut() {
+        let mut this = self.project();
+        if let Some(first) = this.first.as_mut().as_pin_mut() {
             if let Some(item) = ready!(first.poll_next(cx)) {
                 return Poll::Ready(Some(item))
             }
         }
-        first.set(None);
-        second.poll_next(cx)
+        this.first.set(None);
+        this.second.poll_next(cx)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/chunks.rs b/src/stream/stream/chunks.rs
index d24c31c..9b4ed93 100644
--- a/src/stream/stream/chunks.rs
+++ b/src/stream/stream/chunks.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::mem;
 use core::pin::Pin;
 use alloc::vec::Vec;
@@ -41,21 +41,19 @@
 impl<St: Stream> Stream for Chunks<St> {
     type Item = Vec<St::Item>;
 
-    #[project]
     fn poll_next(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Chunks { mut stream, items, cap } = self.as_mut().project();
+        let mut this = self.as_mut().project();
         loop {
-            match ready!(stream.as_mut().poll_next(cx)) {
+            match ready!(this.stream.as_mut().poll_next(cx)) {
                 // Push the item into the buffer and check whether it is full.
                 // If so, replace our buffer with a new and empty one and return
                 // the full one.
                 Some(item) => {
-                    items.push(item);
-                    if items.len() >= *cap {
+                    this.items.push(item);
+                    if this.items.len() >= *this.cap {
                         return Poll::Ready(Some(self.take()))
                     }
                 }
@@ -63,10 +61,10 @@
                 // Since the underlying stream ran out of values, return what we
                 // have buffered, if we have anything.
                 None => {
-                    let last = if items.is_empty() {
+                    let last = if this.items.is_empty() {
                         None
                     } else {
-                        let full_buf = mem::replace(items, Vec::new());
+                        let full_buf = mem::replace(this.items, Vec::new());
                         Some(full_buf)
                     };
 
diff --git a/src/stream/stream/collect.rs b/src/stream/stream/collect.rs
index 349e42d..6d07660 100644
--- a/src/stream/stream/collect.rs
+++ b/src/stream/stream/collect.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`collect`](super::StreamExt::collect) method.
 #[pin_project]
@@ -43,13 +43,11 @@
 {
     type Output = C;
 
-    #[project]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
-        #[project]
-        let Collect { mut stream, collection } = self.as_mut().project();
+        let mut this = self.as_mut().project();
         loop {
-            match ready!(stream.as_mut().poll_next(cx)) {
-                Some(e) => collection.extend(Some(e)),
+            match ready!(this.stream.as_mut().poll_next(cx)) {
+                Some(e) => this.collection.extend(Some(e)),
                 None => return Poll::Ready(self.finish()),
             }
         }
diff --git a/src/stream/stream/concat.rs b/src/stream/stream/concat.rs
index 647632b..9b37cd2 100644
--- a/src/stream/stream/concat.rs
+++ b/src/stream/stream/concat.rs
@@ -2,7 +2,7 @@
 use futures_core::future::{Future, FusedFuture};
 use futures_core::stream::{Stream, FusedStream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`concat`](super::StreamExt::concat) method.
 #[pin_project]
@@ -34,23 +34,21 @@
 {
     type Output = St::Item;
 
-    #[project]
     fn poll(
         self: Pin<&mut Self>, cx: &mut Context<'_>
     ) -> Poll<Self::Output> {
-        #[project]
-        let Concat { mut stream, accum } = self.project();
+        let mut this = self.project();
 
         loop {
-            match ready!(stream.as_mut().poll_next(cx)) {
+            match ready!(this.stream.as_mut().poll_next(cx)) {
                 None => {
-                    return Poll::Ready(accum.take().unwrap_or_default())
+                    return Poll::Ready(this.accum.take().unwrap_or_default())
                 }
                 Some(e) => {
-                    if let Some(a) = accum {
+                    if let Some(a) = this.accum {
                         a.extend(e)
                     } else {
-                        *accum = Some(e)
+                        *this.accum = Some(e)
                     }
                 }
             }
diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs
index 477a052..4e6bac2 100644
--- a/src/stream/stream/enumerate.rs
+++ b/src/stream/stream/enumerate.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`enumerate`](super::StreamExt::enumerate) method.
 #[pin_project]
@@ -35,18 +35,16 @@
 impl<St: Stream> Stream for Enumerate<St> {
     type Item = (usize, St::Item);
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Enumerate { stream, count } = self.project();
+        let this = self.project();
 
-        match ready!(stream.poll_next(cx)) {
+        match ready!(this.stream.poll_next(cx)) {
             Some(item) => {
-                let prev_count = *count;
-                *count += 1;
+                let prev_count = *this.count;
+                *this.count += 1;
                 Poll::Ready(Some((prev_count, item)))
             }
             None => Poll::Ready(None),
diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs
index 9d848ad..55493fe 100644
--- a/src/stream/stream/filter.rs
+++ b/src/stream/stream/filter.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use crate::fns::FnMut1;
 
 /// Stream for the [`filter`](super::StreamExt::filter) method.
@@ -37,6 +37,7 @@
     }
 }
 
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
 impl<St, Fut, F> Filter<St, Fut, F>
 where St: Stream,
       F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
@@ -64,6 +65,7 @@
     }
 }
 
+#[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058
 impl<St, Fut, F> Stream for Filter<St, Fut, F>
     where St: Stream,
           F: for<'a> FnMut1<&'a St::Item, Output=Fut>,
@@ -71,24 +73,22 @@
 {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<St::Item>> {
-        #[project]
-        let Filter { mut stream, f, mut pending_fut, pending_item } = self.project();
+        let mut this = self.project();
         Poll::Ready(loop {
-            if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
                 let res = ready!(fut.poll(cx));
-                pending_fut.set(None);
+                this.pending_fut.set(None);
                 if res {
-                    break pending_item.take();
+                    break this.pending_item.take();
                 }
-                *pending_item = None;
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                pending_fut.set(Some(f.call_mut(&item)));
-                *pending_item = Some(item);
+                *this.pending_item = None;
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.pending_fut.set(Some(this.f.call_mut(&item)));
+                *this.pending_item = Some(item);
             } else {
                 break None;
             }
diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs
index 2d098ee..50c440f 100644
--- a/src/stream/stream/filter_map.rs
+++ b/src/stream/stream/filter_map.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use crate::fns::FnMut1;
 
 /// Stream for the [`filter_map`](super::StreamExt::filter_map) method.
@@ -61,24 +61,22 @@
 {
     type Item = T;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<T>> {
-        #[project]
-        let FilterMap { mut stream, f, mut pending } = self.project();
+        let mut this = self.project();
         Poll::Ready(loop {
-            if let Some(p) = pending.as_mut().as_pin_mut() {
+            if let Some(p) = this.pending.as_mut().as_pin_mut() {
                 // We have an item in progress, poll that until it's done
                 let item = ready!(p.poll(cx));
-                pending.set(None);
+                this.pending.set(None);
                 if item.is_some() {
                     break item;
                 }
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
                 // No item in progress, but the stream is still going
-                pending.set(Some(f.call_mut(item)));
+                this.pending.set(Some(this.f.call_mut(item)));
             } else {
                 // The stream is done
                 break None;
diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs
index 4db77e1..75bbc21 100644
--- a/src/stream/stream/flatten.rs
+++ b/src/stream/stream/flatten.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`flatten`](super::StreamExt::flatten) method.
 #[pin_project]
@@ -41,19 +41,17 @@
 {
     type Item = <St::Item as Stream>::Item;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Flatten { mut stream, mut next } = self.project();
+        let mut this = self.project();
         Poll::Ready(loop {
-            if let Some(s) = next.as_mut().as_pin_mut() {
+            if let Some(s) = this.next.as_mut().as_pin_mut() {
                 if let Some(item) = ready!(s.poll_next(cx)) {
                     break Some(item);
                 } else {
-                    next.set(None);
+                    this.next.set(None);
                 }
-            } else if let Some(s) = ready!(stream.as_mut().poll_next(cx)) {
-                next.set(Some(s));
+            } else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.next.set(Some(s));
             } else {
                 break None;
             }
diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs
index d4bec25..6fce256 100644
--- a/src/stream/stream/fold.rs
+++ b/src/stream/stream/fold.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::Stream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`fold`](super::StreamExt::fold) method.
 #[pin_project]
@@ -64,21 +64,19 @@
 {
     type Output = T;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
-        #[project]
-        let Fold { mut stream, f, accum, mut future } = self.project();
+        let mut this = self.project();
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 // we're currently processing a future to produce a new accum value
-                *accum = Some(ready!(fut.poll(cx)));
-                future.set(None);
-            } else if accum.is_some() {
+                *this.accum = Some(ready!(fut.poll(cx)));
+                this.future.set(None);
+            } else if this.accum.is_some() {
                 // we're waiting on a new item from the stream
-                let res = ready!(stream.as_mut().poll_next(cx));
-                let a = accum.take().unwrap();
+                let res = ready!(this.stream.as_mut().poll_next(cx));
+                let a = this.accum.take().unwrap();
                 if let Some(item) = res {
-                    future.set(Some(f(a, item)));
+                    this.future.set(Some((this.f)(a, item)));
                 } else {
                     break a;
                 }
diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs
index fb3f40f..c8af21b 100644
--- a/src/stream/stream/for_each.rs
+++ b/src/stream/stream/for_each.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`for_each`](super::StreamExt::for_each) method.
 #[pin_project]
@@ -60,16 +60,14 @@
 {
     type Output = ();
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
-        #[project]
-        let ForEach { mut stream, f, mut future } = self.project();
+        let mut this = self.project();
         loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 ready!(fut.poll(cx));
-                future.set(None);
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                future.set(Some(f(item)));
+                this.future.set(None);
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.future.set(Some((this.f)(item)));
             } else {
                 break;
             }
diff --git a/src/stream/stream/for_each_concurrent.rs b/src/stream/stream/for_each_concurrent.rs
index 88ff2d3..843ddaa 100644
--- a/src/stream/stream/for_each_concurrent.rs
+++ b/src/stream/stream/for_each_concurrent.rs
@@ -5,7 +5,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::Stream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`for_each_concurrent`](super::StreamExt::for_each_concurrent)
 /// method.
@@ -66,17 +66,15 @@
 {
     type Output = ();
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
-        #[project]
-        let ForEachConcurrent { mut stream, f, futures, limit } = self.project();
+        let mut this = self.project();
         loop {
             let mut made_progress_this_iter = false;
 
             // Check if we've already created a number of futures greater than `limit`
-            if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
+            if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
                 let mut stream_completed = false;
-                let elem = if let Some(stream) = stream.as_mut().as_pin_mut() {
+                let elem = if let Some(stream) = this.stream.as_mut().as_pin_mut() {
                     match stream.poll_next(cx) {
                         Poll::Ready(Some(elem)) => {
                             made_progress_this_iter = true;
@@ -92,17 +90,17 @@
                     None
                 };
                 if stream_completed {
-                    stream.set(None);
+                    this.stream.set(None);
                 }
                 if let Some(elem) = elem {
-                    futures.push(f(elem));
+                    this.futures.push((this.f)(elem));
                 }
             }
 
-            match futures.poll_next_unpin(cx) {
+            match this.futures.poll_next_unpin(cx) {
                 Poll::Ready(Some(())) => made_progress_this_iter = true,
                 Poll::Ready(None) => {
-                    if stream.is_none() {
+                    if this.stream.is_none() {
                         return Poll::Ready(())
                     }
                 },
diff --git a/src/stream/stream/forward.rs b/src/stream/stream/forward.rs
index 9776056..feef113 100644
--- a/src/stream/stream/forward.rs
+++ b/src/stream/stream/forward.rs
@@ -4,10 +4,10 @@
 use futures_core::stream::Stream;
 use futures_core::task::{Context, Poll};
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`forward`](super::StreamExt::forward) method.
-#[pin_project]
+#[pin_project(project = ForwardProj)]
 #[derive(Debug)]
 #[must_use = "futures do nothing unless you `.await` or poll them"]
 pub struct Forward<St, Si, Item> {
@@ -45,13 +45,11 @@
 {
     type Output = Result<(), E>;
 
-    #[project]
     fn poll(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Self::Output> {
-        #[project]
-        let Forward { mut sink, mut stream, buffered_item } = self.project();
+        let ForwardProj { mut sink, mut stream, buffered_item } = self.project();
         let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion");
 
         loop {
@@ -61,7 +59,7 @@
                 ready!(si.as_mut().poll_ready(cx))?;
                 si.as_mut().start_send(buffered_item.take().unwrap())?;
             }
-    
+
             match stream.as_mut().poll_next(cx)? {
                 Poll::Ready(Some(item)) => {
                     *buffered_item = Some(item);
diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs
index 971fe60..408ad81 100644
--- a/src/stream/stream/fuse.rs
+++ b/src/stream/stream/fuse.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`fuse`](super::StreamExt::fuse) method.
 #[pin_project]
@@ -41,21 +41,19 @@
 impl<S: Stream> Stream for Fuse<S> {
     type Item = S::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<S::Item>> {
-        #[project]
-        let Fuse { stream, done } = self.project();
+        let this = self.project();
 
-        if *done {
+        if *this.done {
             return Poll::Ready(None);
         }
 
-        let item = ready!(stream.poll_next(cx));
+        let item = ready!(this.stream.poll_next(cx));
         if item.is_none() {
-            *done = true;
+            *this.done = true;
         }
         Poll::Ready(item)
     }
diff --git a/src/stream/stream/into_future.rs b/src/stream/stream/into_future.rs
index 0d49384..8aa2b1e 100644
--- a/src/stream/stream/into_future.rs
+++ b/src/stream/stream/into_future.rs
@@ -52,7 +52,7 @@
     /// in order to return it to the caller of `Future::poll` if the stream yielded
     /// an element.
     pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
-        Pin::get_mut(self).stream.as_mut().map(Pin::new)
+        self.get_mut().stream.as_mut().map(Pin::new)
     }
 
     /// Consumes this combinator, returning the underlying stream.
diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs
index 755f53a..c877512 100644
--- a/src/stream/stream/map.rs
+++ b/src/stream/stream/map.rs
@@ -4,7 +4,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 use crate::fns::FnMut1;
 
@@ -51,15 +51,13 @@
 {
     type Item = F::Output;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Map { stream, f } = self.project();
-        let res = ready!(stream.poll_next(cx));
-        Poll::Ready(res.map(|x| f.call_mut(x)))
+        let mut this = self.project();
+        let res = ready!(this.stream.as_mut().poll_next(cx));
+        Poll::Ready(res.map(|x| this.f.call_mut(x)))
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs
index 359bb2f..f988468 100644
--- a/src/stream/stream/mod.rs
+++ b/src/stream/stream/mod.rs
@@ -3,7 +3,7 @@
 //! This module contains a number of functions for working with `Stream`s,
 //! including the `StreamExt` trait which adds methods to `Stream` types.
 
-use crate::future::Either;
+use crate::future::{assert_future, Either};
 #[cfg(feature = "alloc")]
 use alloc::boxed::Box;
 use core::pin::Pin;
@@ -48,7 +48,7 @@
 mod flatten;
 
 delegate_all!(
-    /// Stream for the [`inspect`](StreamExt::inspect) method.
+    /// Stream for the [`flatten`](StreamExt::flatten) method.
     Flatten<St>(
         flatten::Flatten<St, St::Item>
     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
@@ -65,6 +65,7 @@
 #[cfg(feature = "sink")]
 delegate_all!(
     /// Future for the [`forward`](super::StreamExt::forward) method.
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     Forward<St, Si>(
         forward::Forward<St, Si, St::Ok>
     ): Debug + Future + FusedFuture + New[|x: St, y: Si| forward::Forward::new(x, y)]
@@ -177,9 +178,11 @@
     pub use self::for_each_concurrent::ForEachConcurrent;
 
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     #[cfg(feature = "alloc")]
     mod split;
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     #[cfg(feature = "alloc")]
     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
     pub use self::split::{SplitStream, SplitSink, ReuniteError};
@@ -190,6 +193,7 @@
 #[cfg(feature = "std")]
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::catch_unwind::CatchUnwind;
+use crate::stream::assert_stream;
 
 impl<T: ?Sized> StreamExt for T where T: Stream {}
 
@@ -223,7 +227,7 @@
     where
         Self: Unpin,
     {
-        Next::new(self)
+        assert_future::<Option<Self::Item>, _>(Next::new(self))
     }
 
     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
@@ -258,7 +262,7 @@
     where
         Self: Sized + Unpin,
     {
-        StreamFuture::new(self)
+        assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
     }
 
     /// Maps this stream's items to a different type, returning a new stream of
@@ -289,7 +293,7 @@
         F: FnMut(Self::Item) -> T,
         Self: Sized,
     {
-        Map::new(self, f)
+        assert_stream::<T, _>(Map::new(self, f))
     }
 
     /// Creates a stream which gives the current iteration count as well as
@@ -306,7 +310,7 @@
     /// # Overflow Behavior
     ///
     /// The method does no guarding against overflows, so enumerating more than
-    /// [`usize::max_value()`] elements either produces the wrong result or panics. If
+    /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
     /// debug assertions are enabled, a panic is guaranteed.
     ///
     /// # Panics
@@ -334,7 +338,7 @@
     where
         Self: Sized,
     {
-        Enumerate::new(self)
+        assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
     }
 
     /// Filters the values produced by this stream according to the provided
@@ -369,7 +373,7 @@
         Fut: Future<Output = bool>,
         Self: Sized,
     {
-        Filter::new(self, f)
+        assert_stream::<Self::Item, _>(Filter::new(self, f))
     }
 
     /// Filters the values produced by this stream while simultaneously mapping
@@ -403,7 +407,7 @@
         Fut: Future<Output = Option<T>>,
         Self: Sized,
     {
-        FilterMap::new(self, f)
+        assert_stream::<T, _>(FilterMap::new(self, f))
     }
 
     /// Computes from this stream's items new items of a different type using
@@ -434,7 +438,7 @@
         Fut: Future,
         Self: Sized,
     {
-        Then::new(self, f)
+        assert_stream::<Fut::Output, _>(Then::new(self, f))
     }
 
     /// Transforms a stream into a collection, returning a
@@ -466,7 +470,7 @@
     where
         Self: Sized,
     {
-        Collect::new(self)
+        assert_future::<C, _>(Collect::new(self))
     }
 
     /// Concatenate all items of a stream into a single extendable
@@ -506,7 +510,7 @@
         Self: Sized,
         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
     {
-        Concat::new(self)
+        assert_future::<Self::Item, _>(Concat::new(self))
     }
 
     /// Execute an accumulating asynchronous computation over a stream,
@@ -535,7 +539,7 @@
         Fut: Future<Output = T>,
         Self: Sized,
     {
-        Fold::new(self, f, init)
+        assert_future::<T, _>(Fold::new(self, f, init))
     }
 
     /// Flattens a stream of streams into just one continuous stream.
@@ -574,7 +578,7 @@
         Self::Item: Stream,
         Self: Sized,
     {
-        Flatten::new(self)
+        assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
     }
 
     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
@@ -611,7 +615,7 @@
         FlatMap::new(self, f)
     }
 
-    /// Combinator similar to [`StreamExt::fold`] that holds internal state 
+    /// Combinator similar to [`StreamExt::fold`] that holds internal state
     /// and produces a new stream.
     ///
     /// Accepts initial state and closure which will be applied to each element
@@ -672,7 +676,7 @@
         Fut: Future<Output = bool>,
         Self: Sized,
     {
-        SkipWhile::new(self, f)
+        assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
     }
 
     /// Take elements from this stream while the provided asynchronous predicate
@@ -702,7 +706,7 @@
         Fut: Future<Output = bool>,
         Self: Sized,
     {
-        TakeWhile::new(self, f)
+        assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
     }
 
     /// Take elements from this stream until the provided future resolves.
@@ -788,7 +792,7 @@
         Fut: Future<Output = ()>,
         Self: Sized,
     {
-        ForEach::new(self, f)
+        assert_future::<(), _>(ForEach::new(self, f))
     }
 
     /// Runs this stream to completion, executing the provided asynchronous
@@ -848,7 +852,7 @@
         Fut: Future<Output = ()>,
         Self: Sized,
     {
-        ForEachConcurrent::new(self, limit.into(), f)
+        assert_future::<(), _>(ForEachConcurrent::new(self, limit.into(), f))
     }
 
     /// Creates a new stream of at most `n` items of the underlying stream.
@@ -871,7 +875,7 @@
     where
         Self: Sized,
     {
-        Take::new(self, n)
+        assert_stream::<Self::Item, _>(Take::new(self, n))
     }
 
     /// Creates a new stream which skips `n` items of the underlying stream.
@@ -894,7 +898,7 @@
     where
         Self: Sized,
     {
-        Skip::new(self, n)
+        assert_stream::<Self::Item, _>(Skip::new(self, n))
     }
 
     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
@@ -940,7 +944,7 @@
     where
         Self: Sized,
     {
-        Fuse::new(self)
+        assert_stream::<Self::Item, _>(Fuse::new(self))
     }
 
     /// Borrows a stream, rather than consuming it.
@@ -1018,7 +1022,7 @@
     where
         Self: Sized + std::panic::UnwindSafe,
     {
-        CatchUnwind::new(self)
+        assert_stream(CatchUnwind::new(self))
     }
 
     /// Wrap the stream in a Box, pinning it.
@@ -1030,7 +1034,7 @@
     where
         Self: Sized + Send + 'a,
     {
-        Box::pin(self)
+        assert_stream::<Self::Item, _>(Box::pin(self))
     }
 
     /// Wrap the stream in a Box, pinning it.
@@ -1044,7 +1048,7 @@
     where
         Self: Sized + 'a,
     {
-        Box::pin(self)
+        assert_stream::<Self::Item, _>(Box::pin(self))
     }
 
     /// An adaptor for creating a buffered list of pending futures.
@@ -1066,7 +1070,7 @@
         Self::Item: Future,
         Self: Sized,
     {
-        Buffered::new(self, n)
+        assert_stream::<<Self::Item as Future>::Output, _>(Buffered::new(self, n))
     }
 
     /// An adaptor for creating a buffered list of pending futures (unordered).
@@ -1111,7 +1115,7 @@
         Self::Item: Future,
         Self: Sized,
     {
-        BufferUnordered::new(self, n)
+        assert_stream::<<Self::Item as Future>::Output, _>(BufferUnordered::new(self, n))
     }
 
     /// An adapter for zipping two streams together.
@@ -1141,7 +1145,7 @@
         St: Stream,
         Self: Sized,
     {
-        Zip::new(self, other)
+        assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
     }
 
     /// Adapter for chaining two streams.
@@ -1172,7 +1176,7 @@
         St: Stream<Item = Self::Item>,
         Self: Sized,
     {
-        Chain::new(self, other)
+        assert_stream::<Self::Item, _>(Chain::new(self, other))
     }
 
     /// Creates a new stream which exposes a `peek` method.
@@ -1182,7 +1186,7 @@
     where
         Self: Sized,
     {
-        Peekable::new(self)
+        assert_stream::<Self::Item, _>(Peekable::new(self))
     }
 
     /// An adaptor for chunking up items of the stream inside a vector.
@@ -1208,7 +1212,7 @@
     where
         Self: Sized,
     {
-        Chunks::new(self, capacity)
+        assert_stream::<alloc::vec::Vec<Self::Item>, _>(Chunks::new(self, capacity))
     }
 
     /// An adaptor for chunking up ready items of the stream inside a vector.
@@ -1248,6 +1252,7 @@
     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
     /// order to preserve access to the `Sink`.
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     fn forward<S>(self, sink: S) -> Forward<Self, S>
     where
         S: Sink<Self::Ok, Error = Self::Error>,
@@ -1266,13 +1271,15 @@
     /// This method is only available when the `std` or `alloc` feature of this
     /// library is activated, and it is activated by default.
     #[cfg(feature = "sink")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
     #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
     #[cfg(feature = "alloc")]
     fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
     where
         Self: Sink<Item> + Sized,
     {
-        split::split(self)
+        let (sink, stream) = split::split(self);
+        (sink, assert_stream::<Self::Item, _>(stream))
     }
 
     /// Do something with each item of this stream, afterwards passing it on.
@@ -1285,7 +1292,7 @@
         F: FnMut(&Self::Item),
         Self: Sized,
     {
-        Inspect::new(self, f)
+        assert_stream::<Self::Item, _>(Inspect::new(self, f))
     }
 
     /// Wrap this stream in an `Either` stream, making it the left-hand variant
@@ -1298,7 +1305,7 @@
         B: Stream<Item = Self::Item>,
         Self: Sized,
     {
-        Either::Left(self)
+        assert_stream::<Self::Item, _>(Either::Left(self))
     }
 
     /// Wrap this stream in an `Either` stream, making it the right-hand variant
@@ -1311,7 +1318,7 @@
         B: Stream<Item = Self::Item>,
         Self: Sized,
     {
-        Either::Right(self)
+        assert_stream::<Self::Item, _>(Either::Right(self))
     }
 
     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
diff --git a/src/stream/stream/peek.rs b/src/stream/stream/peek.rs
index fb0f874..1d8c342 100644
--- a/src/stream/stream/peek.rs
+++ b/src/stream/stream/peek.rs
@@ -6,7 +6,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// A `Stream` that implements a `peek` method.
 ///
@@ -42,19 +42,17 @@
     ///
     /// This method polls the underlying stream and return either a reference
     /// to the next item if the stream is ready or passes through any errors.
-    #[project]
     pub fn poll_peek(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<&St::Item>> {
-        #[project]
-        let Peekable { mut stream, peeked } = self.project();
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if peeked.is_some() {
-                break peeked.as_ref();
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                *peeked = Some(item);
+            if this.peeked.is_some() {
+                break this.peeked.as_ref();
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                *this.peeked = Some(item);
             } else {
                 break None;
             }
@@ -71,14 +69,12 @@
 impl<S: Stream> Stream for Peekable<S> {
     type Item = S::Item;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Peekable { stream, peeked } = self.project();
-        if let Some(item) = peeked.take() {
+        let this = self.project();
+        if let Some(item) = this.peeked.take() {
             return Poll::Ready(Some(item));
         }
-        stream.poll_next(cx)
+        this.stream.poll_next(cx)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs
index 2152cb7..192d3c6 100644
--- a/src/stream/stream/ready_chunks.rs
+++ b/src/stream/stream/ready_chunks.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::mem;
 use core::pin::Pin;
 use alloc::vec::Vec;
@@ -36,23 +36,21 @@
 impl<St: Stream> Stream for ReadyChunks<St> {
     type Item = Vec<St::Item>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let ReadyChunks { items, cap, mut stream } = self.project();
+        let mut this = self.project();
 
         loop {
-            match stream.as_mut().poll_next(cx) {
+            match this.stream.as_mut().poll_next(cx) {
                 // Flush all collected data if underlying stream doesn't contain
                 // more ready values
                 Poll::Pending => {
-                    return if items.is_empty() {
+                    return if this.items.is_empty() {
                         Poll::Pending
                     } else {
-                        Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+                        Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
                     }
                 }
 
@@ -60,19 +58,19 @@
                 // If so, replace our buffer with a new and empty one and return
                 // the full one.
                 Poll::Ready(Some(item)) => {
-                    items.push(item);
-                    if items.len() >= *cap {
-                        return Poll::Ready(Some(mem::replace(items, Vec::with_capacity(*cap))))
+                    this.items.push(item);
+                    if this.items.len() >= *this.cap {
+                        return Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
                     }
                 }
 
                 // Since the underlying stream ran out of values, return what we
                 // have buffered, if we have anything.
                 Poll::Ready(None) => {
-                    let last = if items.is_empty() {
+                    let last = if this.items.is_empty() {
                         None
                     } else {
-                        let full_buf = mem::replace(items, Vec::new());
+                        let full_buf = mem::replace(this.items, Vec::new());
                         Some(full_buf)
                     };
 
diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs
index 0cdfcbc..dd0316d 100644
--- a/src/stream/stream/scan.rs
+++ b/src/stream/stream/scan.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 struct StateFn<S, F> {
     state: S,
@@ -75,28 +75,26 @@
 {
     type Item = B;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
         if self.is_done_taking() {
             return Poll::Ready(None);
         }
 
-        #[project]
-        let Scan { mut stream, state_f, mut future } = self.project();
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 let item = ready!(fut.poll(cx));
-                future.set(None);
+                this.future.set(None);
 
                 if item.is_none() {
-                    *state_f = None;
+                    *this.state_f = None;
                 }
 
                 break item;
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                let state_f = state_f.as_mut().unwrap();
-                future.set(Some((state_f.f)(&mut state_f.state, item)))
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                let state_f = this.state_f.as_mut().unwrap();
+                this.future.set(Some((state_f.f)(&mut state_f.state, item)))
             } else {
                 break None;
             }
diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs
index c0f6611..ea31b17 100644
--- a/src/stream/stream/skip.rs
+++ b/src/stream/stream/skip.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`skip`](super::StreamExt::skip) method.
 #[pin_project]
@@ -35,22 +35,21 @@
 impl<St: Stream> Stream for Skip<St> {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<St::Item>> {
-        #[project]
-        let Skip { mut stream, remaining } = self.project();
-        while *remaining > 0 {
-            if ready!(stream.as_mut().poll_next(cx)).is_some() {
-                *remaining -= 1;
+        let mut this = self.project();
+
+        while *this.remaining > 0 {
+            if ready!(this.stream.as_mut().poll_next(cx)).is_some() {
+                *this.remaining -= 1;
             } else {
                 return Poll::Ready(None);
             }
         }
 
-        stream.poll_next(cx)
+        this.stream.poll_next(cx)
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs
index 3d664f2..2c58102 100644
--- a/src/stream/stream/skip_while.rs
+++ b/src/stream/stream/skip_while.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
 #[pin_project]
@@ -71,30 +71,28 @@
 {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<St::Item>> {
-        #[project]
-        let SkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
+        let mut this = self.project();
 
-        if *done_skipping {
-            return stream.poll_next(cx);
+        if *this.done_skipping {
+            return this.stream.poll_next(cx);
         }
 
         Poll::Ready(loop {
-            if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
                 let skipped = ready!(fut.poll(cx));
-                let item = pending_item.take();
-                pending_fut.set(None);
+                let item = this.pending_item.take();
+                this.pending_fut.set(None);
                 if !skipped {
-                    *done_skipping = true;
+                    *this.done_skipping = true;
                     break item;
                 }
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                pending_fut.set(Some(f(&item)));
-                *pending_item = Some(item);
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.pending_fut.set(Some((this.f)(&item)));
+                *this.pending_item = Some(item);
             } else {
                 break None;
             }
diff --git a/src/stream/stream/split.rs b/src/stream/stream/split.rs
index c7237a2..991eb16 100644
--- a/src/stream/stream/split.rs
+++ b/src/stream/stream/split.rs
@@ -9,6 +9,7 @@
 /// A `Stream` part of the split pair
 #[derive(Debug)]
 #[must_use = "streams do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub struct SplitStream<S>(BiLock<S>);
 
 impl<S> Unpin for SplitStream<S> {}
@@ -43,6 +44,7 @@
 /// A `Sink` part of the split pair
 #[derive(Debug)]
 #[must_use = "sinks do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub struct SplitSink<S, Item> {
     lock: BiLock<S>,
     slot: Option<Item>,
@@ -119,6 +121,7 @@
 
 /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves
 /// of a `Stream + Split`, and thus could not be `reunite`d.
+#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
 pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>);
 
 impl<T, Item> fmt::Debug for ReuniteError<T, Item> {
diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs
index 4a68920..1956a82 100644
--- a/src/stream/stream/take.rs
+++ b/src/stream/stream/take.rs
@@ -4,7 +4,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`take`](super::StreamExt::take) method.
 #[pin_project]
@@ -32,7 +32,6 @@
 {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
@@ -40,13 +39,12 @@
         if self.remaining == 0 {
             Poll::Ready(None)
         } else {
-            #[project]
-            let Take { stream, remaining } = self.project();
-            let next = ready!(stream.poll_next(cx));
+            let this = self.project();
+            let next = ready!(this.stream.poll_next(cx));
             if next.is_some() {
-                *remaining -= 1;
+                *this.remaining -= 1;
             } else {
-                *remaining = 0;
+                *this.remaining = 0;
             }
             Poll::Ready(next)
         }
diff --git a/src/stream/stream/take_until.rs b/src/stream/stream/take_until.rs
index 3662620..4e32ad8 100644
--- a/src/stream/stream/take_until.rs
+++ b/src/stream/stream/take_until.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 // FIXME: docs, tests
 
@@ -121,26 +121,24 @@
 {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
-        #[project]
-        let TakeUntil { stream, mut fut, fut_result, free } = self.project();
+        let mut this = self.project();
 
-        if let Some(f) = fut.as_mut().as_pin_mut() {
+        if let Some(f) = this.fut.as_mut().as_pin_mut() {
             if let Poll::Ready(result) = f.poll(cx) {
-                fut.set(None);
-                *fut_result = Some(result);
+                this.fut.set(None);
+                *this.fut_result = Some(result);
             }
         }
 
-        if !*free && fut.is_none() {
+        if !*this.free && this.fut.is_none() {
             // Future resolved, inner stream stopped
             Poll::Ready(None)
         } else {
             // Future either not resolved yet or taken out by the user
-            let item = ready!(stream.poll_next(cx));
+            let item = ready!(this.stream.poll_next(cx));
             if item.is_none() {
-                fut.set(None);
+                this.fut.set(None);
             }
             Poll::Ready(item)
         }
diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs
index d90061e..e3a6d00 100644
--- a/src/stream/stream/take_while.rs
+++ b/src/stream/stream/take_while.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`take_while`](super::StreamExt::take_while) method.
 #[pin_project]
@@ -61,7 +61,6 @@
 {
     type Item = St::Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
@@ -70,23 +69,22 @@
             return Poll::Ready(None);
         }
 
-        #[project]
-        let TakeWhile { mut stream, f, mut pending_fut, pending_item, done_taking } = self.project();
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
                 let take = ready!(fut.poll(cx));
-                let item = pending_item.take();
-                pending_fut.set(None);
+                let item = this.pending_item.take();
+                this.pending_fut.set(None);
                 if take {
                     break item;
                 } else {
-                    *done_taking = true;
+                    *this.done_taking = true;
                     break None;
                 }
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                pending_fut.set(Some(f(&item)));
-                *pending_item = Some(item);
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.pending_fut.set(Some((this.f)(&item)));
+                *this.pending_item = Some(item);
             } else {
                 break None;
             }
diff --git a/src/stream/stream/then.rs b/src/stream/stream/then.rs
index d54512e..1334a6c 100644
--- a/src/stream/stream/then.rs
+++ b/src/stream/stream/then.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`then`](super::StreamExt::then) method.
 #[pin_project]
@@ -63,21 +63,19 @@
 {
     type Item = Fut::Output;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Fut::Output>> {
-        #[project]
-        let Then { mut stream, f, mut future } = self.project();
+    ) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 let item = ready!(fut.poll(cx));
-                future.set(None);
+                this.future.set(None);
                 break Some(item);
-            } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
-                future.set(Some(f(item)));
+            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
+                this.future.set(Some((this.f)(item)));
             } else {
                 break None;
             }
diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs
index 6c148fb..522f7e1 100644
--- a/src/stream/stream/zip.rs
+++ b/src/stream/stream/zip.rs
@@ -3,7 +3,7 @@
 use core::pin::Pin;
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`zip`](super::StreamExt::zip) method.
 #[pin_project]
@@ -49,10 +49,8 @@
     /// Note that care must be taken to avoid tampering with the state of the
     /// stream which may otherwise confuse this combinator.
     pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
-        unsafe {
-            let Self { stream1, stream2, .. } = self.get_unchecked_mut();
-            (Pin::new_unchecked(stream1).get_pin_mut(), Pin::new_unchecked(stream2).get_pin_mut())
-        }
+        let this = self.project();
+        (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
     }
 
     /// Consumes this combinator, returning the underlying streams.
@@ -77,31 +75,29 @@
 {
     type Item = (St1::Item, St2::Item);
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Zip { mut stream1, mut stream2, queued1, queued2 } = self.project();
+        let mut this = self.project();
 
-        if queued1.is_none() {
-            match stream1.as_mut().poll_next(cx) {
-                Poll::Ready(Some(item1)) => *queued1 = Some(item1),
+        if this.queued1.is_none() {
+            match this.stream1.as_mut().poll_next(cx) {
+                Poll::Ready(Some(item1)) => *this.queued1 = Some(item1),
                 Poll::Ready(None) | Poll::Pending => {}
             }
         }
-        if queued2.is_none() {
-            match stream2.as_mut().poll_next(cx) {
-                Poll::Ready(Some(item2)) => *queued2 = Some(item2),
+        if this.queued2.is_none() {
+            match this.stream2.as_mut().poll_next(cx) {
+                Poll::Ready(Some(item2)) => *this.queued2 = Some(item2),
                 Poll::Ready(None) | Poll::Pending => {}
             }
         }
 
-        if queued1.is_some() && queued2.is_some() {
-            let pair = (queued1.take().unwrap(), queued2.take().unwrap());
+        if this.queued1.is_some() && this.queued2.is_some() {
+            let pair = (this.queued1.take().unwrap(), this.queued2.take().unwrap());
             Poll::Ready(Some(pair))
-        } else if stream1.is_done() || stream2.is_done() {
+        } else if this.stream1.is_done() || this.stream2.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
diff --git a/src/stream/try_stream/and_then.rs b/src/stream/try_stream/and_then.rs
index 563ed34..4d24c4f 100644
--- a/src/stream/try_stream/and_then.rs
+++ b/src/stream/try_stream/and_then.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`and_then`](super::TryStreamExt::and_then) method.
 #[pin_project]
@@ -50,21 +50,19 @@
 {
     type Item = Result<Fut::Ok, St::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let AndThen { mut stream, mut future, f } = self.project();
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 let item = ready!(fut.try_poll(cx));
-                future.set(None);
+                this.future.set(None);
                 break Some(item);
-            } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
-                future.set(Some(f(item)));
+            } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                this.future.set(Some((this.f)(item)));
             } else {
                 break None;
             }
diff --git a/src/stream/try_stream/into_async_read.rs b/src/stream/try_stream/into_async_read.rs
index 71bbfd1..10291b0 100644
--- a/src/stream/try_stream/into_async_read.rs
+++ b/src/stream/try_stream/into_async_read.rs
@@ -9,6 +9,7 @@
 /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
 #[derive(Debug)]
 #[must_use = "readers do nothing unless polled"]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
 pub struct IntoAsyncRead<St>
 where
     St: TryStream<Error = Error> + Unpin,
diff --git a/src/stream/try_stream/mod.rs b/src/stream/try_stream/mod.rs
index 99d5a6d..0c8e108 100644
--- a/src/stream/try_stream/mod.rs
+++ b/src/stream/try_stream/mod.rs
@@ -103,6 +103,10 @@
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::try_skip_while::TrySkipWhile;
 
+mod try_take_while;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_take_while::TryTakeWhile;
+
 cfg_target_has_atomic! {
     #[cfg(feature = "alloc")]
     mod try_buffer_unordered;
@@ -121,9 +125,12 @@
 #[cfg(feature = "std")]
 mod into_async_read;
 #[cfg(feature = "io")]
+#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
 #[cfg(feature = "std")]
 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
 pub use self::into_async_read::IntoAsyncRead;
+use crate::future::assert_future;
+use crate::stream::assert_stream;
 
 impl<S: ?Sized + TryStream> TryStreamExt for S {}
 
@@ -151,7 +158,7 @@
         Self: Sized,
         Self::Error: Into<E>,
     {
-        ErrInto::new(self)
+        assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
     }
 
     /// Wraps the current stream in a new stream which maps the success value
@@ -176,7 +183,7 @@
         Self: Sized,
         F: FnMut(Self::Ok) -> T,
     {
-        MapOk::new(self, f)
+        assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
     }
 
     /// Wraps the current stream in a new stream which maps the error value
@@ -201,7 +208,7 @@
         Self: Sized,
         F: FnMut(Self::Error) -> E,
     {
-        MapErr::new(self, f)
+        assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
     }
 
     /// Chain on a computation for when a value is ready, passing the successful
@@ -248,7 +255,7 @@
         Fut: TryFuture<Error = Self::Error>,
         Self: Sized,
     {
-        AndThen::new(self, f)
+        assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
     }
 
     /// Chain on a computation for when an error happens, passing the
@@ -274,7 +281,7 @@
         Fut: TryFuture<Ok = Self::Ok>,
         Self: Sized,
     {
-        OrElse::new(self, f)
+        assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
     }
 
     /// Do something with the success value of this stream, afterwards passing
@@ -288,7 +295,7 @@
         F: FnMut(&Self::Ok),
         Self: Sized,
     {
-        InspectOk::new(self, f)
+        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
     }
 
     /// Do something with the error value of this stream, afterwards passing it on.
@@ -301,7 +308,7 @@
         F: FnMut(&Self::Error),
         Self: Sized,
     {
-        InspectErr::new(self, f)
+        assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
     }
 
     /// Wraps a [`TryStream`] into a type that implements
@@ -329,7 +336,7 @@
     where
         Self: Sized,
     {
-        IntoStream::new(self)
+        assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
     }
 
     /// Creates a future that attempts to resolve the next item in the stream.
@@ -356,7 +363,7 @@
     where
         Self: Unpin,
     {
-        TryNext::new(self)
+        assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
     }
 
     /// Attempts to run this stream to completion, executing the provided
@@ -398,7 +405,7 @@
         Fut: TryFuture<Ok = (), Error = Self::Error>,
         Self: Sized,
     {
-        TryForEach::new(self, f)
+        assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
     }
 
     /// Skip elements on this stream while the provided asynchronous predicate
@@ -428,7 +435,37 @@
         Fut: TryFuture<Ok = bool, Error = Self::Error>,
         Self: Sized,
     {
-        TrySkipWhile::new(self, f)
+        assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
+    }
+
+    /// Take elements on this stream while the provided asynchronous predicate
+    /// resolves to `true`.
+    ///
+    /// This function is similar to
+    /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
+    /// early if an error occurs.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// # futures::executor::block_on(async {
+    /// use futures::future;
+    /// use futures::stream::{self, TryStreamExt};
+    ///
+    /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
+    /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
+    ///
+    /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
+    /// assert_eq!(output, Ok(vec![1, 2]));
+    /// # })
+    /// ```
+    fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
+    where
+        F: FnMut(&Self::Ok) -> Fut,
+        Fut: TryFuture<Ok = bool, Error = Self::Error>,
+        Self: Sized,
+    {
+        TryTakeWhile::new(self, f)
     }
 
     /// Attempts to run this stream to completion, executing the provided asynchronous
@@ -484,7 +521,11 @@
         Fut: Future<Output = Result<(), Self::Error>>,
         Self: Sized,
     {
-        TryForEachConcurrent::new(self, limit.into(), f)
+        assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
+            self,
+            limit.into(),
+            f,
+        ))
     }
 
     /// Attempt to transform a stream into a collection,
@@ -521,7 +562,7 @@
     where
         Self: Sized,
     {
-        TryCollect::new(self)
+        assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
     }
 
     /// Attempt to filter the values produced by this stream according to the
@@ -560,7 +601,7 @@
         F: FnMut(&Self::Ok) -> Fut,
         Self: Sized,
     {
-        TryFilter::new(self, f)
+        assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
     }
 
     /// Attempt to filter the values produced by this stream while
@@ -601,7 +642,7 @@
         F: FnMut(Self::Ok) -> Fut,
         Self: Sized,
     {
-        TryFilterMap::new(self, f)
+        assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
     }
 
     /// Flattens a stream of streams into just one continuous stream.
@@ -648,7 +689,9 @@
         <Self::Ok as TryStream>::Error: From<Self::Error>,
         Self: Sized,
     {
-        TryFlatten::new(self)
+        assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+            TryFlatten::new(self),
+        )
     }
 
     /// Attempt to execute an accumulating asynchronous computation over a
@@ -685,7 +728,7 @@
         Fut: TryFuture<Ok = T, Error = Self::Error>,
         Self: Sized,
     {
-        TryFold::new(self, f, init)
+        assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
     }
 
     /// Attempt to concatenate all items of a stream into a single
@@ -727,7 +770,7 @@
         Self: Sized,
         Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
     {
-        TryConcat::new(self)
+        assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
     }
 
     /// Attempt to execute several futures from a stream concurrently.
@@ -794,7 +837,9 @@
         Self::Ok: TryFuture<Error = Self::Error>,
         Self: Sized,
     {
-        TryBufferUnordered::new(self, n)
+        assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
+            TryBufferUnordered::new(self, n),
+        )
     }
 
     // TODO: false positive warning from rustdoc. Verify once #43466 settles
@@ -831,6 +876,7 @@
     /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
     /// ```
     #[cfg(feature = "compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
     fn compat(self) -> Compat<Self>
     where
         Self: Sized + Unpin,
@@ -864,6 +910,7 @@
     /// # })
     /// ```
     #[cfg(feature = "io")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
     #[cfg(feature = "std")]
     fn into_async_read(self) -> IntoAsyncRead<Self>
     where
diff --git a/src/stream/try_stream/or_else.rs b/src/stream/try_stream/or_else.rs
index 0bba0d0..d972c0f 100644
--- a/src/stream/try_stream/or_else.rs
+++ b/src/stream/try_stream/or_else.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`or_else`](super::TryStreamExt::or_else) method.
 #[pin_project]
@@ -50,24 +50,22 @@
 {
     type Item = Result<St::Ok, Fut::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let OrElse { mut stream, mut future, f } = self.project();
+        let mut this = self.project();
 
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 let item = ready!(fut.try_poll(cx));
-                future.set(None);
+                this.future.set(None);
                 break Some(item);
             } else {
-                match ready!(stream.as_mut().try_poll_next(cx)) {
+                match ready!(this.stream.as_mut().try_poll_next(cx)) {
                     Some(Ok(item)) => break Some(Ok(item)),
                     Some(Err(e)) => {
-                        future.set(Some(f(e)));
+                        this.future.set(Some((this.f)(e)));
                     },
                     None => break None,
                 }
diff --git a/src/stream/try_stream/try_buffer_unordered.rs b/src/stream/try_stream/try_buffer_unordered.rs
index 566868b..c1c931a 100644
--- a/src/stream/try_stream/try_buffer_unordered.rs
+++ b/src/stream/try_stream/try_buffer_unordered.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 use core::pin::Pin;
 
 /// Stream for the
@@ -43,31 +43,29 @@
 {
     type Item = Result<<St::Ok as TryFuture>::Ok, St::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let TryBufferUnordered { mut stream, in_progress_queue, max } = self.project();
+        let mut this = self.project();
 
         // First up, try to spawn off as many futures as possible by filling up
         // our queue of futures. Propagate errors from the stream immediately.
-        while in_progress_queue.len() < *max {
-            match stream.as_mut().poll_next(cx)? {
-                Poll::Ready(Some(fut)) => in_progress_queue.push(fut.into_future()),
+        while this.in_progress_queue.len() < *this.max {
+            match this.stream.as_mut().poll_next(cx)? {
+                Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
                 Poll::Ready(None) | Poll::Pending => break,
             }
         }
 
         // Attempt to pull the next value from the in_progress_queue
-        match in_progress_queue.poll_next_unpin(cx) {
+        match this.in_progress_queue.poll_next_unpin(cx) {
             x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
             Poll::Ready(None) => {}
         }
 
         // If more values are still coming from the stream, we're not done yet
-        if stream.is_done() {
+        if this.stream.is_done() {
             Poll::Ready(None)
         } else {
             Poll::Pending
diff --git a/src/stream/try_stream/try_collect.rs b/src/stream/try_stream/try_collect.rs
index 3c9aee2..76ce619 100644
--- a/src/stream/try_stream/try_collect.rs
+++ b/src/stream/try_stream/try_collect.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::{FusedStream, TryStream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`try_collect`](super::TryStreamExt::try_collect) method.
 #[pin_project]
@@ -41,17 +41,15 @@
 {
     type Output = Result<C, St::Error>;
 
-    #[project]
     fn poll(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Self::Output> {
-        #[project]
-        let TryCollect { mut stream, items } = self.project();
+        let mut this = self.project();
         Poll::Ready(Ok(loop {
-            match ready!(stream.as_mut().try_poll_next(cx)?) {
-                Some(x) => items.extend(Some(x)),
-                None => break mem::replace(items, Default::default()),
+            match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                Some(x) => this.items.extend(Some(x)),
+                None => break mem::replace(this.items, Default::default()),
             }
         }))
     }
diff --git a/src/stream/try_stream/try_concat.rs b/src/stream/try_stream/try_concat.rs
index 8c9710b..235a2c5 100644
--- a/src/stream/try_stream/try_concat.rs
+++ b/src/stream/try_stream/try_concat.rs
@@ -2,7 +2,7 @@
 use futures_core::future::Future;
 use futures_core::stream::TryStream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`try_concat`](super::TryStreamExt::try_concat) method.
 #[pin_project]
@@ -34,19 +34,18 @@
 {
     type Output = Result<St::Ok, St::Error>;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
-        let TryConcat { mut stream, accum } = self.project();
+        let mut this = self.project();
+
         Poll::Ready(Ok(loop {
-            if let Some(x) = ready!(stream.as_mut().try_poll_next(cx)?) {
-                if let Some(a) = accum {
+            if let Some(x) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                if let Some(a) = this.accum {
                     a.extend(x)
                 } else {
-                    *accum = Some(x)
+                    *this.accum = Some(x)
                 }
             } else {
-                break accum.take().unwrap_or_default();
+                break this.accum.take().unwrap_or_default();
             }
         }))
     }
diff --git a/src/stream/try_stream/try_filter.rs b/src/stream/try_stream/try_filter.rs
index 310f991..38d060c 100644
--- a/src/stream/try_stream/try_filter.rs
+++ b/src/stream/try_stream/try_filter.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`try_filter`](super::TryStreamExt::try_filter)
 /// method.
@@ -69,24 +69,23 @@
 {
     type Item = Result<St::Ok, St::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<St::Ok, St::Error>>> {
-        #[project]
-        let TryFilter { mut stream, f, mut pending_fut, pending_item } = self.project();
+    ) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
         Poll::Ready(loop {
-            if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
                 let res = ready!(fut.poll(cx));
-                pending_fut.set(None);
+                this.pending_fut.set(None);
                 if res {
-                    break pending_item.take().map(Ok);
+                    break this.pending_item.take().map(Ok);
                 }
-                *pending_item = None;
-            } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
-                pending_fut.set(Some(f(&item)));
-                *pending_item = Some(item);
+                *this.pending_item = None;
+            } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                this.pending_fut.set(Some((this.f)(&item)));
+                *this.pending_item = Some(item);
             } else {
                 break None;
             }
diff --git a/src/stream/try_stream/try_filter_map.rs b/src/stream/try_stream/try_filter_map.rs
index ba8e43a..6c1e48f 100644
--- a/src/stream/try_stream/try_filter_map.rs
+++ b/src/stream/try_stream/try_filter_map.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`try_filter_map`](super::TryStreamExt::try_filter_map)
 /// method.
@@ -57,24 +57,23 @@
 {
     type Item = Result<T, St::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<T, St::Error>>> {
-        #[project]
-        let TryFilterMap { mut stream, f, mut pending } = self.project();
+    ) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
         Poll::Ready(loop {
-            if let Some(p) = pending.as_mut().as_pin_mut() {
+            if let Some(p) = this.pending.as_mut().as_pin_mut() {
                 // We have an item in progress, poll that until it's done
                 let item = ready!(p.try_poll(cx)?);
-                pending.set(None);
+                this.pending.set(None);
                 if item.is_some() {
                     break item.map(Ok);
                 }
-            } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
+            } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
                 // No item in progress, but the stream is still going
-                pending.set(Some(f(item)));
+                this.pending.set(Some((this.f)(item)));
             } else {
                 // The stream is done
                 break None;
diff --git a/src/stream/try_stream/try_flatten.rs b/src/stream/try_stream/try_flatten.rs
index a528639..5227903 100644
--- a/src/stream/try_stream/try_flatten.rs
+++ b/src/stream/try_stream/try_flatten.rs
@@ -3,7 +3,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
 #[pin_project]
@@ -51,19 +51,18 @@
 {
     type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
 
-    #[project]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        #[project]
-        let TryFlatten { mut stream, mut next } = self.project();
+        let mut this = self.project();
+
         Poll::Ready(loop {
-            if let Some(s) = next.as_mut().as_pin_mut() {
+            if let Some(s) = this.next.as_mut().as_pin_mut() {
                 if let Some(item) = ready!(s.try_poll_next(cx)?) {
                     break Some(Ok(item));
                 } else {
-                    next.set(None);
+                    this.next.set(None);
                 }
-            } else if let Some(s) = ready!(stream.as_mut().try_poll_next(cx)?) {
-                next.set(Some(s));
+            } else if let Some(s) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                this.next.set(Some(s));
             } else {
                 break None;
             }
diff --git a/src/stream/try_stream/try_fold.rs b/src/stream/try_stream/try_fold.rs
index d85c1fe..abeeea4 100644
--- a/src/stream/try_stream/try_fold.rs
+++ b/src/stream/try_stream/try_fold.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{FusedFuture, Future, TryFuture};
 use futures_core::stream::TryStream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`try_fold`](super::TryStreamExt::try_fold) method.
 #[pin_project]
@@ -64,25 +64,24 @@
 {
     type Output = Result<T, St::Error>;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
-        let TryFold { mut stream, f, accum, mut future } = self.project();
+        let mut this = self.project();
+
         Poll::Ready(loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 // we're currently processing a future to produce a new accum value
                 let res = ready!(fut.try_poll(cx));
-                future.set(None);
+                this.future.set(None);
                 match res {
-                    Ok(a) => *accum = Some(a),
+                    Ok(a) => *this.accum = Some(a),
                     Err(e) => break Err(e),
                 }
-            } else if accum.is_some() {
+            } else if this.accum.is_some() {
                 // we're waiting on a new item from the stream
-                let res = ready!(stream.as_mut().try_poll_next(cx));
-                let a = accum.take().unwrap();
+                let res = ready!(this.stream.as_mut().try_poll_next(cx));
+                let a = this.accum.take().unwrap();
                 match res {
-                    Some(Ok(item)) => future.set(Some(f(a, item))),
+                    Some(Ok(item)) => this.future.set(Some((this.f)(a, item))),
                     Some(Err(e)) => break Err(e),
                     None => break Ok(a),
                 }
diff --git a/src/stream/try_stream/try_for_each.rs b/src/stream/try_stream/try_for_each.rs
index 5fc91df..a00e911 100644
--- a/src/stream/try_stream/try_for_each.rs
+++ b/src/stream/try_stream/try_for_each.rs
@@ -3,7 +3,7 @@
 use futures_core::future::{Future, TryFuture};
 use futures_core::stream::TryStream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the [`try_for_each`](super::TryStreamExt::try_for_each) method.
 #[pin_project]
@@ -50,17 +50,15 @@
 {
     type Output = Result<(), St::Error>;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
-        let TryForEach { mut stream, f, mut future } = self.project();
+        let mut this = self.project();
         loop {
-            if let Some(fut) = future.as_mut().as_pin_mut() {
+            if let Some(fut) = this.future.as_mut().as_pin_mut() {
                 ready!(fut.try_poll(cx))?;
-                future.set(None);
+                this.future.set(None);
             } else {
-                match ready!(stream.as_mut().try_poll_next(cx)?) {
-                    Some(e) => future.set(Some(f(e))),
+                match ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                    Some(e) => this.future.set(Some((this.f)(e))),
                     None => break,
                 }
             }
diff --git a/src/stream/try_stream/try_for_each_concurrent.rs b/src/stream/try_stream/try_for_each_concurrent.rs
index 87fd465..db8e505 100644
--- a/src/stream/try_stream/try_for_each_concurrent.rs
+++ b/src/stream/try_stream/try_for_each_concurrent.rs
@@ -6,7 +6,7 @@
 use futures_core::future::{FusedFuture, Future};
 use futures_core::stream::TryStream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Future for the
 /// [`try_for_each_concurrent`](super::TryStreamExt::try_for_each_concurrent)
@@ -68,16 +68,14 @@
 {
     type Output = Result<(), St::Error>;
 
-    #[project]
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        #[project]
-        let TryForEachConcurrent { mut stream, f, futures, limit } = self.project();
+        let mut this = self.project();
         loop {
             let mut made_progress_this_iter = false;
 
             // Check if we've already created a number of futures greater than `limit`
-            if limit.map(|limit| limit.get() > futures.len()).unwrap_or(true) {
-                let poll_res = match stream.as_mut().as_pin_mut() {
+            if this.limit.map(|limit| limit.get() > this.futures.len()).unwrap_or(true) {
+                let poll_res = match this.stream.as_mut().as_pin_mut() {
                     Some(stream) => stream.try_poll_next(cx),
                     None => Poll::Ready(None),
                 };
@@ -88,28 +86,28 @@
                         Some(elem)
                     },
                     Poll::Ready(None) => {
-                        stream.set(None);
+                        this.stream.set(None);
                         None
                     }
                     Poll::Pending => None,
                     Poll::Ready(Some(Err(e))) => {
                         // Empty the stream and futures so that we know
                         // the future has completed.
-                        stream.set(None);
-                        drop(mem::replace(futures, FuturesUnordered::new()));
+                        this.stream.set(None);
+                        drop(mem::replace(this.futures, FuturesUnordered::new()));
                         return Poll::Ready(Err(e));
                     }
                 };
 
                 if let Some(elem) = elem {
-                    futures.push(f(elem));
+                    this.futures.push((this.f)(elem));
                 }
             }
 
-            match futures.poll_next_unpin(cx) {
+            match this.futures.poll_next_unpin(cx) {
                 Poll::Ready(Some(Ok(()))) => made_progress_this_iter = true,
                 Poll::Ready(None) => {
-                    if stream.is_none() {
+                    if this.stream.is_none() {
                         return Poll::Ready(Ok(()))
                     }
                 },
@@ -117,8 +115,8 @@
                 Poll::Ready(Some(Err(e))) => {
                     // Empty the stream and futures so that we know
                     // the future has completed.
-                    stream.set(None);
-                    drop(mem::replace(futures, FuturesUnordered::new()));
+                    this.stream.set(None);
+                    drop(mem::replace(this.futures, FuturesUnordered::new()));
                     return Poll::Ready(Err(e));
                 }
             }
diff --git a/src/stream/try_stream/try_skip_while.rs b/src/stream/try_stream/try_skip_while.rs
index 624380f..35759d0 100644
--- a/src/stream/try_stream/try_skip_while.rs
+++ b/src/stream/try_stream/try_skip_while.rs
@@ -5,7 +5,7 @@
 use futures_core::task::{Context, Poll};
 #[cfg(feature = "sink")]
 use futures_sink::Sink;
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Stream for the [`try_skip_while`](super::TryStreamExt::try_skip_while)
 /// method.
@@ -62,30 +62,28 @@
 {
     type Item = Result<St::Ok, St::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let TrySkipWhile { mut stream, f, mut pending_fut, pending_item, done_skipping } = self.project();
+        let mut this = self.project();
 
-        if *done_skipping {
-            return stream.try_poll_next(cx);
+        if *this.done_skipping {
+            return this.stream.try_poll_next(cx);
         }
 
         Poll::Ready(loop {
-            if let Some(fut) = pending_fut.as_mut().as_pin_mut() {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
                 let skipped = ready!(fut.try_poll(cx)?);
-                let item = pending_item.take();
-                pending_fut.set(None);
+                let item = this.pending_item.take();
+                this.pending_fut.set(None);
                 if !skipped {
-                    *done_skipping = true;
+                    *this.done_skipping = true;
                     break item.map(Ok);
                 }
-            } else if let Some(item) = ready!(stream.as_mut().try_poll_next(cx)?) {
-                pending_fut.set(Some(f(&item)));
-                *pending_item = Some(item);
+            } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                this.pending_fut.set(Some((this.f)(&item)));
+                *this.pending_item = Some(item);
             } else {
                 break None;
             }
diff --git a/src/stream/try_stream/try_take_while.rs b/src/stream/try_stream/try_take_while.rs
new file mode 100644
index 0000000..16bfb20
--- /dev/null
+++ b/src/stream/try_stream/try_take_while.rs
@@ -0,0 +1,132 @@
+use core::fmt;
+use core::pin::Pin;
+use futures_core::future::TryFuture;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use pin_project::pin_project;
+
+/// Stream for the [`try_take_while`](super::TryStreamExt::try_take_while)
+/// method.
+#[pin_project]
+#[must_use = "streams do nothing unless polled"]
+pub struct TryTakeWhile<St, Fut, F>
+where
+    St: TryStream,
+{
+    #[pin]
+    stream: St,
+    f: F,
+    #[pin]
+    pending_fut: Option<Fut>,
+    pending_item: Option<St::Ok>,
+    done_taking: bool,
+}
+
+impl<St, Fut, F> fmt::Debug for TryTakeWhile<St, Fut, F>
+where
+    St: TryStream + fmt::Debug,
+    St::Ok: fmt::Debug,
+    Fut: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TryTakeWhile")
+            .field("stream", &self.stream)
+            .field("pending_fut", &self.pending_fut)
+            .field("pending_item", &self.pending_item)
+            .field("done_taking", &self.done_taking)
+            .finish()
+    }
+}
+
+impl<St, Fut, F> TryTakeWhile<St, Fut, F>
+where
+    St: TryStream,
+    F: FnMut(&St::Ok) -> Fut,
+    Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+    pub(super) fn new(stream: St, f: F) -> TryTakeWhile<St, Fut, F> {
+        TryTakeWhile {
+            stream,
+            f,
+            pending_fut: None,
+            pending_item: None,
+            done_taking: false,
+        }
+    }
+
+    delegate_access_inner!(stream, St, ());
+}
+
+impl<St, Fut, F> Stream for TryTakeWhile<St, Fut, F>
+where
+    St: TryStream,
+    F: FnMut(&St::Ok) -> Fut,
+    Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+    type Item = Result<St::Ok, St::Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+
+        if *this.done_taking {
+            return Poll::Ready(None);
+        }
+
+        Poll::Ready(loop {
+            if let Some(fut) = this.pending_fut.as_mut().as_pin_mut() {
+                let take = ready!(fut.try_poll(cx)?);
+                let item = this.pending_item.take();
+                this.pending_fut.set(None);
+                if take {
+                    break item.map(Ok);
+                } else {
+                    *this.done_taking = true;
+                    break None;
+                }
+            } else if let Some(item) = ready!(this.stream.as_mut().try_poll_next(cx)?) {
+                this.pending_fut.set(Some((this.f)(&item)));
+                *this.pending_item = Some(item);
+            } else {
+                break None;
+            }
+        })
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        if self.done_taking {
+            return (0, Some(0));
+        }
+
+        let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+        let (_, upper) = self.stream.size_hint();
+        let upper = match upper {
+            Some(x) => x.checked_add(pending_len),
+            None => None,
+        };
+        (0, upper) // can't know a lower bound, due to the predicate
+    }
+}
+
+impl<St, Fut, F> FusedStream for TryTakeWhile<St, Fut, F>
+where
+    St: TryStream + FusedStream,
+    F: FnMut(&St::Ok) -> Fut,
+    Fut: TryFuture<Ok = bool, Error = St::Error>,
+{
+    fn is_terminated(&self) -> bool {
+        self.done_taking || self.pending_item.is_none() && self.stream.is_terminated()
+    }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<S, Fut, F, Item, E> Sink<Item> for TryTakeWhile<S, Fut, F>
+where
+    S: TryStream + Sink<Item, Error = E>,
+{
+    type Error = E;
+
+    delegate_sink!(stream, Item);
+}
diff --git a/src/stream/try_stream/try_unfold.rs b/src/stream/try_stream/try_unfold.rs
index 8da1248..a6b31fe 100644
--- a/src/stream/try_stream/try_unfold.rs
+++ b/src/stream/try_stream/try_unfold.rs
@@ -3,7 +3,7 @@
 use futures_core::future::TryFuture;
 use futures_core::stream::Stream;
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Creates a `TryStream` from a seed and a closure returning a `TryFuture`.
 ///
@@ -96,30 +96,28 @@
 {
     type Item = Result<Item, Fut::Error>;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<Option<Result<Item, Fut::Error>>> {
-        #[project]
-        let TryUnfold {f, state, mut fut } = self.project();
+    ) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
 
-        if let Some(state) = state.take() {
-            fut.set(Some(f(state)));
+        if let Some(state) = this.state.take() {
+            this.fut.set(Some((this.f)(state)));
         }
 
-        match fut.as_mut().as_pin_mut() {
+        match this.fut.as_mut().as_pin_mut() {
             None => {
                 // The future previously errored
                 Poll::Ready(None)
             }
             Some(future) => {
                 let step = ready!(future.try_poll(cx));
-                fut.set(None);
+                this.fut.set(None);
 
                 match step {
                     Ok(Some((item, next_state))) => {
-                        *state = Some(next_state);
+                        *this.state = Some(next_state);
                         Poll::Ready(Some(Ok(item)))
                     }
                     Ok(None) => Poll::Ready(None),
diff --git a/src/stream/unfold.rs b/src/stream/unfold.rs
index 0279571..b6f8eae 100644
--- a/src/stream/unfold.rs
+++ b/src/stream/unfold.rs
@@ -3,7 +3,7 @@
 use futures_core::future::Future;
 use futures_core::stream::{FusedStream, Stream};
 use futures_core::task::{Context, Poll};
-use pin_project::{pin_project, project};
+use pin_project::pin_project;
 
 /// Creates a `Stream` from a seed and a closure returning a `Future`.
 ///
@@ -93,24 +93,22 @@
 {
     type Item = Item;
 
-    #[project]
     fn poll_next(
         self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        #[project]
-        let Unfold { state, f, mut fut } = self.project();
+        let mut this = self.project();
 
-        if let Some(state) = state.take() {
-            fut.set(Some(f(state)));
+        if let Some(state) = this.state.take() {
+            this.fut.set(Some((this.f)(state)));
         }
 
-        let step = ready!(fut.as_mut().as_pin_mut()
+        let step = ready!(this.fut.as_mut().as_pin_mut()
             .expect("Unfold must not be polled after it returned `Poll::Ready(None)`").poll(cx));
-        fut.set(None);
+        this.fut.set(None);
 
         if let Some((item, next_state)) = step {
-            *state = Some(next_state);
+            *this.state = Some(next_state);
             Poll::Ready(Some(item))
         } else {
             Poll::Ready(None)
diff --git a/src/task/spawn.rs b/src/task/spawn.rs
index f34445a..f877923 100644
--- a/src/task/spawn.rs
+++ b/src/task/spawn.rs
@@ -69,6 +69,7 @@
     /// assert_eq!(block_on(join_handle_fut), 1);
     /// ```
     #[cfg(feature = "channel")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
     #[cfg(feature = "std")]
     fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
     where
@@ -83,6 +84,7 @@
     /// Wraps a [`Spawn`] and makes it usable as a futures 0.1 `Executor`.
     /// Requires the `compat` feature to enable.
     #[cfg(feature = "compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
     fn compat(self) -> Compat<Self>
     where
         Self: Sized,
@@ -145,6 +147,7 @@
     /// assert_eq!(executor.run_until(join_handle_fut), 1);
     /// ```
     #[cfg(feature = "channel")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "channel")))]
     #[cfg(feature = "std")]
     fn spawn_local_with_handle<Fut>(
         &self,