Snap for 8564071 from 6661c46def7a4e79589003500e402827370bd285 to mainline-permission-release

Change-Id: I49e68a3bc35e3694cf08a3387612d559edfd5ae9
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index f3ad3ab..e483977 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
 {
   "git": {
-    "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
-  }
-}
+    "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+  },
+  "path_in_vcs": "futures"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index f5fe962..a594b53 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,6 +41,8 @@
     name: "libfutures",
     host_supported: true,
     crate_name: "futures",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.21",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
@@ -62,28 +64,9 @@
     ],
     apex_available: [
         "//apex_available:platform",
+        "com.android.bluetooth",
         "com.android.resolv",
         "com.android.virt",
     ],
     min_sdk_version: "29",
 }
-
-// dependent_library ["feature_list"]
-//   futures-channel-0.3.14 "alloc,futures-sink,sink,std"
-//   futures-core-0.3.14 "alloc,std"
-//   futures-executor-0.3.14 "std"
-//   futures-io-0.3.14 "std"
-//   futures-macro-0.3.14
-//   futures-sink-0.3.14 "alloc,std"
-//   futures-task-0.3.14 "alloc,std"
-//   futures-util-0.3.14 "alloc,async-await,async-await-macro,channel,futures-channel,futures-io,futures-macro,futures-sink,io,memchr,proc-macro-hack,proc-macro-nested,sink,slab,std"
-//   memchr-2.3.4 "default,std"
-//   pin-project-lite-0.2.6
-//   pin-utils-0.1.0
-//   proc-macro-hack-0.5.19
-//   proc-macro-nested-0.1.7
-//   proc-macro2-1.0.26 "default,proc-macro"
-//   quote-1.0.9 "default,proc-macro"
-//   slab-0.4.3 "default,std"
-//   syn-1.0.70 "clone-impls,default,derive,full,parsing,printing,proc-macro,quote,visit-mut"
-//   unicode-xid-0.2.1 "default"
diff --git a/Cargo.toml b/Cargo.toml
index 9905985..f740f96 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,62 +3,80 @@
 # When uploading crates to the registry Cargo will automatically
 # "normalize" Cargo.toml files for maximal compatibility
 # with all versions of Cargo and also rewrite `path` dependencies
-# to registry (e.g., crates.io) dependencies
+# to registry (e.g., crates.io) dependencies.
 #
-# If you believe there's an error in this file please file an
-# issue against the rust-lang/cargo repository. If you're
-# editing this file be aware that the upstream Cargo.toml
-# will likely look very different (and much more reasonable)
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
 
 [package]
 edition = "2018"
+rust-version = "1.45"
 name = "futures"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+version = "0.3.21"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
 readme = "../README.md"
-keywords = ["futures", "async", "future"]
+keywords = [
+    "futures",
+    "async",
+    "future",
+]
 categories = ["asynchronous"]
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
+
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+    "--cfg",
+    "docsrs",
+]
 
 [package.metadata.playground]
-features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+features = [
+    "std",
+    "async-await",
+    "compat",
+    "io-compat",
+    "executor",
+    "thread-pool",
+]
+
 [dependencies.futures-channel]
-version = "0.3.13"
+version = "0.3.21"
 features = ["sink"]
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.13"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-executor]
-version = "0.3.13"
+version = "0.3.21"
 optional = true
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.13"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.13"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.13"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-util]
-version = "0.3.13"
+version = "0.3.21"
 features = ["sink"]
 default-features = false
+
 [dev-dependencies.assert_matches]
 version = "1.3.0"
 
@@ -75,16 +93,55 @@
 version = "0.1.11"
 
 [features]
-alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
-async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+alloc = [
+    "futures-core/alloc",
+    "futures-task/alloc",
+    "futures-sink/alloc",
+    "futures-channel/alloc",
+    "futures-util/alloc",
+]
+async-await = [
+    "futures-util/async-await",
+    "futures-util/async-await-macro",
+]
 bilock = ["futures-util/bilock"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
-compat = ["std", "futures-util/compat"]
-default = ["std", "async-await", "executor"]
-executor = ["std", "futures-executor/std"]
-io-compat = ["compat", "futures-util/io-compat"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
-std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
-thread-pool = ["executor", "futures-executor/thread-pool"]
-unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+cfg-target-has-atomic = []
+compat = [
+    "std",
+    "futures-util/compat",
+]
+default = [
+    "std",
+    "async-await",
+    "executor",
+]
+executor = [
+    "std",
+    "futures-executor/std",
+]
+io-compat = [
+    "compat",
+    "futures-util/io-compat",
+]
+std = [
+    "alloc",
+    "futures-core/std",
+    "futures-task/std",
+    "futures-io/std",
+    "futures-sink/std",
+    "futures-util/std",
+    "futures-util/io",
+    "futures-util/channel",
+]
+thread-pool = [
+    "executor",
+    "futures-executor/thread-pool",
+]
+unstable = [
+    "futures-core/unstable",
+    "futures-task/unstable",
+    "futures-channel/unstable",
+    "futures-io/unstable",
+    "futures-util/unstable",
+]
 write-all-vectored = ["futures-util/write-all-vectored"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d046c54..6871f47 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,14 +1,13 @@
 [package]
 name = "futures"
+version = "0.3.21"
 edition = "2018"
-version = "0.3.13"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
 license = "MIT OR Apache-2.0"
 readme = "../README.md"
 keywords = ["futures", "async", "future"]
 repository = "https://github.com/rust-lang/futures-rs"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
 description = """
 An implementation of futures and streams featuring zero allocations,
 composability, and iterator-like interfaces.
@@ -16,13 +15,13 @@
 categories = ["asynchronous"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.13", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.13", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.13", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.13", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.13", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] }
 
 [dev-dependencies]
 futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
@@ -47,11 +46,13 @@
 # These features are outside of the normal semver guarantees and require the
 # `unstable` feature as an explicit opt-in to unstable API.
 unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
-cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic", "futures-task/cfg-target-has-atomic", "futures-channel/cfg-target-has-atomic", "futures-util/cfg-target-has-atomic"]
 bilock = ["futures-util/bilock"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
 write-all-vectored = ["futures-util/write-all-vectored"]
 
+# These features are no longer used.
+# TODO: remove in the next major version.
+cfg-target-has-atomic = []
+
 [package.metadata.docs.rs]
 all-features = true
 rustdoc-args = ["--cfg", "docsrs"]
diff --git a/METADATA b/METADATA
index e99c2fa..554f4df 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures/futures-0.3.13.crate"
+    value: "https://static.crates.io/crates/futures/futures-0.3.21.crate"
   }
-  version: "0.3.13"
+  version: "0.3.21"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2021
-    month: 4
+    year: 2022
+    month: 3
     day: 1
   }
 }
diff --git a/TEST_MAPPING b/TEST_MAPPING
index ec169ff..e2df61d 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -1,41 +1,39 @@
 // Generated by update_crate_tests.py for tests that depend on this crate.
 {
+  "imports": [
+    {
+      "path": "external/rust/crates/anyhow"
+    },
+    {
+      "path": "external/rust/crates/tokio"
+    }
+  ],
   "presubmit": [
     {
-      "name": "anyhow_device_test_tests_test_ffi"
+      "name": "ZipFuseTest"
     },
     {
-      "name": "anyhow_device_test_tests_test_context"
+      "name": "authfs_device_test_src_lib"
     },
     {
-      "name": "anyhow_device_test_tests_test_repr"
+      "name": "doh_unit_test"
     },
     {
-      "name": "anyhow_device_test_tests_test_convert"
+      "name": "virtualizationservice_device_test"
+    }
+  ],
+  "presubmit-rust": [
+    {
+      "name": "ZipFuseTest"
     },
     {
-      "name": "anyhow_device_test_tests_test_fmt"
+      "name": "authfs_device_test_src_lib"
     },
     {
-      "name": "anyhow_device_test_tests_test_boxed"
+      "name": "doh_unit_test"
     },
     {
-      "name": "anyhow_device_test_tests_test_downcast"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_source"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_macros"
-    },
-    {
-      "name": "anyhow_device_test_src_lib"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_autotrait"
-    },
-    {
-      "name": "anyhow_device_test_tests_test_chain"
+      "name": "virtualizationservice_device_test"
     }
   ]
 }
diff --git a/cargo2android.json b/cargo2android.json
index 01465d0..a7e2a4b 100644
--- a/cargo2android.json
+++ b/cargo2android.json
@@ -1,11 +1,12 @@
 {
   "apex-available": [
     "//apex_available:platform",
+    "com.android.bluetooth",
     "com.android.resolv",
     "com.android.virt"
   ],
-  "min_sdk_version": "29",
   "dependencies": true,
   "device": true,
+  "min-sdk-version": "29",
   "run": true
-}
\ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index de29ace..b8ebc61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,12 +3,12 @@
 //! This crate provides a number of core abstractions for writing asynchronous
 //! code:
 //!
-//! - [Futures](crate::future::Future) are single eventual values produced by
+//! - [Futures](crate::future) are single eventual values produced by
 //!   asynchronous computations. Some programming languages (e.g. JavaScript)
 //!   call this concept "promise".
-//! - [Streams](crate::stream::Stream) represent a series of values
+//! - [Streams](crate::stream) represent a series of values
 //!   produced asynchronously.
-//! - [Sinks](crate::sink::Sink) provide support for asynchronous writing of
+//! - [Sinks](crate::sink) provide support for asynchronous writing of
 //!   data.
 //! - [Executors](crate::executor) are responsible for running asynchronous
 //!   tasks.
@@ -25,11 +25,12 @@
 //! within macros and keywords such as async and await!.
 //!
 //! ```rust
+//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
 //! # use futures::channel::mpsc;
 //! # use futures::executor; ///standard executors to provide a context for futures and streams
 //! # use futures::executor::ThreadPool;
 //! # use futures::StreamExt;
-//!
+//! #
 //! fn main() {
 //!     let pool = ThreadPool::new().expect("Failed to build pool");
 //!     let (tx, rx) = mpsc::unbounded::<i32>();
@@ -67,7 +68,7 @@
 //!     };
 //!
 //!     // Actually execute the above future, which will invoke Future::poll and
-//!     // subsequenty chain appropriate Future::poll and methods needing executors
+//!     // subsequently chain appropriate Future::poll and methods needing executors
 //!     // to drive all futures. Eventually fut_values will be driven to completion.
 //!     let values: Vec<i32> = executor::block_on(fut_values);
 //!
@@ -78,48 +79,46 @@
 //! The majority of examples and code snippets in this crate assume that they are
 //! inside an async block as written above.
 
-#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
-
 #![cfg_attr(not(feature = "std"), no_std)]
-
-#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
-// It cannot be included in the published code because this lints have false positives in the minimum required version.
-#![cfg_attr(test, warn(single_use_lifetimes))]
-#![warn(clippy::all)]
-#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
-
+#![warn(
+    missing_debug_implementations,
+    missing_docs,
+    rust_2018_idioms,
+    single_use_lifetimes,
+    unreachable_pub
+)]
+#![doc(test(
+    no_crate_inject,
+    attr(
+        deny(warnings, rust_2018_idioms, single_use_lifetimes),
+        allow(dead_code, unused_assignments, unused_variables)
+    )
+))]
 #![cfg_attr(docsrs, feature(doc_cfg))]
 
-#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
-compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
 
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_core::future::{Future, TryFuture};
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::future::{FutureExt, TryFutureExt};
 
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_core::stream::{Stream, TryStream};
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::stream::{StreamExt, TryStreamExt};
 
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_sink::Sink;
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::sink::SinkExt;
 
 #[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
 #[cfg(feature = "std")]
-#[doc(hidden)]
+#[doc(no_inline)]
 pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
 
 // Macro reexports
@@ -135,6 +134,10 @@
 #[doc(inline)]
 pub use futures_util::{future, never, sink, stream, task};
 
+#[cfg(feature = "std")]
+#[cfg(feature = "async-await")]
+pub use futures_util::stream_select;
+
 #[cfg(feature = "alloc")]
 #[doc(inline)]
 pub use futures_channel as channel;
@@ -148,13 +151,73 @@
 
 #[cfg(feature = "executor")]
 #[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
-#[doc(inline)]
-pub use futures_executor as executor;
+pub mod executor {
+    //! Built-in executors and related tools.
+    //!
+    //! All asynchronous computation occurs within an executor, which is
+    //! capable of spawning futures as tasks. This module provides several
+    //! built-in executors, as well as tools for building your own.
+    //!
+    //!
+    //! This module is only available when the `executor` feature of this
+    //! library is activated.
+    //!
+    //! # Using a thread pool (M:N task scheduling)
+    //!
+    //! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+    //! A small set of worker threads can handle a very large set of spawned tasks
+    //! (which are much lighter weight than threads). Tasks spawned onto the pool
+    //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+    //! the created threads.
+    //!
+    //! # Spawning additional tasks
+    //!
+    //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+    //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+    //! instead.
+    //!
+    //! # Single-threaded execution
+    //!
+    //! In addition to thread pools, it's possible to run a task (and the tasks
+    //! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+    //! Aside from cutting down on synchronization costs, this executor also makes
+    //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+    //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+    //! little work between I/O operations.
+    //!
+    //! There is also a convenience function [`block_on`] for simply running a
+    //! future to completion on the current thread.
+    //!
+    //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+    //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+    pub use futures_executor::{
+        block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
+        LocalSpawner,
+    };
+
+    #[cfg(feature = "thread-pool")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+    pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
 
 #[cfg(feature = "compat")]
 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-#[doc(inline)]
-pub use futures_util::compat;
+pub mod compat {
+    //! Interop between `futures` 0.1 and 0.3.
+    //!
+    //! This module is only available when the `compat` feature of this
+    //! library is activated.
+
+    pub use futures_util::compat::{
+        Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
+        Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
+    };
+
+    #[cfg(feature = "io-compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+    pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+}
 
 pub mod prelude {
     //! A "prelude" for crates using the `futures` crate.
@@ -175,10 +238,12 @@
     pub use crate::stream::{self, Stream, TryStream};
 
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::future::{FutureExt as _, TryFutureExt as _};
     #[doc(no_inline)]
     pub use crate::sink::SinkExt as _;
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::stream::{StreamExt as _, TryStreamExt as _};
 
     #[cfg(feature = "std")]
@@ -186,6 +251,7 @@
 
     #[cfg(feature = "std")]
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::io::{
         AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
     };
diff --git a/tests/_require_features.rs b/tests/_require_features.rs
index da76dcd..8046cc9 100644
--- a/tests/_require_features.rs
+++ b/tests/_require_features.rs
@@ -1,8 +1,13 @@
 #[cfg(not(all(
-    feature = "std", feature = "alloc", feature = "async-await",
-    feature = "compat", feature = "io-compat",
-    feature = "executor", feature = "thread-pool",
+    feature = "std",
+    feature = "alloc",
+    feature = "async-await",
+    feature = "compat",
+    feature = "io-compat",
+    feature = "executor",
+    feature = "thread-pool",
 )))]
-compile_error!("`futures` tests must have all stable features activated: \
+compile_error!(
+    "`futures` tests must have all stable features activated: \
     use `--all-features` or `--features default,thread-pool,io-compat`"
 );
diff --git a/tests/arc_wake.rs b/tests/arc_wake.rs
deleted file mode 100644
index d19a83d..0000000
--- a/tests/arc_wake.rs
+++ /dev/null
@@ -1,82 +0,0 @@
-mod countingwaker {
-    use futures::task::{self, ArcWake, Waker};
-    use std::sync::{Arc, Mutex};
-
-    struct CountingWaker {
-        nr_wake: Mutex<i32>,
-    }
-
-    impl CountingWaker {
-        fn new() -> Self {
-            Self {
-                nr_wake: Mutex::new(0),
-            }
-        }
-
-        fn wakes(&self) -> i32 {
-            *self.nr_wake.lock().unwrap()
-        }
-    }
-
-    impl ArcWake for CountingWaker {
-        fn wake_by_ref(arc_self: &Arc<Self>) {
-            let mut lock = arc_self.nr_wake.lock().unwrap();
-            *lock += 1;
-        }
-    }
-
-    #[test]
-    fn create_from_arc() {
-        let some_w = Arc::new(CountingWaker::new());
-
-        let w1: Waker = task::waker(some_w.clone());
-        assert_eq!(2, Arc::strong_count(&some_w));
-        w1.wake_by_ref();
-        assert_eq!(1, some_w.wakes());
-
-        let w2 = w1.clone();
-        assert_eq!(3, Arc::strong_count(&some_w));
-
-        w2.wake_by_ref();
-        assert_eq!(2, some_w.wakes());
-
-        drop(w2);
-        assert_eq!(2, Arc::strong_count(&some_w));
-        drop(w1);
-        assert_eq!(1, Arc::strong_count(&some_w));
-    }
-
-    #[test]
-    fn ref_wake_same() {
-        let some_w = Arc::new(CountingWaker::new());
-
-        let w1: Waker = task::waker(some_w.clone());
-        let w2 = task::waker_ref(&some_w);
-        let w3 = w2.clone();
-
-        assert!(w1.will_wake(&w2));
-        assert!(w2.will_wake(&w3));
-    }
-}
-
-#[test]
-fn proper_refcount_on_wake_panic() {
-    use futures::task::{self, ArcWake, Waker};
-    use std::sync::Arc;
-
-    struct PanicWaker;
-
-    impl ArcWake for PanicWaker {
-        fn wake_by_ref(_arc_self: &Arc<Self>) {
-            panic!("WAKE UP");
-        }
-    }
-
-    let some_w = Arc::new(PanicWaker);
-
-    let w1: Waker = task::waker(some_w.clone());
-    assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap());
-    assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
-    drop(w1);
-    assert_eq!(1, Arc::strong_count(&some_w)); // some_w
-}
diff --git a/tests/async_await_macros.rs b/tests/async_await_macros.rs
index bd586d6..ce1f3a3 100644
--- a/tests/async_await_macros.rs
+++ b/tests/async_await_macros.rs
@@ -1,9 +1,16 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, poll_fn, FutureExt};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+use futures::{
+    join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join,
+};
+use std::mem;
+
 #[test]
 fn poll_and_pending() {
-    use futures::{pending, pin_mut, poll};
-    use futures::executor::block_on;
-    use futures::task::Poll;
-
     let pending_once = async { pending!() };
     block_on(async {
         pin_mut!(pending_once);
@@ -14,11 +21,6 @@
 
 #[test]
 fn join() {
-    use futures::{pin_mut, poll, join};
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::task::Poll;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = oneshot::channel::<i32>();
 
@@ -39,11 +41,6 @@
 
 #[test]
 fn select() {
-    use futures::select;
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (_tx2, rx2) = oneshot::channel::<i32>();
     tx1.send(1).unwrap();
@@ -62,11 +59,6 @@
 
 #[test]
 fn select_biased() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use futures::select_biased;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (_tx2, rx2) = oneshot::channel::<i32>();
     tx1.send(1).unwrap();
@@ -85,12 +77,6 @@
 
 #[test]
 fn select_streams() {
-    use futures::select;
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-    use futures::stream::StreamExt;
-
     let (mut tx1, rx1) = mpsc::channel::<i32>(1);
     let (mut tx2, rx2) = mpsc::channel::<i32>(1);
     let mut rx1 = rx1.fuse();
@@ -134,11 +120,6 @@
 
 #[test]
 fn select_can_move_uncompleted_futures() {
-    use futures::select;
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = oneshot::channel::<i32>();
     tx1.send(1).unwrap();
@@ -165,10 +146,6 @@
 
 #[test]
 fn select_nested() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future;
-
     let mut outer_fut = future::ready(1);
     let mut inner_fut = future::ready(2);
     let res = block_on(async {
@@ -183,18 +160,16 @@
     assert_eq!(res, 3);
 }
 
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
 #[test]
 fn select_size() {
-    use futures::select;
-    use futures::future;
-
     let fut = async {
         let mut ready = future::ready(0i32);
         select! {
             _ = ready => {},
         }
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 24);
+    assert_eq!(mem::size_of_val(&fut), 24);
 
     let fut = async {
         let mut ready1 = future::ready(0i32);
@@ -204,19 +179,13 @@
             _ = ready2 => {},
         }
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 40);
+    assert_eq!(mem::size_of_val(&fut), 40);
 }
 
 #[test]
 fn select_on_non_unpin_expressions() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     // The returned Future is !Unpin
-    let make_non_unpin_fut = || { async {
-        5
-    }};
+    let make_non_unpin_fut = || async { 5 };
 
     let res = block_on(async {
         let select_res;
@@ -231,14 +200,8 @@
 
 #[test]
 fn select_on_non_unpin_expressions_with_default() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     // The returned Future is !Unpin
-    let make_non_unpin_fut = || { async {
-        5
-    }};
+    let make_non_unpin_fut = || async { 5 };
 
     let res = block_on(async {
         let select_res;
@@ -252,15 +215,11 @@
     assert_eq!(res, 5);
 }
 
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
 #[test]
 fn select_on_non_unpin_size() {
-    use futures::select;
-    use futures::future::FutureExt;
-
     // The returned Future is !Unpin
-    let make_non_unpin_fut = || { async {
-        5
-    }};
+    let make_non_unpin_fut = || async { 5 };
 
     let fut = async {
         let select_res;
@@ -271,15 +230,11 @@
         select_res
     };
 
-    assert_eq!(32, std::mem::size_of_val(&fut));
+    assert_eq!(32, mem::size_of_val(&fut));
 }
 
 #[test]
 fn select_can_be_used_as_expression() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future;
-
     block_on(async {
         let res = select! {
             x = future::ready(7) => x,
@@ -291,11 +246,6 @@
 
 #[test]
 fn select_with_default_can_be_used_as_expression() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future::{FutureExt, poll_fn};
-    use futures::task::{Context, Poll};
-
     fn poll_always_pending<T>(_cx: &mut Context<'_>) -> Poll<T> {
         Poll::Pending
     }
@@ -312,10 +262,6 @@
 
 #[test]
 fn select_with_complete_can_be_used_as_expression() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future;
-
     block_on(async {
         let res = select! {
             x = future::pending::<i32>() => x,
@@ -330,10 +276,6 @@
 #[test]
 #[allow(unused_assignments)]
 fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     async fn require_mutable(_: &mut i32) {}
     async fn async_noop() {}
 
@@ -351,10 +293,6 @@
 #[test]
 #[allow(unused_assignments)]
 fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
-    use futures::select;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     async fn require_mutable(_: &mut i32) {}
     async fn async_noop() {}
 
@@ -373,60 +311,79 @@
 }
 
 #[test]
-fn join_size() {
-    use futures::join;
-    use futures::future;
+#[allow(unused_assignments)]
+fn stream_select() {
+    // stream_select! macro
+    block_on(async {
+        let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());
 
+        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
+        assert_eq!(endless_ones.next().await, Some(1));
+        assert_eq!(endless_ones.next().await, Some(1));
+
+        let mut finite_list =
+            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
+        assert_eq!(finite_list.next().await, Some(1));
+        assert_eq!(finite_list.next().await, Some(1));
+        assert_eq!(finite_list.next().await, None);
+
+        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
+        // Take 1000, and assert a somewhat even distribution of values.
+        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
+        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
+        let mut count = 0;
+        let results = endless_mixed
+            .take_while(move |_| {
+                count += 1;
+                let ret = count < 1000;
+                async move { ret }
+            })
+            .collect::<Vec<_>>()
+            .await;
+        assert!(results.iter().filter(|x| **x == 1).count() >= 299);
+        assert!(results.iter().filter(|x| **x == 2).count() >= 299);
+        assert!(results.iter().filter(|x| **x == 3).count() >= 299);
+    });
+}
+
+#[test]
+fn join_size() {
     let fut = async {
         let ready = future::ready(0i32);
         join!(ready)
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 16);
+    assert_eq!(mem::size_of_val(&fut), 16);
 
     let fut = async {
         let ready1 = future::ready(0i32);
         let ready2 = future::ready(0i32);
         join!(ready1, ready2)
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 28);
+    assert_eq!(mem::size_of_val(&fut), 28);
 }
 
 #[test]
 fn try_join_size() {
-    use futures::try_join;
-    use futures::future;
-
     let fut = async {
         let ready = future::ready(Ok::<i32, i32>(0));
         try_join!(ready)
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 16);
+    assert_eq!(mem::size_of_val(&fut), 16);
 
     let fut = async {
         let ready1 = future::ready(Ok::<i32, i32>(0));
         let ready2 = future::ready(Ok::<i32, i32>(0));
         try_join!(ready1, ready2)
     };
-    assert_eq!(::std::mem::size_of_val(&fut), 28);
+    assert_eq!(mem::size_of_val(&fut), 28);
 }
 
 #[test]
 fn join_doesnt_require_unpin() {
-    use futures::join;
-
-    let _ = async {
-        join!(async {}, async {})
-    };
+    let _ = async { join!(async {}, async {}) };
 }
 
 #[test]
 fn try_join_doesnt_require_unpin() {
-    use futures::try_join;
-
-    let _ = async {
-        try_join!(
-            async { Ok::<(), ()>(()) },
-            async { Ok::<(), ()>(()) },
-        )
-    };
+    let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
 }
diff --git a/tests/auto_traits.rs b/tests/auto_traits.rs
index 111fdf6..b3d8b00 100644
--- a/tests/auto_traits.rs
+++ b/tests/auto_traits.rs
@@ -470,6 +470,13 @@
     assert_not_impl!(PollFn<*const ()>: Sync);
     assert_impl!(PollFn<PhantomPinned>: Unpin);
 
+    assert_impl!(PollImmediate<SendStream>: Send);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+    assert_impl!(PollImmediate<SyncStream>: Sync);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+    assert_impl!(PollImmediate<UnpinStream>: Unpin);
+    assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
     assert_impl!(Ready<()>: Send);
     assert_not_impl!(Ready<*const ()>: Send);
     assert_impl!(Ready<()>: Sync);
@@ -810,6 +817,12 @@
     assert_impl!(Seek<'_, ()>: Unpin);
     assert_not_impl!(Seek<'_, PhantomPinned>: Unpin);
 
+    assert_impl!(SeeKRelative<'_, ()>: Send);
+    assert_not_impl!(SeeKRelative<'_, *const ()>: Send);
+    assert_impl!(SeeKRelative<'_, ()>: Sync);
+    assert_not_impl!(SeeKRelative<'_, *const ()>: Sync);
+    assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin);
+
     assert_impl!(Sink: Send);
     assert_impl!(Sink: Sync);
     assert_impl!(Sink: Unpin);
@@ -1383,6 +1396,26 @@
     assert_impl!(Next<'_, ()>: Unpin);
     assert_not_impl!(Next<'_, PhantomPinned>: Unpin);
 
+    assert_impl!(NextIf<'_, SendStream<()>, ()>: Send);
+    assert_not_impl!(NextIf<'_, SendStream<()>, *const ()>: Send);
+    assert_not_impl!(NextIf<'_, SendStream, ()>: Send);
+    assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send);
+    assert_impl!(NextIf<'_, SyncStream<()>, ()>: Sync);
+    assert_not_impl!(NextIf<'_, SyncStream<()>, *const ()>: Sync);
+    assert_not_impl!(NextIf<'_, SyncStream, ()>: Sync);
+    assert_not_impl!(NextIf<'_, LocalStream<()>, ()>: Send);
+    assert_impl!(NextIf<'_, PinnedStream, PhantomPinned>: Unpin);
+
+    assert_impl!(NextIfEq<'_, SendStream<()>, ()>: Send);
+    assert_not_impl!(NextIfEq<'_, SendStream<()>, *const ()>: Send);
+    assert_not_impl!(NextIfEq<'_, SendStream, ()>: Send);
+    assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send);
+    assert_impl!(NextIfEq<'_, SyncStream<()>, ()>: Sync);
+    assert_not_impl!(NextIfEq<'_, SyncStream<()>, *const ()>: Sync);
+    assert_not_impl!(NextIfEq<'_, SyncStream, ()>: Sync);
+    assert_not_impl!(NextIfEq<'_, LocalStream<()>, ()>: Send);
+    assert_impl!(NextIfEq<'_, PinnedStream, PhantomPinned>: Unpin);
+
     assert_impl!(Once<()>: Send);
     assert_not_impl!(Once<*const ()>: Send);
     assert_impl!(Once<()>: Sync);
@@ -1410,6 +1443,14 @@
     assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
     assert_impl!(Peek<'_, PinnedStream>: Unpin);
 
+    assert_impl!(PeekMut<'_, SendStream<()>>: Send);
+    assert_not_impl!(PeekMut<'_, SendStream>: Send);
+    assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
+    assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
+    assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
+    assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
+    assert_impl!(PeekMut<'_, PinnedStream>: Unpin);
+
     assert_impl!(Peekable<SendStream<()>>: Send);
     assert_not_impl!(Peekable<SendStream>: Send);
     assert_not_impl!(Peekable<LocalStream>: Send);
@@ -1431,6 +1472,13 @@
     assert_not_impl!(PollFn<*const ()>: Sync);
     assert_impl!(PollFn<PhantomPinned>: Unpin);
 
+    assert_impl!(PollImmediate<SendStream>: Send);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
+    assert_impl!(PollImmediate<SyncStream>: Sync);
+    assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
+    assert_impl!(PollImmediate<UnpinStream>: Unpin);
+    assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
+
     assert_impl!(ReadyChunks<SendStream<()>>: Send);
     assert_not_impl!(ReadyChunks<SendStream>: Send);
     assert_not_impl!(ReadyChunks<LocalStream>: Send);
@@ -1780,25 +1828,40 @@
     assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin);
     assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin);
 
-    assert_not_impl!(futures_unordered::Iter<()>: Send);
-    assert_not_impl!(futures_unordered::Iter<()>: Sync);
+    assert_impl!(futures_unordered::Iter<()>: Send);
+    assert_not_impl!(futures_unordered::Iter<*const ()>: Send);
+    assert_impl!(futures_unordered::Iter<()>: Sync);
+    assert_not_impl!(futures_unordered::Iter<*const ()>: Sync);
     assert_impl!(futures_unordered::Iter<()>: Unpin);
-    // futures_unordered::Iter requires `Fut: Unpin`
+    // The definition of futures_unordered::Iter has `Fut: Unpin` bounds.
     // assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin);
 
-    assert_not_impl!(futures_unordered::IterMut<()>: Send);
-    assert_not_impl!(futures_unordered::IterMut<()>: Sync);
+    assert_impl!(futures_unordered::IterMut<()>: Send);
+    assert_not_impl!(futures_unordered::IterMut<*const ()>: Send);
+    assert_impl!(futures_unordered::IterMut<()>: Sync);
+    assert_not_impl!(futures_unordered::IterMut<*const ()>: Sync);
     assert_impl!(futures_unordered::IterMut<()>: Unpin);
-    // futures_unordered::IterMut requires `Fut: Unpin`
+    // The definition of futures_unordered::IterMut has `Fut: Unpin` bounds.
     // assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin);
 
-    assert_not_impl!(futures_unordered::IterPinMut<()>: Send);
-    assert_not_impl!(futures_unordered::IterPinMut<()>: Sync);
+    assert_impl!(futures_unordered::IterPinMut<()>: Send);
+    assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Send);
+    assert_impl!(futures_unordered::IterPinMut<()>: Sync);
+    assert_not_impl!(futures_unordered::IterPinMut<*const ()>: Sync);
     assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin);
 
-    assert_not_impl!(futures_unordered::IterPinRef<()>: Send);
-    assert_not_impl!(futures_unordered::IterPinRef<()>: Sync);
+    assert_impl!(futures_unordered::IterPinRef<()>: Send);
+    assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Send);
+    assert_impl!(futures_unordered::IterPinRef<()>: Sync);
+    assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
     assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);
+
+    assert_impl!(futures_unordered::IntoIter<()>: Send);
+    assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
+    assert_impl!(futures_unordered::IntoIter<()>: Sync);
+    assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync);
+    // The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds.
+    // assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin);
 }
 
 /// Assert Send/Sync/Unpin for all public types in `futures::task`.
diff --git a/tests/compat.rs b/tests/compat.rs
index 39adc7c..ac04a95 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -1,16 +1,15 @@
 #![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
 
-use tokio::timer::Delay;
-use tokio::runtime::Runtime;
-use std::time::Instant;
-use futures::prelude::*;
 use futures::compat::Future01CompatExt;
+use futures::prelude::*;
+use std::time::Instant;
+use tokio::runtime::Runtime;
+use tokio::timer::Delay;
 
 #[test]
 fn can_use_01_futures_in_a_03_future_running_on_a_01_executor() {
-    let f = async {
-        Delay::new(Instant::now()).compat().await
-    };
+    let f = async { Delay::new(Instant::now()).compat().await };
 
     let mut runtime = Runtime::new().unwrap();
     runtime.block_on(f.boxed().compat()).unwrap();
diff --git a/tests/eager_drop.rs b/tests/eager_drop.rs
index 11edb1b..9925077 100644
--- a/tests/eager_drop.rs
+++ b/tests/eager_drop.rs
@@ -1,16 +1,23 @@
+use futures::channel::oneshot;
+use futures::future::{self, Future, FutureExt, TryFutureExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use pin_project::pin_project;
+use std::pin::Pin;
+use std::sync::mpsc;
+
 #[test]
 fn map_ok() {
-    use futures::future::{self, FutureExt, TryFutureExt};
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-
     // The closure given to `map_ok` should have been dropped by the time `map`
     // runs.
     let (tx1, rx1) = mpsc::channel::<()>();
     let (tx2, rx2) = mpsc::channel::<()>();
 
     future::ready::<Result<i32, i32>>(Err(1))
-        .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); })
+        .map_ok(move |_| {
+            let _tx1 = tx1;
+            panic!("should not run");
+        })
         .map(move |_| {
             assert!(rx1.recv().is_err());
             tx2.send(()).unwrap()
@@ -22,17 +29,16 @@
 
 #[test]
 fn map_err() {
-    use futures::future::{self, FutureExt, TryFutureExt};
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-
     // The closure given to `map_err` should have been dropped by the time `map`
     // runs.
     let (tx1, rx1) = mpsc::channel::<()>();
     let (tx2, rx2) = mpsc::channel::<()>();
 
     future::ready::<Result<i32, i32>>(Ok(1))
-        .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); })
+        .map_err(move |_| {
+            let _tx1 = tx1;
+            panic!("should not run");
+        })
         .map(move |_| {
             assert!(rx1.recv().is_err());
             tx2.send(()).unwrap()
@@ -42,96 +48,74 @@
     rx2.recv().unwrap();
 }
 
-mod channelled {
-    use futures::future::Future;
-    use futures::task::{Context,Poll};
-    use pin_project::pin_project;
-    use std::pin::Pin;
+#[pin_project]
+struct FutureData<F, T> {
+    _data: T,
+    #[pin]
+    future: F,
+}
 
-    #[pin_project]
-    struct FutureData<F, T> {
-        _data: T,
-        #[pin]
-        future: F,
+impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
+    type Output = F::Output;
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
+        self.project().future.poll(cx)
     }
+}
 
-    impl<F: Future, T: Send + 'static> Future for FutureData<F, T> {
-        type Output = F::Output;
+#[test]
+fn then_drops_eagerly() {
+    let (tx0, rx0) = oneshot::channel::<()>();
+    let (tx1, rx1) = mpsc::channel::<()>();
+    let (tx2, rx2) = mpsc::channel::<()>();
 
-        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
-            self.project().future.poll(cx)
-        }
-    }
+    FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+        .then(move |_| {
+            assert!(rx1.recv().is_err()); // tx1 should have been dropped
+            tx2.send(()).unwrap();
+            future::ready(())
+        })
+        .run_in_background();
 
-    #[test]
-    fn then_drops_eagerly() {
-        use futures::channel::oneshot;
-        use futures::future::{self, FutureExt, TryFutureExt};
-        use futures_test::future::FutureTestExt;
-        use std::sync::mpsc;
+    assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+    tx0.send(()).unwrap();
+    rx2.recv().unwrap();
+}
 
-        let (tx0, rx0) = oneshot::channel::<()>();
-        let (tx1, rx1) = mpsc::channel::<()>();
-        let (tx2, rx2) = mpsc::channel::<()>();
+#[test]
+fn and_then_drops_eagerly() {
+    let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+    let (tx1, rx1) = mpsc::channel::<()>();
+    let (tx2, rx2) = mpsc::channel::<()>();
 
-        FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
-            .then(move |_| {
-                assert!(rx1.recv().is_err()); // tx1 should have been dropped
-                tx2.send(()).unwrap();
-                future::ready(())
-            })
-            .run_in_background();
+    FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+        .and_then(move |_| {
+            assert!(rx1.recv().is_err()); // tx1 should have been dropped
+            tx2.send(()).unwrap();
+            future::ready(Ok(()))
+        })
+        .run_in_background();
 
-        assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
-        tx0.send(()).unwrap();
-        rx2.recv().unwrap();
-    }
+    assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+    tx0.send(Ok(())).unwrap();
+    rx2.recv().unwrap();
+}
 
-    #[test]
-    fn and_then_drops_eagerly() {
-        use futures::channel::oneshot;
-        use futures::future::{self, TryFutureExt};
-        use futures_test::future::FutureTestExt;
-        use std::sync::mpsc;
+#[test]
+fn or_else_drops_eagerly() {
+    let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
+    let (tx1, rx1) = mpsc::channel::<()>();
+    let (tx2, rx2) = mpsc::channel::<()>();
 
-        let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
-        let (tx1, rx1) = mpsc::channel::<()>();
-        let (tx2, rx2) = mpsc::channel::<()>();
+    FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| panic!()) }
+        .or_else(move |_| {
+            assert!(rx1.recv().is_err()); // tx1 should have been dropped
+            tx2.send(()).unwrap();
+            future::ready::<Result<(), ()>>(Ok(()))
+        })
+        .run_in_background();
 
-        FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
-            .and_then(move |_| {
-                assert!(rx1.recv().is_err()); // tx1 should have been dropped
-                tx2.send(()).unwrap();
-                future::ready(Ok(()))
-            })
-            .run_in_background();
-
-        assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
-        tx0.send(Ok(())).unwrap();
-        rx2.recv().unwrap();
-    }
-
-    #[test]
-    fn or_else_drops_eagerly() {
-        use futures::channel::oneshot;
-        use futures::future::{self, TryFutureExt};
-        use futures_test::future::FutureTestExt;
-        use std::sync::mpsc;
-
-        let (tx0, rx0) = oneshot::channel::<Result<(), ()>>();
-        let (tx1, rx1) = mpsc::channel::<()>();
-        let (tx2, rx2) = mpsc::channel::<()>();
-
-        FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) }
-            .or_else(move |_| {
-                assert!(rx1.recv().is_err()); // tx1 should have been dropped
-                tx2.send(()).unwrap();
-                future::ready::<Result<(), ()>>(Ok(()))
-            })
-            .run_in_background();
-
-        assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
-        tx0.send(Err(())).unwrap();
-        rx2.recv().unwrap();
-    }
+    assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv());
+    tx0.send(Err(())).unwrap();
+    rx2.recv().unwrap();
 }
diff --git a/tests/eventual.rs b/tests/eventual.rs
index bff000d..3461380 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
 use futures::channel::oneshot;
 use futures::executor::ThreadPool;
 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
diff --git a/tests/abortable.rs b/tests/future_abortable.rs
similarity index 69%
rename from tests/abortable.rs
rename to tests/future_abortable.rs
index 6b5a25c..e119f0b 100644
--- a/tests/abortable.rs
+++ b/tests/future_abortable.rs
@@ -1,45 +1,44 @@
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future::{abortable, Aborted, FutureExt};
+use futures::task::{Context, Poll};
+use futures_test::task::new_count_waker;
+
 #[test]
 fn abortable_works() {
-    use futures::channel::oneshot;
-    use futures::future::{abortable, Aborted};
-    use futures::executor::block_on;
-
     let (_tx, a_rx) = oneshot::channel::<()>();
     let (abortable_rx, abort_handle) = abortable(a_rx);
 
     abort_handle.abort();
+    assert!(abortable_rx.is_aborted());
     assert_eq!(Err(Aborted), block_on(abortable_rx));
 }
 
 #[test]
 fn abortable_awakens() {
-    use futures::channel::oneshot;
-    use futures::future::{abortable, Aborted, FutureExt};
-    use futures::task::{Context, Poll};
-    use futures_test::task::new_count_waker;
-
     let (_tx, a_rx) = oneshot::channel::<()>();
     let (mut abortable_rx, abort_handle) = abortable(a_rx);
 
     let (waker, counter) = new_count_waker();
     let mut cx = Context::from_waker(&waker);
+
     assert_eq!(counter, 0);
     assert_eq!(Poll::Pending, abortable_rx.poll_unpin(&mut cx));
     assert_eq!(counter, 0);
+
     abort_handle.abort();
     assert_eq!(counter, 1);
+    assert!(abortable_rx.is_aborted());
     assert_eq!(Poll::Ready(Err(Aborted)), abortable_rx.poll_unpin(&mut cx));
 }
 
 #[test]
 fn abortable_resolves() {
-    use futures::channel::oneshot;
-    use futures::future::abortable;
-    use futures::executor::block_on;
     let (tx, a_rx) = oneshot::channel::<()>();
     let (abortable_rx, _abort_handle) = abortable(a_rx);
 
     tx.send(()).unwrap();
 
+    assert!(!abortable_rx.is_aborted());
     assert_eq!(Ok(Ok(())), block_on(abortable_rx));
 }
diff --git a/tests/basic_combinators.rs b/tests/future_basic_combinators.rs
similarity index 92%
rename from tests/basic_combinators.rs
rename to tests/future_basic_combinators.rs
index fa65b6f..372ab48 100644
--- a/tests/basic_combinators.rs
+++ b/tests/future_basic_combinators.rs
@@ -13,17 +13,21 @@
             tx1.send(x).unwrap(); // Send 1
             tx1.send(2).unwrap(); // Send 2
             future::ready(3)
-        }).map(move |x| {
+        })
+        .map(move |x| {
             tx2.send(x).unwrap(); // Send 3
             tx2.send(4).unwrap(); // Send 4
             5
-        }).map(move |x| {
+        })
+        .map(move |x| {
             tx3.send(x).unwrap(); // Send 5
         });
 
     assert!(rx.try_recv().is_err()); // Not started yet
     fut.run_in_background(); // Start it
-    for i in 1..=5 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+    for i in 1..=5 {
+        assert_eq!(rx.recv(), Ok(i));
+    } // Check it
     assert!(rx.recv().is_err()); // Should be done
 }
 
@@ -93,6 +97,8 @@
 
     assert!(rx.try_recv().is_err()); // Not started yet
     fut.run_in_background(); // Start it
-    for i in 1..=12 { assert_eq!(rx.recv(), Ok(i)); } // Check it
+    for i in 1..=12 {
+        assert_eq!(rx.recv(), Ok(i));
+    } // Check it
     assert!(rx.recv().is_err()); // Should be done
 }
diff --git a/tests/fuse.rs b/tests/future_fuse.rs
similarity index 100%
rename from tests/fuse.rs
rename to tests/future_fuse.rs
diff --git a/tests/future_inspect.rs b/tests/future_inspect.rs
new file mode 100644
index 0000000..eacd1f7
--- /dev/null
+++ b/tests/future_inspect.rs
@@ -0,0 +1,16 @@
+use futures::executor::block_on;
+use futures::future::{self, FutureExt};
+
+#[test]
+fn smoke() {
+    let mut counter = 0;
+
+    {
+        let work = future::ready::<i32>(40).inspect(|val| {
+            counter += *val;
+        });
+        assert_eq!(block_on(work), 40);
+    }
+
+    assert_eq!(counter, 40);
+}
diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs
new file mode 100644
index 0000000..44486e1
--- /dev/null
+++ b/tests/future_join_all.rs
@@ -0,0 +1,41 @@
+use futures::executor::block_on;
+use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
+use std::fmt::Debug;
+
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
+where
+    T: PartialEq + Debug,
+{
+    pin_mut!(actual_fut);
+    let output = block_on(actual_fut);
+    assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+    assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+    assert_done(join_all(vec![ready(1)]), vec![1]);
+    // REVIEW: should this be implemented?
+    // assert_done(join_all(Vec::<i32>::new()), vec![]);
+
+    // TODO: needs more tests
+}
+
+#[test]
+fn join_all_iter_lifetime() {
+    // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+    // conservative type parameterization of `JoinAll`.
+    fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
+        let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
+        join_all(iter)
+    }
+
+    assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+}
+
+#[test]
+fn join_all_from_iter() {
+    assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
+}
diff --git a/tests/future_obj.rs b/tests/future_obj.rs
index c6b18fc..0e52534 100644
--- a/tests/future_obj.rs
+++ b/tests/future_obj.rs
@@ -1,6 +1,6 @@
-use futures::future::{Future, FutureObj, FutureExt};
-use std::pin::Pin;
+use futures::future::{Future, FutureExt, FutureObj};
 use futures::task::{Context, Poll};
+use std::pin::Pin;
 
 #[test]
 fn dropping_does_not_segfault() {
diff --git a/tests/select_all.rs b/tests/future_select_all.rs
similarity index 69%
rename from tests/select_all.rs
rename to tests/future_select_all.rs
index 540db2c..299b479 100644
--- a/tests/select_all.rs
+++ b/tests/future_select_all.rs
@@ -1,14 +1,10 @@
+use futures::executor::block_on;
+use futures::future::{ready, select_all};
+use std::collections::HashSet;
+
 #[test]
 fn smoke() {
-    use futures::executor::block_on;
-    use futures::future::{ready, select_all};
-    use std::collections::HashSet;
-
-    let v = vec![
-        ready(1),
-        ready(2),
-        ready(3),
-    ];
+    let v = vec![ready(1), ready(2), ready(3)];
 
     let mut c = vec![1, 2, 3].into_iter().collect::<HashSet<_>>();
 
diff --git a/tests/future_select_ok.rs b/tests/future_select_ok.rs
new file mode 100644
index 0000000..8aec003
--- /dev/null
+++ b/tests/future_select_ok.rs
@@ -0,0 +1,30 @@
+use futures::executor::block_on;
+use futures::future::{err, ok, select_ok};
+
+#[test]
+fn ignore_err() {
+    let v = vec![err(1), err(2), ok(3), ok(4)];
+
+    let (i, v) = block_on(select_ok(v)).ok().unwrap();
+    assert_eq!(i, 3);
+
+    assert_eq!(v.len(), 1);
+
+    let (i, v) = block_on(select_ok(v)).ok().unwrap();
+    assert_eq!(i, 4);
+
+    assert!(v.is_empty());
+}
+
+#[test]
+fn last_err() {
+    let v = vec![ok(1), err(2), err(3)];
+
+    let (i, v) = block_on(select_ok(v)).ok().unwrap();
+    assert_eq!(i, 1);
+
+    assert_eq!(v.len(), 2);
+
+    let i = block_on(select_ok(v)).err().unwrap();
+    assert_eq!(i, 3);
+}
diff --git a/tests/shared.rs b/tests/future_shared.rs
similarity index 73%
rename from tests/shared.rs
rename to tests/future_shared.rs
index cc0c6a2..3ceaebb 100644
--- a/tests/shared.rs
+++ b/tests/future_shared.rs
@@ -1,22 +1,22 @@
-mod count_clone {
-    use std::cell::Cell;
-    use std::rc::Rc;
+use futures::channel::oneshot;
+use futures::executor::{block_on, LocalPool};
+use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
+use futures::task::LocalSpawn;
+use std::cell::{Cell, RefCell};
+use std::rc::Rc;
+use std::task::Poll;
+use std::thread;
 
-    pub struct CountClone(pub Rc<Cell<i32>>);
+struct CountClone(Rc<Cell<i32>>);
 
-    impl Clone for CountClone {
-        fn clone(&self) -> Self {
-            self.0.set(self.0.get() + 1);
-            Self(self.0.clone())
-        }
+impl Clone for CountClone {
+    fn clone(&self) -> Self {
+        self.0.set(self.0.get() + 1);
+        Self(self.0.clone())
     }
 }
 
 fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use std::thread;
     let (tx, rx) = oneshot::channel::<i32>();
     let f = rx.shared();
     let join_handles = (0..threads_number)
@@ -53,11 +53,6 @@
 
 #[test]
 fn drop_on_one_task_ok() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt, TryFutureExt};
-    use std::thread;
-
     let (tx, rx) = oneshot::channel::<u32>();
     let f1 = rx.shared();
     let f2 = f1.clone();
@@ -86,11 +81,6 @@
 
 #[test]
 fn drop_in_poll() {
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt, LocalFutureObj};
-    use std::cell::RefCell;
-    use std::rc::Rc;
-
     let slot1 = Rc::new(RefCell::new(None));
     let slot2 = slot1.clone();
 
@@ -106,13 +96,9 @@
     assert_eq!(block_on(future1), 1);
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn peek() {
-    use futures::channel::oneshot;
-    use futures::executor::LocalPool;
-    use futures::future::{FutureExt, LocalFutureObj};
-    use futures::task::LocalSpawn;
-
     let mut local_pool = LocalPool::new();
     let spawn = &mut local_pool.spawner();
 
@@ -134,9 +120,7 @@
     }
 
     // Once the Shared has been polled, the value is peekable on the clone.
-    spawn
-        .spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ()))))
-        .unwrap();
+    spawn.spawn_local_obj(LocalFutureObj::new(Box::new(f1.map(|_| ())))).unwrap();
     local_pool.run();
     for _ in 0..2 {
         assert_eq!(*f2.peek().unwrap(), Ok(42));
@@ -145,10 +129,6 @@
 
 #[test]
 fn downgrade() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-
     let (tx, rx) = oneshot::channel::<i32>();
     let shared = rx.shared();
     // Since there are outstanding `Shared`s, we can get a `WeakShared`.
@@ -173,14 +153,6 @@
 
 #[test]
 fn dont_clone_in_single_owner_shared_future() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use std::cell::Cell;
-    use std::rc::Rc;
-
-    use count_clone::CountClone;
-
     let counter = CountClone(Rc::new(Cell::new(0)));
     let (tx, rx) = oneshot::channel();
 
@@ -193,14 +165,6 @@
 
 #[test]
 fn dont_do_unnecessary_clones_on_output() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use std::cell::Cell;
-    use std::rc::Rc;
-
-    use count_clone::CountClone;
-
     let counter = CountClone(Rc::new(Cell::new(0)));
     let (tx, rx) = oneshot::channel();
 
@@ -215,11 +179,6 @@
 
 #[test]
 fn shared_future_that_wakes_itself_until_pending_is_returned() {
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use std::cell::Cell;
-    use std::task::Poll;
-
     let proceed = Cell::new(false);
     let fut = futures::future::poll_fn(|cx| {
         if proceed.get() {
@@ -233,8 +192,5 @@
 
     // The join future can only complete if the second future gets a chance to run after the first
     // has returned pending
-    assert_eq!(
-        block_on(futures::future::join(fut, async { proceed.set(true) })),
-        ((), ())
-    );
+    assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
 }
diff --git a/tests/future_try_flatten_stream.rs b/tests/future_try_flatten_stream.rs
index 4a614f9..82ae1ba 100644
--- a/tests/future_try_flatten_stream.rs
+++ b/tests/future_try_flatten_stream.rs
@@ -1,9 +1,14 @@
+use futures::executor::block_on_stream;
+use futures::future::{err, ok, TryFutureExt};
+use futures::sink::Sink;
+use futures::stream::Stream;
+use futures::stream::{self, StreamExt};
+use futures::task::{Context, Poll};
+use std::marker::PhantomData;
+use std::pin::Pin;
+
 #[test]
 fn successful_future() {
-    use futures::executor::block_on_stream;
-    use futures::future::{ok, TryFutureExt};
-    use futures::stream::{self, StreamExt};
-
     let stream_items = vec![17, 19];
     let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok));
 
@@ -17,15 +22,8 @@
 
 #[test]
 fn failed_future() {
-    use core::marker::PhantomData;
-    use core::pin::Pin;
-    use futures::executor::block_on_stream;
-    use futures::future::{err, TryFutureExt};
-    use futures::stream::Stream;
-    use futures::task::{Context, Poll};
-
     struct PanickingStream<T, E> {
-        _marker: PhantomData<(T, E)>
+        _marker: PhantomData<(T, E)>,
     }
 
     impl<T, E> Stream for PanickingStream<T, E> {
@@ -45,13 +43,6 @@
 
 #[test]
 fn assert_impls() {
-    use core::marker::PhantomData;
-    use core::pin::Pin;
-    use futures::sink::Sink;
-    use futures::stream::Stream;
-    use futures::task::{Context, Poll};
-    use futures::future::{ok, TryFutureExt};
-
     struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>);
 
     impl<T, E, Item> Stream for StreamSink<T, E, Item> {
diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs
new file mode 100644
index 0000000..9a82487
--- /dev/null
+++ b/tests/future_try_join_all.rs
@@ -0,0 +1,46 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures_util::future::{err, ok, try_join_all, TryJoinAll};
+use std::fmt::Debug;
+use std::future::Future;
+
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
+where
+    T: PartialEq + Debug,
+{
+    pin_mut!(actual_fut);
+    let output = block_on(actual_fut);
+    assert_eq!(output, expected);
+}
+
+#[test]
+fn collect_collects() {
+    assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+    assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+    assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
+    // REVIEW: should this be implemented?
+    // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
+
+    // TODO: needs more tests
+}
+
+#[test]
+fn try_join_all_iter_lifetime() {
+    // In futures-rs version 0.1, this function would fail to typecheck due to an overly
+    // conservative type parameterization of `TryJoinAll`.
+    fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
+        let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
+        try_join_all(iter)
+    }
+
+    assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+}
+
+#[test]
+fn try_join_all_from_iter() {
+    assert_done(
+        vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
+        Ok::<_, usize>(vec![1, 2]),
+    )
+}
diff --git a/tests/futures_ordered.rs b/tests/futures_ordered.rs
deleted file mode 100644
index 7f21c82..0000000
--- a/tests/futures_ordered.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-#[test]
-fn works_1() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on_stream;
-    use futures::stream::{StreamExt, FuturesOrdered};
-    use futures_test::task::noop_context;
-
-    let (a_tx, a_rx) = oneshot::channel::<i32>();
-    let (b_tx, b_rx) = oneshot::channel::<i32>();
-    let (c_tx, c_rx) = oneshot::channel::<i32>();
-
-    let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
-
-    b_tx.send(99).unwrap();
-    assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());
-
-    a_tx.send(33).unwrap();
-    c_tx.send(33).unwrap();
-
-    let mut iter = block_on_stream(stream);
-    assert_eq!(Some(Ok(33)), iter.next());
-    assert_eq!(Some(Ok(99)), iter.next());
-    assert_eq!(Some(Ok(33)), iter.next());
-    assert_eq!(None, iter.next());
-}
-
-#[test]
-fn works_2() {
-    use futures::channel::oneshot;
-    use futures::future::{join, FutureExt};
-    use futures::stream::{StreamExt, FuturesOrdered};
-    use futures_test::task::noop_context;
-
-    let (a_tx, a_rx) = oneshot::channel::<i32>();
-    let (b_tx, b_rx) = oneshot::channel::<i32>();
-    let (c_tx, c_rx) = oneshot::channel::<i32>();
-
-    let mut stream = vec![
-        a_rx.boxed(),
-        join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
-    ].into_iter().collect::<FuturesOrdered<_>>();
-
-    let mut cx = noop_context();
-    a_tx.send(33).unwrap();
-    b_tx.send(33).unwrap();
-    assert!(stream.poll_next_unpin(&mut cx).is_ready());
-    assert!(stream.poll_next_unpin(&mut cx).is_pending());
-    c_tx.send(33).unwrap();
-    assert!(stream.poll_next_unpin(&mut cx).is_ready());
-}
-
-#[test]
-fn from_iterator() {
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::{StreamExt, FuturesOrdered};
-
-    let stream = vec![
-        future::ready::<i32>(1),
-        future::ready::<i32>(2),
-        future::ready::<i32>(3)
-    ].into_iter().collect::<FuturesOrdered<_>>();
-    assert_eq!(stream.len(), 3);
-    assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
-}
-
-#[test]
-fn queue_never_unblocked() {
-    use futures::channel::oneshot;
-    use futures::future::{self, Future, TryFutureExt};
-    use futures::stream::{StreamExt, FuturesOrdered};
-    use futures_test::task::noop_context;
-    use std::any::Any;
-
-    let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
-    let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
-    let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
-
-    let mut stream = vec![
-        Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
-        Box::new(future::try_select(b_rx, c_rx)
-            .map_err(|e| e.factor_first().0)
-            .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
-    ].into_iter().collect::<FuturesOrdered<_>>();
-
-    let cx = &mut noop_context();
-    for _ in 0..10 {
-        assert!(stream.poll_next_unpin(cx).is_pending());
-    }
-
-    b_tx.send(Box::new(())).unwrap();
-    assert!(stream.poll_next_unpin(cx).is_pending());
-    c_tx.send(Box::new(())).unwrap();
-    assert!(stream.poll_next_unpin(cx).is_pending());
-    assert!(stream.poll_next_unpin(cx).is_pending());
-}
diff --git a/tests/inspect.rs b/tests/inspect.rs
deleted file mode 100644
index 375778b..0000000
--- a/tests/inspect.rs
+++ /dev/null
@@ -1,14 +0,0 @@
-#[test]
-fn smoke() {
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt};
-
-    let mut counter = 0;
-
-    {
-        let work = future::ready::<i32>(40).inspect(|val| { counter += *val; });
-        assert_eq!(block_on(work), 40);
-    }
-
-    assert_eq!(counter, 40);
-}
diff --git a/tests/io_buf_reader.rs b/tests/io_buf_reader.rs
index f8f9d14..717297c 100644
--- a/tests/io_buf_reader.rs
+++ b/tests/io_buf_reader.rs
@@ -1,156 +1,240 @@
-macro_rules! run_fill_buf {
-    ($reader:expr) => {{
-        use futures_test::task::noop_context;
-        use futures::task::Poll;
-        use std::pin::Pin;
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+    AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
+    BufReader, SeekFrom,
+};
+use futures::pin_mut;
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use pin_project::pin_project;
+use std::cmp;
+use std::io;
+use std::pin::Pin;
 
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
-                break x;
-            }
-        }
-    }};
-}
-
-mod util {
-    use futures::future::Future;
-    pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures_test::task::noop_context;
-        use futures::task::Poll;
-        use futures::future::FutureExt;
-
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
-                return x;
-            }
+// helper for maybe_pending_* tests
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+            return x;
         }
     }
 }
 
-mod maybe_pending {
-    use futures::task::{Context,Poll};
-    use std::{cmp,io};
-    use std::pin::Pin;
-    use futures::io::{AsyncRead,AsyncBufRead};
+// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
+#[pin_project(!Unpin)]
+struct Cursor<T> {
+    #[pin]
+    inner: futures::io::Cursor<T>,
+}
 
-    pub struct MaybePending<'a> {
-        inner: &'a [u8],
-        ready_read: bool,
-        ready_fill_buf: bool,
+impl<T> Cursor<T> {
+    fn new(inner: T) -> Self {
+        Self { inner: futures::io::Cursor::new(inner) }
+    }
+}
+
+impl AsyncRead for Cursor<&[u8]> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        self.project().inner.poll_read(cx, buf)
+    }
+}
+
+impl AsyncBufRead for Cursor<&[u8]> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+        self.project().inner.poll_fill_buf(cx)
     }
 
-    impl<'a> MaybePending<'a> {
-        pub fn new(inner: &'a [u8]) -> Self {
-            Self { inner, ready_read: false, ready_fill_buf: false }
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        self.project().inner.consume(amt)
+    }
+}
+
+impl AsyncSeek for Cursor<&[u8]> {
+    fn poll_seek(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        pos: SeekFrom,
+    ) -> Poll<io::Result<u64>> {
+        self.project().inner.poll_seek(cx, pos)
+    }
+}
+
+struct MaybePending<'a> {
+    inner: &'a [u8],
+    ready_read: bool,
+    ready_fill_buf: bool,
+}
+
+impl<'a> MaybePending<'a> {
+    fn new(inner: &'a [u8]) -> Self {
+        Self { inner, ready_read: false, ready_fill_buf: false }
+    }
+}
+
+impl AsyncRead for MaybePending<'_> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        if self.ready_read {
+            self.ready_read = false;
+            Pin::new(&mut self.inner).poll_read(cx, buf)
+        } else {
+            self.ready_read = true;
+            Poll::Pending
         }
     }
+}
 
-    impl AsyncRead for MaybePending<'_> {
-        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
-            -> Poll<io::Result<usize>>
-        {
-            if self.ready_read {
-                self.ready_read = false;
-                Pin::new(&mut self.inner).poll_read(cx, buf)
-            } else {
-                self.ready_read = true;
-                Poll::Pending
+impl AsyncBufRead for MaybePending<'_> {
+    fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+        if self.ready_fill_buf {
+            self.ready_fill_buf = false;
+            if self.inner.is_empty() {
+                return Poll::Ready(Ok(&[]));
             }
+            let len = cmp::min(2, self.inner.len());
+            Poll::Ready(Ok(&self.inner[0..len]))
+        } else {
+            self.ready_fill_buf = true;
+            Poll::Pending
         }
     }
 
-    impl AsyncBufRead for MaybePending<'_> {
-        fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>)
-            -> Poll<io::Result<&[u8]>>
-        {
-            if self.ready_fill_buf {
-                self.ready_fill_buf = false;
-                if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
-                let len = cmp::min(2, self.inner.len());
-                Poll::Ready(Ok(&self.inner[0..len]))
-            } else {
-                self.ready_fill_buf = true;
-                Poll::Pending
-            }
-        }
-
-        fn consume(mut self: Pin<&mut Self>, amt: usize) {
-            self.inner = &self.inner[amt..];
-        }
+    fn consume(mut self: Pin<&mut Self>, amt: usize) {
+        self.inner = &self.inner[amt..];
     }
 }
 
 #[test]
 fn test_buffered_reader() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncReadExt, BufReader};
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let mut reader = BufReader::with_capacity(2, inner);
 
-    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, inner);
+        let mut buf = [0, 0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 3);
+        assert_eq!(buf, [5, 6, 7]);
+        assert_eq!(reader.buffer(), []);
 
-    let mut buf = [0, 0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 3);
-    assert_eq!(buf, [5, 6, 7]);
-    assert_eq!(reader.buffer(), []);
+        let mut buf = [0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 2);
+        assert_eq!(buf, [0, 1]);
+        assert_eq!(reader.buffer(), []);
 
-    let mut buf = [0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 2);
-    assert_eq!(buf, [0, 1]);
-    assert_eq!(reader.buffer(), []);
+        let mut buf = [0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [2]);
+        assert_eq!(reader.buffer(), [3]);
 
-    let mut buf = [0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [2]);
-    assert_eq!(reader.buffer(), [3]);
+        let mut buf = [0, 0, 0];
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [3, 0, 0]);
+        assert_eq!(reader.buffer(), []);
 
-    let mut buf = [0, 0, 0];
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [3, 0, 0]);
-    assert_eq!(reader.buffer(), []);
+        let nread = reader.read(&mut buf).await.unwrap();
+        assert_eq!(nread, 1);
+        assert_eq!(buf, [4, 0, 0]);
+        assert_eq!(reader.buffer(), []);
 
-    let nread = block_on(reader.read(&mut buf));
-    assert_eq!(nread.unwrap(), 1);
-    assert_eq!(buf, [4, 0, 0]);
-    assert_eq!(reader.buffer(), []);
-
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+    });
 }
 
 #[test]
 fn test_buffered_reader_seek() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom};
-    use std::pin::Pin;
-    use util::run;
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(2, Cursor::new(inner));
+        pin_mut!(reader);
 
-    let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
+        assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]);
+        reader.as_mut().consume(1);
+        assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
+    });
+}
 
-    assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
-    Pin::new(&mut reader).consume(1);
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
+#[test]
+fn test_buffered_reader_seek_relative() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(2, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert!(reader.as_mut().seek_relative(3).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(0).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(1).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
+        assert!(reader.as_mut().seek_relative(-1).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
+        assert!(reader.as_mut().seek_relative(2).await.is_ok());
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
+    });
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_read() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(3, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+        reader.as_mut().consume(3);
+
+        let mut buffer = [0, 0, 0, 0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 5);
+        assert_eq!(buffer, [0, 1, 2, 3, 4]);
+
+        assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+        let mut buffer = [0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+        assert_eq!(buffer, [3, 4]);
+    });
+}
+
+#[test]
+fn test_buffered_reader_invalidated_after_seek() {
+    block_on(async {
+        let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
+        let reader = BufReader::with_capacity(3, Cursor::new(inner));
+        pin_mut!(reader);
+
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]);
+        reader.as_mut().consume(3);
+
+        assert!(reader.seek(SeekFrom::Current(5)).await.is_ok());
+
+        assert!(reader.as_mut().seek_relative(-2).await.is_ok());
+        let mut buffer = [0, 0];
+        assert_eq!(reader.read(&mut buffer).await.unwrap(), 2);
+        assert_eq!(buffer, [3, 4]);
+    });
 }
 
 #[test]
 fn test_buffered_reader_seek_underflow() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom};
-    use std::io;
-
     // gimmick reader that yields its position modulo 256 for each byte
     struct PositionReader {
-        pos: u64
+        pos: u64,
     }
     impl io::Read for PositionReader {
         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
@@ -172,32 +256,31 @@
                     self.pos = self.pos.wrapping_add(n as u64);
                 }
                 SeekFrom::End(n) => {
-                    self.pos = u64::max_value().wrapping_add(n as u64);
+                    self.pos = u64::MAX.wrapping_add(n as u64);
                 }
             }
             Ok(self.pos)
         }
     }
 
-    let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..]));
-    assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5));
-    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
-    // the following seek will require two underlying seeks
-    let expected = 9_223_372_036_854_775_802;
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected));
-    assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5));
-    // seeking to 0 should empty the buffer.
-    assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected));
-    assert_eq!(reader.get_ref().get_ref().pos, expected);
+    block_on(async {
+        let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 }));
+        pin_mut!(reader);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]);
+        assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+        // the following seek will require two underlying seeks
+        let expected = 9_223_372_036_854_775_802;
+        assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected);
+        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);
+        // seeking to 0 should empty the buffer.
+        assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
+        assert_eq!(reader.get_ref().get_ref().pos, expected);
+    });
 }
 
 #[test]
 fn test_short_reads() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncReadExt, AllowStdIo, BufReader};
-    use std::io;
-
     /// A dummy reader intended at testing short-reads propagation.
     struct ShortReader {
         lengths: Vec<usize>,
@@ -213,24 +296,22 @@
         }
     }
 
-    let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
-    let mut reader = BufReader::new(AllowStdIo::new(inner));
-    let mut buf = [0, 0];
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 2);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 1);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
-    assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0);
+    block_on(async {
+        let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] };
+        let mut reader = BufReader::new(AllowStdIo::new(inner));
+        let mut buf = [0, 0];
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+        assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
+    });
 }
 
 #[test]
 fn maybe_pending() {
-    use futures::io::{AsyncReadExt, BufReader};
-    use util::run;
-    use maybe_pending::MaybePending;
-
     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
     let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
 
@@ -268,10 +349,6 @@
 
 #[test]
 fn maybe_pending_buf_read() {
-    use futures::io::{AsyncBufReadExt, BufReader};
-    use util::run;
-    use maybe_pending::MaybePending;
-
     let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
     let mut reader = BufReader::with_capacity(2, inner);
     let mut v = Vec::new();
@@ -291,68 +368,65 @@
 // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
 #[test]
 fn maybe_pending_seek() {
-    use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader,
-        Cursor, SeekFrom
-    };
-    use futures::task::{Context,Poll};
-    use std::io;
-    use std::pin::Pin;
-    use util::run;
-    pub struct MaybePendingSeek<'a> {
+    #[pin_project]
+    struct MaybePendingSeek<'a> {
+        #[pin]
         inner: Cursor<&'a [u8]>,
         ready: bool,
     }
 
     impl<'a> MaybePendingSeek<'a> {
-        pub fn new(inner: &'a [u8]) -> Self {
+        fn new(inner: &'a [u8]) -> Self {
             Self { inner: Cursor::new(inner), ready: true }
         }
     }
 
     impl AsyncRead for MaybePendingSeek<'_> {
-        fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8])
-            -> Poll<io::Result<usize>>
-        {
-            Pin::new(&mut self.inner).poll_read(cx, buf)
+        fn poll_read(
+            self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            buf: &mut [u8],
+        ) -> Poll<io::Result<usize>> {
+            self.project().inner.poll_read(cx, buf)
         }
     }
 
     impl AsyncBufRead for MaybePendingSeek<'_> {
-        fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-            -> Poll<io::Result<&[u8]>>
-        {
-            let this: *mut Self = &mut *self as *mut _;
-            Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
+        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
+            self.project().inner.poll_fill_buf(cx)
         }
 
-        fn consume(mut self: Pin<&mut Self>, amt: usize) {
-            Pin::new(&mut self.inner).consume(amt)
+        fn consume(self: Pin<&mut Self>, amt: usize) {
+            self.project().inner.consume(amt)
         }
     }
 
     impl AsyncSeek for MaybePendingSeek<'_> {
-        fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
-            -> Poll<io::Result<u64>>
-        {
+        fn poll_seek(
+            mut self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            pos: SeekFrom,
+        ) -> Poll<io::Result<u64>> {
             if self.ready {
-                self.ready = false;
-                Pin::new(&mut self.inner).poll_seek(cx, pos)
+                *self.as_mut().project().ready = false;
+                self.project().inner.poll_seek(cx, pos)
             } else {
-                self.ready = true;
+                *self.project().ready = true;
                 Poll::Pending
             }
         }
     }
 
     let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
-    let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+    let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
+    pin_mut!(reader);
 
     assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
-    assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None);
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..]));
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
+    assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None);
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..]));
     assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4));
-    assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..]));
+    assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..]));
     Pin::new(&mut reader).consume(1);
     assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3));
 }
diff --git a/tests/io_buf_writer.rs b/tests/io_buf_writer.rs
index d58a6d8..b264cd5 100644
--- a/tests/io_buf_writer.rs
+++ b/tests/io_buf_writer.rs
@@ -1,67 +1,59 @@
-mod maybe_pending {
-    use futures::io::AsyncWrite;
-    use futures::task::{Context, Poll};
-    use std::io;
-    use std::pin::Pin;
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{
+    AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom,
+};
+use futures::task::{Context, Poll};
+use futures_test::task::noop_context;
+use std::io;
+use std::pin::Pin;
 
-    pub struct MaybePending {
-        pub inner: Vec<u8>,
-        ready: bool,
-    }
+struct MaybePending {
+    inner: Vec<u8>,
+    ready: bool,
+}
 
-    impl MaybePending {
-        pub fn new(inner: Vec<u8>) -> Self {
-            Self { inner, ready: false }
-        }
-    }
-
-    impl AsyncWrite for MaybePending {
-        fn poll_write(
-            mut self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-            buf: &[u8],
-        ) -> Poll<io::Result<usize>> {
-            if self.ready {
-                self.ready = false;
-                Pin::new(&mut self.inner).poll_write(cx, buf)
-            } else {
-                self.ready = true;
-                Poll::Pending
-            }
-        }
-
-        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-            Pin::new(&mut self.inner).poll_flush(cx)
-        }
-
-        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-            Pin::new(&mut self.inner).poll_close(cx)
-        }
+impl MaybePending {
+    fn new(inner: Vec<u8>) -> Self {
+        Self { inner, ready: false }
     }
 }
 
-mod util {
-    use futures::future::Future;
+impl AsyncWrite for MaybePending {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        if self.ready {
+            self.ready = false;
+            Pin::new(&mut self.inner).poll_write(cx, buf)
+        } else {
+            self.ready = true;
+            Poll::Pending
+        }
+    }
 
-    pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures::future::FutureExt;
-        use futures::task::Poll;
-        use futures_test::task::noop_context;
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Pin::new(&mut self.inner).poll_flush(cx)
+    }
 
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
-                return x;
-            }
+    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        Pin::new(&mut self.inner).poll_close(cx)
+    }
+}
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+            return x;
         }
     }
 }
 
 #[test]
 fn buf_writer() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncWriteExt, BufWriter};
-
     let mut writer = BufWriter::with_capacity(2, Vec::new());
 
     block_on(writer.write(&[0, 1])).unwrap();
@@ -104,9 +96,6 @@
 
 #[test]
 fn buf_writer_inner_flushes() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncWriteExt, BufWriter};
-
     let mut w = BufWriter::with_capacity(3, Vec::new());
     block_on(w.write(&[0, 1])).unwrap();
     assert_eq!(*w.get_ref(), []);
@@ -117,9 +106,6 @@
 
 #[test]
 fn buf_writer_seek() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
-
     // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed,
     // use `Vec::new` instead of `vec![0; 8]`.
     let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8]));
@@ -135,11 +121,6 @@
 
 #[test]
 fn maybe_pending_buf_writer() {
-    use futures::io::{AsyncWriteExt, BufWriter};
-
-    use maybe_pending::MaybePending;
-    use util::run;
-
     let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new()));
 
     run(writer.write(&[0, 1])).unwrap();
@@ -182,11 +163,6 @@
 
 #[test]
 fn maybe_pending_buf_writer_inner_flushes() {
-    use futures::io::{AsyncWriteExt, BufWriter};
-
-    use maybe_pending::MaybePending;
-    use util::run;
-
     let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new()));
     run(w.write(&[0, 1])).unwrap();
     assert_eq!(&w.get_ref().inner, &[]);
@@ -197,13 +173,6 @@
 
 #[test]
 fn maybe_pending_buf_writer_seek() {
-    use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom};
-    use futures::task::{Context, Poll};
-    use std::io;
-    use std::pin::Pin;
-
-    use util::run;
-
     struct MaybePendingSeek {
         inner: Cursor<Vec<u8>>,
         ready_write: bool,
@@ -241,9 +210,11 @@
     }
 
     impl AsyncSeek for MaybePendingSeek {
-        fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
-            -> Poll<io::Result<u64>>
-        {
+        fn poll_seek(
+            mut self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            pos: SeekFrom,
+        ) -> Poll<io::Result<u64>> {
             if self.ready_seek {
                 self.ready_seek = false;
                 Pin::new(&mut self.inner).poll_seek(cx, pos)
diff --git a/tests/io_cursor.rs b/tests/io_cursor.rs
index 4ba6342..435ea5a 100644
--- a/tests/io_cursor.rs
+++ b/tests/io_cursor.rs
@@ -1,13 +1,14 @@
+use assert_matches::assert_matches;
+use futures::executor::block_on;
+use futures::future::lazy;
+use futures::io::{AsyncWrite, Cursor};
+use futures::task::Poll;
+use std::pin::Pin;
+
 #[test]
 fn cursor_asyncwrite_vec() {
-    use assert_matches::assert_matches;
-    use futures::future::lazy;
-    use futures::io::{AsyncWrite, Cursor};
-    use futures::task::Poll;
-    use std::pin::Pin;
-
     let mut cursor = Cursor::new(vec![0; 5]);
-    futures::executor::block_on(lazy(|cx| {
+    block_on(lazy(|cx| {
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2)));
@@ -18,14 +19,8 @@
 
 #[test]
 fn cursor_asyncwrite_box() {
-    use assert_matches::assert_matches;
-    use futures::future::lazy;
-    use futures::io::{AsyncWrite, Cursor};
-    use futures::task::Poll;
-    use std::pin::Pin;
-
     let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice());
-    futures::executor::block_on(lazy(|cx| {
+    block_on(lazy(|cx| {
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2)));
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2)));
         assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1)));
diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs
new file mode 100644
index 0000000..b483e0f
--- /dev/null
+++ b/tests/io_line_writer.rs
@@ -0,0 +1,73 @@
+use futures::executor::block_on;
+use futures::io::{AsyncWriteExt, LineWriter};
+use std::io;
+
+#[test]
+fn line_writer() {
+    let mut writer = LineWriter::new(Vec::new());
+
+    block_on(writer.write(&[0])).unwrap();
+    assert_eq!(*writer.get_ref(), []);
+
+    block_on(writer.write(&[1])).unwrap();
+    assert_eq!(*writer.get_ref(), []);
+
+    block_on(writer.flush()).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1]);
+
+    block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']);
+
+    block_on(writer.flush()).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]);
+
+    block_on(writer.write(&[3, b'\n'])).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']);
+}
+
+#[test]
+fn line_vectored() {
+    let mut line_writer = LineWriter::new(Vec::new());
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"\n"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"a"),
+        ]))
+        .unwrap(),
+        2
+    );
+    assert_eq!(line_writer.get_ref(), b"\n");
+
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"b"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"a"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"c"),
+        ]))
+        .unwrap(),
+        3
+    );
+    assert_eq!(line_writer.get_ref(), b"\n");
+    block_on(line_writer.flush()).unwrap();
+    assert_eq!(line_writer.get_ref(), b"\nabac");
+    assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0);
+
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+        ]))
+        .unwrap(),
+        0
+    );
+
+    assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3);
+    assert_eq!(line_writer.get_ref(), b"\nabaca\nb");
+}
diff --git a/tests/io_lines.rs b/tests/io_lines.rs
index 2552c7c..5ce01a6 100644
--- a/tests/io_lines.rs
+++ b/tests/io_lines.rs
@@ -1,32 +1,34 @@
-mod util {
-    use futures::future::Future;
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
 
-    pub fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures_test::task::noop_context;
-        use futures::task::Poll;
-        use futures::future::FutureExt;
-
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
-                return x;
-            }
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+            return x;
         }
     }
 }
 
+macro_rules! block_on_next {
+    ($expr:expr) => {
+        block_on($expr.next()).unwrap().unwrap()
+    };
+}
+
+macro_rules! run_next {
+    ($expr:expr) => {
+        run($expr.next()).unwrap().unwrap()
+    };
+}
+
 #[test]
 fn lines() {
-    use futures::executor::block_on;
-    use futures::stream::StreamExt;
-    use futures::io::{AsyncBufReadExt, Cursor};
-
-    macro_rules! block_on_next {
-        ($expr:expr) => {
-            block_on($expr.next()).unwrap().unwrap()
-        };
-    }
-
     let buf = Cursor::new(&b"12\r"[..]);
     let mut s = buf.lines();
     assert_eq!(block_on_next!(s), "12\r".to_string());
@@ -41,22 +43,8 @@
 
 #[test]
 fn maybe_pending() {
-    use futures::stream::{self, StreamExt, TryStreamExt};
-    use futures::io::AsyncBufReadExt;
-    use futures_test::io::AsyncReadTestExt;
-
-    use util::run;
-
-    macro_rules! run_next {
-        ($expr:expr) => {
-            run($expr.next()).unwrap().unwrap()
-        };
-    }
-
-    let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]])
-        .map(Ok)
-        .into_async_read()
-        .interleave_pending();
+    let buf =
+        stream::iter(vec![&b"12"[..], &b"\r"[..]]).map(Ok).into_async_read().interleave_pending();
     let mut s = buf.lines();
     assert_eq!(run_next!(s), "12\r".to_string());
     assert!(run(s.next()).is_none());
diff --git a/tests/io_read.rs b/tests/io_read.rs
index 5902ad0..d39a6ea 100644
--- a/tests/io_read.rs
+++ b/tests/io_read.rs
@@ -1,27 +1,26 @@
-mod mock_reader {
-    use futures::io::AsyncRead;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::{Context, Poll};
+use futures::io::AsyncRead;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-    pub struct MockReader {
-        fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+struct MockReader {
+    fun: Box<dyn FnMut(&mut [u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockReader {
+    fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+        Self { fun: Box::new(fun) }
     }
+}
 
-    impl MockReader {
-        pub fn new(fun: impl FnMut(&mut [u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
-            Self { fun: Box::new(fun) }
-        }
-    }
-
-    impl AsyncRead for MockReader {
-        fn poll_read(
-            self: Pin<&mut Self>,
-            _cx: &mut Context<'_>,
-            buf: &mut [u8]
-        ) -> Poll<io::Result<usize>> {
-            (self.get_mut().fun)(buf)
-        }
+impl AsyncRead for MockReader {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<io::Result<usize>> {
+        (self.get_mut().fun)(buf)
     }
 }
 
@@ -29,14 +28,6 @@
 /// calls `poll_read` with an empty slice if no buffers are provided.
 #[test]
 fn read_vectored_no_buffers() {
-    use futures::io::AsyncRead;
-    use futures_test::task::panic_context;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::Poll;
-
-    use mock_reader::MockReader;
-
     let mut reader = MockReader::new(|buf| {
         assert_eq!(buf, b"");
         Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -53,14 +44,6 @@
 /// calls `poll_read` with the first non-empty buffer.
 #[test]
 fn read_vectored_first_non_empty() {
-    use futures::io::AsyncRead;
-    use futures_test::task::panic_context;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::Poll;
-
-    use mock_reader::MockReader;
-
     let mut reader = MockReader::new(|buf| {
         assert_eq!(buf.len(), 4);
         buf.copy_from_slice(b"four");
diff --git a/tests/io_read_exact.rs b/tests/io_read_exact.rs
index bd4b36d..6582e50 100644
--- a/tests/io_read_exact.rs
+++ b/tests/io_read_exact.rs
@@ -1,14 +1,14 @@
+use futures::executor::block_on;
+use futures::io::AsyncReadExt;
+
 #[test]
 fn read_exact() {
-    use futures::executor::block_on;
-    use futures::io::AsyncReadExt;
-
     let mut reader: &[u8] = &[1, 2, 3, 4, 5];
     let mut out = [0u8; 3];
 
     let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out
     assert!(res.is_ok());
-    assert_eq!(out, [1,2,3]);
+    assert_eq!(out, [1, 2, 3]);
     assert_eq!(reader.len(), 2);
 
     let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left
diff --git a/tests/io_read_line.rs b/tests/io_read_line.rs
index 51e8126..88a8779 100644
--- a/tests/io_read_line.rs
+++ b/tests/io_read_line.rs
@@ -1,8 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+            return x;
+        }
+    }
+}
+
 #[test]
 fn read_line() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncBufReadExt, Cursor};
-
     let mut buf = Cursor::new(b"12");
     let mut v = String::new();
     assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
@@ -22,34 +36,13 @@
 
 #[test]
 fn maybe_pending() {
-    use futures::future::Future;
-
-    fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures::future::FutureExt;
-        use futures::task::Poll;
-        use futures_test::task::noop_context;
-
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
-                return x;
-            }
-        }
-    }
-
-    use futures::stream::{self, StreamExt, TryStreamExt};
-    use futures::io::AsyncBufReadExt;
-    use futures_test::io::AsyncReadTestExt;
-
     let mut buf = b"12".interleave_pending();
     let mut v = String::new();
     assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
     assert_eq!(v, "12");
 
-    let mut buf = stream::iter(vec![&b"12"[..], &b"\n\n"[..]])
-        .map(Ok)
-        .into_async_read()
-        .interleave_pending();
+    let mut buf =
+        stream::iter(vec![&b"12"[..], &b"\n\n"[..]]).map(Ok).into_async_read().interleave_pending();
     let mut v = String::new();
     assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
     assert_eq!(v, "12\n");
diff --git a/tests/io_read_to_end.rs b/tests/io_read_to_end.rs
index 892d463..7122511 100644
--- a/tests/io_read_to_end.rs
+++ b/tests/io_read_to_end.rs
@@ -1,4 +1,5 @@
 use futures::{
+    executor::block_on,
     io::{self, AsyncRead, AsyncReadExt},
     task::{Context, Poll},
 };
@@ -12,7 +13,7 @@
     }
 
     impl MyRead {
-        pub fn new() -> Self {
+        fn new() -> Self {
             MyRead { first: false }
         }
     }
@@ -39,7 +40,7 @@
     }
 
     impl VecWrapper {
-        pub fn new() -> Self {
+        fn new() -> Self {
             VecWrapper { inner: Vec::new() }
         }
     }
@@ -55,7 +56,7 @@
         }
     }
 
-    futures::executor::block_on(async {
+    block_on(async {
         let mut vec = VecWrapper::new();
         let mut read = MyRead::new();
 
diff --git a/tests/io_read_to_string.rs b/tests/io_read_to_string.rs
index 2e9c00a..ae6aaa2 100644
--- a/tests/io_read_to_string.rs
+++ b/tests/io_read_to_string.rs
@@ -1,8 +1,13 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
 #[test]
 fn read_to_string() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncReadExt, Cursor};
-
     let mut c = Cursor::new(&b""[..]);
     let mut v = String::new();
     assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
@@ -20,16 +25,7 @@
 
 #[test]
 fn interleave_pending() {
-    use futures::future::Future;
-    use futures::stream::{self, StreamExt, TryStreamExt};
-    use futures::io::AsyncReadExt;
-    use futures_test::io::AsyncReadTestExt;
-
     fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures::future::FutureExt;
-        use futures_test::task::noop_context;
-        use futures::task::Poll;
-
         let mut cx = noop_context();
         loop {
             if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
diff --git a/tests/io_read_until.rs b/tests/io_read_until.rs
index 6fa22ee..71f857f 100644
--- a/tests/io_read_until.rs
+++ b/tests/io_read_until.rs
@@ -1,8 +1,22 @@
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt};
+use futures::io::{AsyncBufReadExt, Cursor};
+use futures::stream::{self, StreamExt, TryStreamExt};
+use futures::task::Poll;
+use futures_test::io::AsyncReadTestExt;
+use futures_test::task::noop_context;
+
+fn run<F: Future + Unpin>(mut f: F) -> F::Output {
+    let mut cx = noop_context();
+    loop {
+        if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
+            return x;
+        }
+    }
+}
+
 #[test]
 fn read_until() {
-    use futures::executor::block_on;
-    use futures::io::{AsyncBufReadExt, Cursor};
-
     let mut buf = Cursor::new(b"12");
     let mut v = Vec::new();
     assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2);
@@ -22,25 +36,6 @@
 
 #[test]
 fn maybe_pending() {
-    use futures::future::Future;
-
-    fn run<F: Future + Unpin>(mut f: F) -> F::Output {
-        use futures::future::FutureExt;
-        use futures_test::task::noop_context;
-        use futures::task::Poll;
-
-        let mut cx = noop_context();
-        loop {
-            if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
-                return x;
-            }
-        }
-    }
-
-    use futures::stream::{self, StreamExt, TryStreamExt};
-    use futures::io::AsyncBufReadExt;
-    use futures_test::io::AsyncReadTestExt;
-
     let mut buf = b"12".interleave_pending();
     let mut v = Vec::new();
     assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2);
diff --git a/tests/io_write.rs b/tests/io_write.rs
index 363f32b..6af2755 100644
--- a/tests/io_write.rs
+++ b/tests/io_write.rs
@@ -1,35 +1,34 @@
-mod mock_writer {
-    use futures::io::AsyncWrite;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::{Context, Poll};
+use futures::io::AsyncWrite;
+use futures_test::task::panic_context;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 
-    pub struct MockWriter {
-        fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
+struct MockWriter {
+    fun: Box<dyn FnMut(&[u8]) -> Poll<io::Result<usize>>>,
+}
+
+impl MockWriter {
+    fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
+        Self { fun: Box::new(fun) }
+    }
+}
+
+impl AsyncWrite for MockWriter {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        (self.get_mut().fun)(buf)
     }
 
-    impl MockWriter {
-        pub fn new(fun: impl FnMut(&[u8]) -> Poll<io::Result<usize>> + 'static) -> Self {
-            Self { fun: Box::new(fun) }
-        }
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        panic!()
     }
 
-    impl AsyncWrite for MockWriter {
-        fn poll_write(
-            self: Pin<&mut Self>,
-            _cx: &mut Context<'_>,
-            buf: &[u8],
-        ) -> Poll<io::Result<usize>> {
-            (self.get_mut().fun)(buf)
-        }
-
-        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-            panic!()
-        }
-
-        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
-            panic!()
-        }
+    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+        panic!()
     }
 }
 
@@ -37,14 +36,6 @@
 /// calls `poll_write` with an empty slice if no buffers are provided.
 #[test]
 fn write_vectored_no_buffers() {
-    use futures::io::AsyncWrite;
-    use futures_test::task::panic_context;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::Poll;
-
-    use mock_writer::MockWriter;
-
     let mut writer = MockWriter::new(|buf| {
         assert_eq!(buf, b"");
         Err(io::ErrorKind::BrokenPipe.into()).into()
@@ -61,24 +52,12 @@
 /// calls `poll_write` with the first non-empty buffer.
 #[test]
 fn write_vectored_first_non_empty() {
-    use futures::io::AsyncWrite;
-    use futures_test::task::panic_context;
-    use std::io;
-    use std::pin::Pin;
-    use std::task::Poll;
-
-    use mock_writer::MockWriter;
-
     let mut writer = MockWriter::new(|buf| {
         assert_eq!(buf, b"four");
         Poll::Ready(Ok(4))
     });
     let cx = &mut panic_context();
-    let bufs = &mut [
-        io::IoSlice::new(&[]),
-        io::IoSlice::new(&[]),
-        io::IoSlice::new(b"four")
-    ];
+    let bufs = &mut [io::IoSlice::new(&[]), io::IoSlice::new(&[]), io::IoSlice::new(b"four")];
 
     let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs);
     let res = res.map_err(|e| e.kind());
diff --git a/tests/join_all.rs b/tests/join_all.rs
deleted file mode 100644
index c322e58..0000000
--- a/tests/join_all.rs
+++ /dev/null
@@ -1,51 +0,0 @@
-mod util {
-    use std::future::Future;
-    use std::fmt::Debug;
-
-    pub fn assert_done<T, F>(actual_fut: F, expected: T)
-    where
-        T: PartialEq + Debug,
-        F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
-    {
-        use futures::executor::block_on;
-
-        let output = block_on(actual_fut());
-        assert_eq!(output, expected);
-    }
-}
-
-#[test]
-fn collect_collects() {
-    use futures_util::future::{join_all,ready};
-
-    util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
-    util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
-    // REVIEW: should this be implemented?
-    // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
-
-    // TODO: needs more tests
-}
-
-#[test]
-fn join_all_iter_lifetime() {
-    use futures_util::future::{join_all,ready};
-    use std::future::Future;
-    // In futures-rs version 0.1, this function would fail to typecheck due to an overly
-    // conservative type parameterization of `JoinAll`.
-    fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
-        let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
-        Box::new(join_all(iter))
-    }
-
-    util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]);
-}
-
-#[test]
-fn join_all_from_iter() {
-    use futures_util::future::{JoinAll,ready};
-
-    util::assert_done(
-        || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
-        vec![1, 2],
-    )
-}
diff --git a/tests/mutex.rs b/tests/lock_mutex.rs
similarity index 67%
rename from tests/mutex.rs
rename to tests/lock_mutex.rs
index 68e0301..c92ef50 100644
--- a/tests/mutex.rs
+++ b/tests/lock_mutex.rs
@@ -1,9 +1,15 @@
+use futures::channel::mpsc;
+use futures::executor::{block_on, ThreadPool};
+use futures::future::{ready, FutureExt};
+use futures::lock::Mutex;
+use futures::stream::StreamExt;
+use futures::task::{Context, SpawnExt};
+use futures_test::future::FutureTestExt;
+use futures_test::task::{new_count_waker, panic_context};
+use std::sync::Arc;
+
 #[test]
 fn mutex_acquire_uncontested() {
-    use futures::future::FutureExt;
-    use futures::lock::Mutex;
-    use futures_test::task::panic_context;
-
     let mutex = Mutex::new(());
     for _ in 0..10 {
         assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready());
@@ -12,11 +18,6 @@
 
 #[test]
 fn mutex_wakes_waiters() {
-    use futures::future::FutureExt;
-    use futures::lock::Mutex;
-    use futures::task::Context;
-    use futures_test::task::{new_count_waker, panic_context};
-
     let mutex = Mutex::new(());
     let (waker, counter) = new_count_waker();
     let lock = mutex.lock().poll_unpin(&mut panic_context());
@@ -33,22 +34,11 @@
     assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn mutex_contested() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future::ready;
-    use futures::lock::Mutex;
-    use futures::stream::StreamExt;
-    use futures::task::SpawnExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::Arc;
-
     let (tx, mut rx) = mpsc::unbounded();
-    let pool = futures::executor::ThreadPool::builder()
-        .pool_size(16)
-        .create()
-        .unwrap();
+    let pool = ThreadPool::builder().pool_size(16).create().unwrap();
 
     let tx = Arc::new(tx);
     let mutex = Arc::new(Mutex::new(0));
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index ca13163..3b082d2 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -1,25 +1,23 @@
+use futures::{
+    executor::block_on,
+    future::{self, FutureExt},
+    join, ready,
+    task::Poll,
+    try_join,
+};
+
 #[test]
 fn ready() {
-    use futures::{
-        executor::block_on,
-        future,
-        task::Poll,
-        ready,
-    };
-
     block_on(future::poll_fn(|_| {
         ready!(Poll::Ready(()),);
         Poll::Ready(())
     }))
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn poll() {
-    use futures::{
-        executor::block_on,
-        future::FutureExt,
-        poll,
-    };
+    use futures::poll;
 
     block_on(async {
         let _ = poll!(async {}.boxed(),);
@@ -28,11 +26,6 @@
 
 #[test]
 fn join() {
-    use futures::{
-        executor::block_on,
-        join
-    };
-
     block_on(async {
         let future1 = async { 1 };
         let future2 = async { 2 };
@@ -42,12 +35,6 @@
 
 #[test]
 fn try_join() {
-    use futures::{
-        executor::block_on,
-        future::FutureExt,
-        try_join,
-    };
-
     block_on(async {
         let future1 = async { 1 }.never_error();
         let future2 = async { 2 }.never_error();
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
index 2494306..34b78a3 100644
--- a/tests/oneshot.rs
+++ b/tests/oneshot.rs
@@ -1,11 +1,11 @@
+use futures::channel::oneshot;
+use futures::future::{FutureExt, TryFutureExt};
+use futures_test::future::FutureTestExt;
+use std::sync::mpsc;
+use std::thread;
+
 #[test]
 fn oneshot_send1() {
-    use futures::channel::oneshot;
-    use futures::future::TryFutureExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-    use std::thread;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = mpsc::channel();
 
@@ -17,12 +17,6 @@
 
 #[test]
 fn oneshot_send2() {
-    use futures::channel::oneshot;
-    use futures::future::TryFutureExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-    use std::thread;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = mpsc::channel();
 
@@ -33,12 +27,6 @@
 
 #[test]
 fn oneshot_send3() {
-    use futures::channel::oneshot;
-    use futures::future::TryFutureExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-    use std::thread;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = mpsc::channel();
 
@@ -49,11 +37,6 @@
 
 #[test]
 fn oneshot_drop_tx1() {
-    use futures::channel::oneshot;
-    use futures::future::FutureExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = mpsc::channel();
 
@@ -65,12 +48,6 @@
 
 #[test]
 fn oneshot_drop_tx2() {
-    use futures::channel::oneshot;
-    use futures::future::FutureExt;
-    use futures_test::future::FutureTestExt;
-    use std::sync::mpsc;
-    use std::thread;
-
     let (tx1, rx1) = oneshot::channel::<i32>();
     let (tx2, rx2) = mpsc::channel();
 
@@ -83,9 +60,19 @@
 
 #[test]
 fn oneshot_drop_rx() {
-    use futures::channel::oneshot;
-
     let (tx, rx) = oneshot::channel::<i32>();
     drop(rx);
     assert_eq!(Err(2), tx.send(2));
 }
+
+#[test]
+fn oneshot_debug() {
+    let (tx, rx) = oneshot::channel::<i32>();
+    assert_eq!(format!("{:?}", tx), "Sender { complete: false }");
+    assert_eq!(format!("{:?}", rx), "Receiver { complete: false }");
+    drop(rx);
+    assert_eq!(format!("{:?}", tx), "Sender { complete: true }");
+    let (tx, rx) = oneshot::channel::<i32>();
+    drop(tx);
+    assert_eq!(format!("{:?}", rx), "Receiver { complete: true }");
+}
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index 9aa3636..afba8f2 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -1,18 +1,15 @@
-mod assert_send_sync {
-    use futures::stream::FuturesUnordered;
-
-    pub trait AssertSendSync: Send + Sync {}
-    impl AssertSendSync for FuturesUnordered<()> {}
-}
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::{Arc, Barrier};
+use std::thread;
 
 #[test]
 fn basic_usage() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-
     block_on(future::lazy(move |cx| {
         let mut queue = FuturesUnordered::new();
         let (tx1, rx1) = oneshot::channel();
@@ -41,12 +38,6 @@
 
 #[test]
 fn resolving_errors() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-
     block_on(future::lazy(move |cx| {
         let mut queue = FuturesUnordered::new();
         let (tx1, rx1) = oneshot::channel();
@@ -75,12 +66,6 @@
 
 #[test]
 fn dropping_ready_queue() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::FuturesUnordered;
-    use futures_test::task::noop_context;
-
     block_on(future::lazy(move |_| {
         let queue = FuturesUnordered::new();
         let (mut tx1, rx1) = oneshot::channel::<()>();
@@ -108,12 +93,9 @@
 
 #[test]
 fn stress() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on_stream;
-    use futures::stream::FuturesUnordered;
-    use std::sync::{Arc, Barrier};
-    use std::thread;
-
+    #[cfg(miri)]
+    const ITER: usize = 30;
+    #[cfg(not(miri))]
     const ITER: usize = 300;
 
     for i in 0..ITER {
@@ -157,12 +139,6 @@
 
 #[test]
 fn panicking_future_dropped() {
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-    use std::panic::{self, AssertUnwindSafe};
-
     block_on(future::lazy(move |cx| {
         let mut queue = FuturesUnordered::new();
         queue.push(future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() }));
diff --git a/tests/recurse.rs b/tests/recurse.rs
index a151f1b..f06524f 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -1,10 +1,11 @@
+use futures::executor::block_on;
+use futures::future::{self, BoxFuture, FutureExt};
+use std::sync::mpsc;
+use std::thread;
+
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn lots() {
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt, BoxFuture};
-    use std::sync::mpsc;
-    use std::thread;
-
     #[cfg(not(futures_sanitizer))]
     const N: i32 = 1_000;
     #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136
@@ -20,8 +21,6 @@
     }
 
     let (tx, rx) = mpsc::channel();
-    thread::spawn(|| {
-        block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))
-    });
+    thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())));
     assert_eq!((0..=N).sum::<i32>(), rx.recv().unwrap());
 }
diff --git a/tests/select_ok.rs b/tests/select_ok.rs
deleted file mode 100644
index 81cadb7..0000000
--- a/tests/select_ok.rs
+++ /dev/null
@@ -1,42 +0,0 @@
-#[test]
-fn ignore_err() {
-    use futures::executor::block_on;
-    use futures::future::{err, ok, select_ok};
-
-    let v = vec![
-        err(1),
-        err(2),
-        ok(3),
-        ok(4),
-    ];
-
-    let (i, v) = block_on(select_ok(v)).ok().unwrap();
-    assert_eq!(i, 3);
-
-    assert_eq!(v.len(), 1);
-
-    let (i, v) = block_on(select_ok(v)).ok().unwrap();
-    assert_eq!(i, 4);
-
-    assert!(v.is_empty());
-}
-
-#[test]
-fn last_err() {
-    use futures::executor::block_on;
-    use futures::future::{err, ok, select_ok};
-
-    let v = vec![
-        ok(1),
-        err(2),
-        err(3),
-    ];
-
-    let (i, v) = block_on(select_ok(v)).ok().unwrap();
-    assert_eq!(i, 1);
-
-    assert_eq!(v.len(), 2);
-
-    let i = block_on(select_ok(v)).err().unwrap();
-    assert_eq!(i, 3);
-}
diff --git a/tests/sink.rs b/tests/sink.rs
index 597ed34..dc826bd 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -1,264 +1,221 @@
-mod sassert_next {
-    use futures::stream::{Stream, StreamExt};
-    use futures::task::Poll;
-    use futures_test::task::panic_context;
-    use std::fmt;
+use futures::channel::{mpsc, oneshot};
+use futures::executor::block_on;
+use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
+use futures::never::Never;
+use futures::ready;
+use futures::sink::{self, Sink, SinkErrInto, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{self, ArcWake, Context, Poll, Waker};
+use futures_test::task::panic_context;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
+use std::fmt;
+use std::mem;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 
-    pub fn sassert_next<S>(s: &mut S, item: S::Item)
-    where
-        S: Stream + Unpin,
-        S::Item: Eq + fmt::Debug,
-    {
-        match s.poll_next_unpin(&mut panic_context()) {
-            Poll::Ready(None) => panic!("stream is at its end"),
-            Poll::Ready(Some(e)) => assert_eq!(e, item),
-            Poll::Pending => panic!("stream wasn't ready"),
-        }
+fn sassert_next<S>(s: &mut S, item: S::Item)
+where
+    S: Stream + Unpin,
+    S::Item: Eq + fmt::Debug,
+{
+    match s.poll_next_unpin(&mut panic_context()) {
+        Poll::Ready(None) => panic!("stream is at its end"),
+        Poll::Ready(Some(e)) => assert_eq!(e, item),
+        Poll::Pending => panic!("stream wasn't ready"),
     }
 }
 
-mod unwrap {
-    use futures::task::Poll;
-    use std::fmt;
-
-    pub fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
-        match x {
-            Poll::Ready(Ok(x)) => x,
-            Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
-            Poll::Pending => panic!("Poll::Pending"),
-        }
+fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
+    match x {
+        Poll::Ready(Ok(x)) => x,
+        Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
+        Poll::Pending => panic!("Poll::Pending"),
     }
 }
 
-mod flag_cx {
-    use futures::task::{self, ArcWake, Context};
-    use std::sync::Arc;
-    use std::sync::atomic::{AtomicBool, Ordering};
+// A flag, usable as a waker via `ArcWake`, that records wake-ups for inspection
+struct Flag(AtomicBool);
 
-    // An Unpark struct that records unpark events for inspection
-    pub struct Flag(AtomicBool);
-
-    impl Flag {
-        pub fn new() -> Arc<Self> {
-            Arc::new(Self(AtomicBool::new(false)))
-        }
-
-        pub fn take(&self) -> bool {
-            self.0.swap(false, Ordering::SeqCst)
-        }
-
-        pub fn set(&self, v: bool) {
-            self.0.store(v, Ordering::SeqCst)
-        }
+impl Flag {
+    fn new() -> Arc<Self> {
+        Arc::new(Self(AtomicBool::new(false)))
     }
 
-    impl ArcWake for Flag {
-        fn wake_by_ref(arc_self: &Arc<Self>) {
-            arc_self.set(true)
-        }
+    fn take(&self) -> bool {
+        self.0.swap(false, Ordering::SeqCst)
     }
 
-    pub fn flag_cx<F, R>(f: F) -> R
-    where
-        F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
-    {
-        let flag = Flag::new();
-        let waker = task::waker_ref(&flag);
-        let cx = &mut Context::from_waker(&waker);
-        f(flag.clone(), cx)
+    fn set(&self, v: bool) {
+        self.0.store(v, Ordering::SeqCst)
     }
 }
 
-mod start_send_fut {
-    use futures::future::Future;
-    use futures::ready;
-    use futures::sink::Sink;
-    use futures::task::{Context, Poll};
-    use std::pin::Pin;
-
-    // Sends a value on an i32 channel sink
-    pub struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
-
-    impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
-        pub fn new(sink: S, item: Item) -> Self {
-            Self(Some(sink), Some(item))
-        }
-    }
-
-    impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
-        type Output = Result<S, S::Error>;
-
-        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-            let Self(inner, item) = self.get_mut();
-            {
-                let mut inner = inner.as_mut().unwrap();
-                ready!(Pin::new(&mut inner).poll_ready(cx))?;
-                Pin::new(&mut inner).start_send(item.take().unwrap())?;
-            }
-            Poll::Ready(Ok(inner.take().unwrap()))
-        }
+impl ArcWake for Flag {
+    fn wake_by_ref(arc_self: &Arc<Self>) {
+        arc_self.set(true)
     }
 }
 
-mod manual_flush {
-    use futures::sink::Sink;
-    use futures::task::{Context, Poll, Waker};
-    use std::mem;
-    use std::pin::Pin;
+fn flag_cx<F, R>(f: F) -> R
+where
+    F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
+{
+    let flag = Flag::new();
+    let waker = task::waker_ref(&flag);
+    let cx = &mut Context::from_waker(&waker);
+    f(flag.clone(), cx)
+}
 
-    // Immediately accepts all requests to start pushing, but completion is managed
-    // by manually flushing
-    pub struct ManualFlush<T: Unpin> {
-        data: Vec<T>,
-        waiting_tasks: Vec<Waker>,
-    }
+// Future that waits for the sink to be ready, starts sending one item, then returns the sink
+struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
 
-    impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
-        type Error = ();
-
-        fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-            Poll::Ready(Ok(()))
-        }
-
-        fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
-            if let Some(item) = item {
-                self.data.push(item);
-            } else {
-                self.force_flush();
-            }
-            Ok(())
-        }
-
-        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-            if self.data.is_empty() {
-                Poll::Ready(Ok(()))
-            } else {
-                self.waiting_tasks.push(cx.waker().clone());
-                Poll::Pending
-            }
-        }
-
-        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-            self.poll_flush(cx)
-        }
-    }
-
-    impl<T: Unpin> ManualFlush<T> {
-        pub fn new() -> Self {
-            Self {
-                data: Vec::new(),
-                waiting_tasks: Vec::new(),
-            }
-        }
-
-        pub fn force_flush(&mut self) -> Vec<T> {
-            for task in self.waiting_tasks.drain(..) {
-                task.wake()
-            }
-            mem::replace(&mut self.data, Vec::new())
-        }
+impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
+    fn new(sink: S, item: Item) -> Self {
+        Self(Some(sink), Some(item))
     }
 }
 
-mod allowance {
-    use futures::sink::Sink;
-    use futures::task::{Context, Poll, Waker};
-    use std::cell::{Cell, RefCell};
-    use std::pin::Pin;
-    use std::rc::Rc;
+impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
+    type Output = Result<S, S::Error>;
 
-    pub struct ManualAllow<T: Unpin> {
-        pub data: Vec<T>,
-        allow: Rc<Allow>,
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let Self(inner, item) = self.get_mut();
+        {
+            let mut inner = inner.as_mut().unwrap();
+            ready!(Pin::new(&mut inner).poll_ready(cx))?;
+            Pin::new(&mut inner).start_send(item.take().unwrap())?;
+        }
+        Poll::Ready(Ok(inner.take().unwrap()))
+    }
+}
+
+// Immediately accepts all requests to start pushing, but completion is managed
+// by manually flushing
+struct ManualFlush<T: Unpin> {
+    data: Vec<T>,
+    waiting_tasks: Vec<Waker>,
+}
+
+impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
+    type Error = ();
+
+    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
     }
 
-    pub struct Allow {
-        flag: Cell<bool>,
-        tasks: RefCell<Vec<Waker>>,
-    }
-
-    impl Allow {
-        pub fn new() -> Self {
-            Self {
-                flag: Cell::new(false),
-                tasks: RefCell::new(Vec::new()),
-            }
-        }
-
-        pub fn check(&self, cx: &mut Context<'_>) -> bool {
-            if self.flag.get() {
-                true
-            } else {
-                self.tasks.borrow_mut().push(cx.waker().clone());
-                false
-            }
-        }
-
-        pub fn start(&self) {
-            self.flag.set(true);
-            let mut tasks = self.tasks.borrow_mut();
-            for task in tasks.drain(..) {
-                task.wake();
-            }
-        }
-    }
-
-    impl<T: Unpin> Sink<T> for ManualAllow<T> {
-        type Error = ();
-
-        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-            if self.allow.check(cx) {
-                Poll::Ready(Ok(()))
-            } else {
-                Poll::Pending
-            }
-        }
-
-        fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+    fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
+        if let Some(item) = item {
             self.data.push(item);
-            Ok(())
+        } else {
+            self.force_flush();
         }
+        Ok(())
+    }
 
-        fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.data.is_empty() {
             Poll::Ready(Ok(()))
-        }
-
-        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-            Poll::Ready(Ok(()))
+        } else {
+            self.waiting_tasks.push(cx.waker().clone());
+            Poll::Pending
         }
     }
 
-    pub fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
-        let allow = Rc::new(Allow::new());
-        let manual_allow = ManualAllow {
-            data: Vec::new(),
-            allow: allow.clone(),
-        };
-        (manual_allow, allow)
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_flush(cx)
     }
 }
 
+impl<T: Unpin> ManualFlush<T> {
+    fn new() -> Self {
+        Self { data: Vec::new(), waiting_tasks: Vec::new() }
+    }
+
+    fn force_flush(&mut self) -> Vec<T> {
+        for task in self.waiting_tasks.drain(..) {
+            task.wake()
+        }
+        mem::replace(&mut self.data, Vec::new())
+    }
+}
+
+struct ManualAllow<T: Unpin> {
+    data: Vec<T>,
+    allow: Rc<Allow>,
+}
+
+struct Allow {
+    flag: Cell<bool>,
+    tasks: RefCell<Vec<Waker>>,
+}
+
+impl Allow {
+    fn new() -> Self {
+        Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
+    }
+
+    fn check(&self, cx: &mut Context<'_>) -> bool {
+        if self.flag.get() {
+            true
+        } else {
+            self.tasks.borrow_mut().push(cx.waker().clone());
+            false
+        }
+    }
+
+    fn start(&self) {
+        self.flag.set(true);
+        let mut tasks = self.tasks.borrow_mut();
+        for task in tasks.drain(..) {
+            task.wake();
+        }
+    }
+}
+
+impl<T: Unpin> Sink<T> for ManualAllow<T> {
+    type Error = ();
+
+    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        if self.allow.check(cx) {
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Pending
+        }
+    }
+
+    fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
+        self.data.push(item);
+        Ok(())
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+}
+
+fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
+    let allow = Rc::new(Allow::new());
+    let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
+    (manual_allow, allow)
+}
+
 #[test]
 fn either_sink() {
-    use futures::sink::{Sink, SinkExt};
-    use std::collections::VecDeque;
-    use std::pin::Pin;
-
-    let mut s = if true {
-        Vec::<i32>::new().left_sink()
-    } else {
-        VecDeque::<i32>::new().right_sink()
-    };
+    let mut s =
+        if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() };
 
     Pin::new(&mut s).start_send(0).unwrap();
 }
 
 #[test]
 fn vec_sink() {
-    use futures::executor::block_on;
-    use futures::sink::{Sink, SinkExt};
-    use std::pin::Pin;
-
     let mut v = Vec::new();
     Pin::new(&mut v).start_send(0).unwrap();
     Pin::new(&mut v).start_send(1).unwrap();
@@ -269,10 +226,6 @@
 
 #[test]
 fn vecdeque_sink() {
-    use futures::sink::Sink;
-    use std::collections::VecDeque;
-    use std::pin::Pin;
-
     let mut deque = VecDeque::new();
     Pin::new(&mut deque).start_send(2).unwrap();
     Pin::new(&mut deque).start_send(3).unwrap();
@@ -284,9 +237,6 @@
 
 #[test]
 fn send() {
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-
     let mut v = Vec::new();
 
     block_on(v.send(0)).unwrap();
@@ -301,10 +251,6 @@
 
 #[test]
 fn send_all() {
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-    use futures::stream::{self, StreamExt};
-
     let mut v = Vec::new();
 
     block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
@@ -321,15 +267,6 @@
 // channel is full
 #[test]
 fn mpsc_blocking_start_send() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt};
-
-    use start_send_fut::StartSendFut;
-    use flag_cx::flag_cx;
-    use sassert_next::sassert_next;
-    use unwrap::unwrap;
-
     let (mut tx, mut rx) = mpsc::channel::<i32>(0);
 
     block_on(future::lazy(|_| {
@@ -351,19 +288,9 @@
 
 // test `flush` by using `with` to make the first insertion into a sink block
 // until a oneshot is completed
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn with_flush() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on;
-    use futures::future::{self, FutureExt, TryFutureExt};
-    use futures::never::Never;
-    use futures::sink::{Sink, SinkExt};
-    use std::mem;
-    use std::pin::Pin;
-
-    use flag_cx::flag_cx;
-    use unwrap::unwrap;
-
     let (tx, rx) = oneshot::channel();
     let mut block = rx.boxed();
     let mut sink = Vec::new().with(|elem| {
@@ -390,11 +317,6 @@
 // test simple use of with to change data
 #[test]
 fn with_as_map() {
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::never::Never;
-    use futures::sink::SinkExt;
-
     let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
     block_on(sink.send(0)).unwrap();
     block_on(sink.send(1)).unwrap();
@@ -405,10 +327,6 @@
 // test simple use of with_flat_map
 #[test]
 fn with_flat_map() {
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-    use futures::stream::{self, StreamExt};
-
     let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
     block_on(sink.send(0)).unwrap();
     block_on(sink.send(1)).unwrap();
@@ -421,16 +339,6 @@
 // Regression test for the issue #1834.
 #[test]
 fn with_propagates_poll_ready() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::sink::{Sink, SinkExt};
-    use futures::task::Poll;
-    use std::pin::Pin;
-
-    use flag_cx::flag_cx;
-    use sassert_next::sassert_next;
-
     let (tx, mut rx) = mpsc::channel::<i32>(0);
     let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
 
@@ -457,14 +365,6 @@
 // but doesn't claim to be flushed until the underlying sink is
 #[test]
 fn with_flush_propagate() {
-    use futures::future::{self, FutureExt};
-    use futures::sink::{Sink, SinkExt};
-    use std::pin::Pin;
-
-    use manual_flush::ManualFlush;
-    use flag_cx::flag_cx;
-    use unwrap::unwrap;
-
     let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
     flag_cx(|flag, cx| {
         unwrap(Pin::new(&mut sink).poll_ready(cx));
@@ -486,21 +386,13 @@
 // test that `Clone` is implemented on `with` sinks
 #[test]
 fn with_implements_clone() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::{SinkExt, StreamExt};
-
     let (mut tx, rx) = mpsc::channel(5);
 
     {
-        let mut is_positive = tx
-            .clone()
-            .with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
+        let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
 
-        let mut is_long = tx
-            .clone()
-            .with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
+        let mut is_long =
+            tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
 
         block_on(is_positive.clone().send(-1)).unwrap();
         block_on(is_long.clone().send("123456")).unwrap();
@@ -512,18 +404,12 @@
 
     block_on(tx.close()).unwrap();
 
-    assert_eq!(
-        block_on(rx.collect::<Vec<_>>()),
-        vec![false, true, false, true, false]
-    );
+    assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]);
 }
 
 // test that a buffer is a no-nop around a sink that always accepts sends
 #[test]
 fn buffer_noop() {
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-
     let mut sink = Vec::new().buffer(0);
     block_on(sink.send(0)).unwrap();
     block_on(sink.send(1)).unwrap();
@@ -539,15 +425,6 @@
 // and writing out when the underlying sink is ready
 #[test]
 fn buffer() {
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use futures::sink::SinkExt;
-
-    use start_send_fut::StartSendFut;
-    use flag_cx::flag_cx;
-    use unwrap::unwrap;
-    use allowance::manual_allow;
-
     let (sink, allow) = manual_allow::<i32>();
     let sink = sink.buffer(2);
 
@@ -567,10 +444,6 @@
 
 #[test]
 fn fanout_smoke() {
-    use futures::executor::block_on;
-    use futures::sink::SinkExt;
-    use futures::stream::{self, StreamExt};
-
     let sink1 = Vec::new();
     let sink2 = Vec::new();
     let mut sink = sink1.fanout(sink2);
@@ -582,16 +455,6 @@
 
 #[test]
 fn fanout_backpressure() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future::FutureExt;
-    use futures::sink::SinkExt;
-    use futures::stream::StreamExt;
-
-    use start_send_fut::StartSendFut;
-    use flag_cx::flag_cx;
-    use unwrap::unwrap;
-
     let (left_send, mut left_recv) = mpsc::channel(0);
     let (right_send, mut right_recv) = mpsc::channel(0);
     let sink = left_send.fanout(right_send);
@@ -624,12 +487,6 @@
 
 #[test]
 fn sink_map_err() {
-    use futures::channel::mpsc;
-    use futures::sink::{Sink, SinkExt};
-    use futures::task::Poll;
-    use futures_test::task::panic_context;
-    use std::pin::Pin;
-
     {
         let cx = &mut panic_context();
         let (tx, _rx) = mpsc::channel(1);
@@ -639,20 +496,11 @@
     }
 
     let tx = mpsc::channel(0).0;
-    assert_eq!(
-        Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()),
-        Err(())
-    );
+    assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(()));
 }
 
 #[test]
 fn sink_unfold() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future::poll_fn;
-    use futures::sink::{self, Sink, SinkExt};
-    use futures::task::Poll;
-
     block_on(poll_fn(|cx| {
         let (tx, mut rx) = mpsc::channel(1);
         let unfold = sink::unfold((), |(), i: i32| {
@@ -685,14 +533,8 @@
 
 #[test]
 fn err_into() {
-    use futures::channel::mpsc;
-    use futures::sink::{Sink, SinkErrInto, SinkExt};
-    use futures::task::Poll;
-    use futures_test::task::panic_context;
-    use std::pin::Pin;
-
     #[derive(Copy, Clone, Debug, PartialEq, Eq)]
-    pub struct ErrIntoTest;
+    struct ErrIntoTest;
 
     impl From<mpsc::SendError> for ErrIntoTest {
         fn from(_: mpsc::SendError) -> Self {
@@ -709,8 +551,5 @@
     }
 
     let tx = mpsc::channel(0).0;
-    assert_eq!(
-        Pin::new(&mut tx.sink_err_into()).start_send(()),
-        Err(ErrIntoTest)
-    );
+    assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest));
 }
diff --git a/tests/sink_fanout.rs b/tests/sink_fanout.rs
index 7d1fa43..e57b2d8 100644
--- a/tests/sink_fanout.rs
+++ b/tests/sink_fanout.rs
@@ -1,11 +1,11 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::join3;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+
 #[test]
 fn it_works() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on;
-    use futures::future::join3;
-    use futures::sink::SinkExt;
-    use futures::stream::{self, StreamExt};
-
     let (tx1, rx1) = mpsc::channel(1);
     let (tx2, rx2) = mpsc::channel(2);
     let tx = tx1.fanout(tx2).sink_map_err(|_| ());
diff --git a/tests/split.rs b/tests/split.rs
deleted file mode 100644
index 86c2fc6..0000000
--- a/tests/split.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-#[test]
-fn test_split() {
-    use futures::executor::block_on;
-    use futures::sink::{Sink, SinkExt};
-    use futures::stream::{self, Stream, StreamExt};
-    use futures::task::{Context, Poll};
-    use pin_project::pin_project;
-    use std::pin::Pin;
-
-    #[pin_project]
-    struct Join<T, U> {
-        #[pin]
-        stream: T,
-        #[pin]
-        sink: U,
-    }
-
-    impl<T: Stream, U> Stream for Join<T, U> {
-        type Item = T::Item;
-
-        fn poll_next(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Option<T::Item>> {
-            self.project().stream.poll_next(cx)
-        }
-    }
-
-    impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> {
-        type Error = U::Error;
-
-        fn poll_ready(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
-            self.project().sink.poll_ready(cx)
-        }
-
-        fn start_send(
-            self: Pin<&mut Self>,
-            item: Item,
-        ) -> Result<(), Self::Error> {
-            self.project().sink.start_send(item)
-        }
-
-        fn poll_flush(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
-            self.project().sink.poll_flush(cx)
-        }
-
-        fn poll_close(
-            self: Pin<&mut Self>,
-            cx: &mut Context<'_>,
-        ) -> Poll<Result<(), Self::Error>> {
-            self.project().sink.poll_close(cx)
-        }
-    }
-
-    let mut dest: Vec<i32> = Vec::new();
-    {
-       let join = Join {
-            stream: stream::iter(vec![10, 20, 30]),
-            sink: &mut dest
-        };
-
-        let (sink, stream) = join.split();
-        let join = sink.reunite(stream).expect("test_split: reunite error");
-        let (mut sink, stream) = join.split();
-        let mut stream = stream.map(Ok);
-        block_on(sink.send_all(&mut stream)).unwrap();
-    }
-    assert_eq!(dest, vec![10, 20, 30]);
-}
diff --git a/tests/stream.rs b/tests/stream.rs
index 14b283d..71ec654 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,8 +1,18 @@
+use std::iter;
+use std::sync::Arc;
+
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::{self, Future};
+use futures::lock::Mutex;
+use futures::sink::SinkExt;
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use futures::{ready, FutureExt};
+use futures_test::task::noop_context;
+
 #[test]
 fn select() {
-    use futures::executor::block_on;
-    use futures::stream::{self, StreamExt};
-
     fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
         let a = stream::iter(a);
         let b = stream::iter(b);
@@ -17,19 +27,12 @@
 
 #[test]
 fn flat_map() {
-    use futures::stream::{self, StreamExt};
+    block_on(async {
+        let st =
+            stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
 
-    futures::executor::block_on(async {
-        let st = stream::iter(vec![
-            stream::iter(0..=4u8),
-            stream::iter(6..=10),
-            stream::iter(0..=2),
-        ]);
-
-        let values: Vec<_> = st
-            .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0)))
-            .collect()
-            .await;
+        let values: Vec<_> =
+            st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await;
 
         assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
     });
@@ -37,28 +40,287 @@
 
 #[test]
 fn scan() {
-    use futures::stream::{self, StreamExt};
+    block_on(async {
+        let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
+            .scan(1, |state, e| {
+                *state += 1;
+                futures::future::ready(if e < *state { Some(e) } else { None })
+            })
+            .collect::<Vec<_>>()
+            .await;
 
-    futures::executor::block_on(async {
-        assert_eq!(
-            stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
-                .scan(1, |state, e| {
-                    *state += 1;
-                    futures::future::ready(if e < *state { Some(e) } else { None })
-                })
-                .collect::<Vec<_>>()
-                .await,
-            vec![1u8, 2, 3, 4]
-        );
+        assert_eq!(values, vec![1u8, 2, 3, 4]);
     });
 }
 
 #[test]
-fn take_until() {
-    use futures::future::{self, Future};
-    use futures::stream::{self, StreamExt};
-    use futures::task::Poll;
+fn flatten_unordered() {
+    use futures::executor::block_on;
+    use futures::stream::*;
+    use futures::task::*;
+    use std::convert::identity;
+    use std::pin::Pin;
+    use std::thread;
+    use std::time::Duration;
 
+    struct DataStream {
+        data: Vec<u8>,
+        polled: bool,
+        wake_immediately: bool,
+    }
+
+    impl Stream for DataStream {
+        type Item = u8;
+
+        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+            if !self.polled {
+                if !self.wake_immediately {
+                    let waker = ctx.waker().clone();
+                    let sleep_time =
+                        Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+                    thread::spawn(move || {
+                        thread::sleep(sleep_time);
+                        waker.wake_by_ref();
+                    });
+                } else {
+                    ctx.waker().wake_by_ref();
+                }
+                self.polled = true;
+                Poll::Pending
+            } else {
+                self.polled = false;
+                Poll::Ready(self.data.pop())
+            }
+        }
+    }
+
+    struct Interchanger {
+        polled: bool,
+        base: u8,
+        wake_immediately: bool,
+    }
+
+    impl Stream for Interchanger {
+        type Item = DataStream;
+
+        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+            if !self.polled {
+                self.polled = true;
+                if !self.wake_immediately {
+                    let waker = ctx.waker().clone();
+                    let sleep_time = Duration::from_millis(self.base as u64);
+                    thread::spawn(move || {
+                        thread::sleep(sleep_time);
+                        waker.wake_by_ref();
+                    });
+                } else {
+                    ctx.waker().wake_by_ref();
+                }
+                Poll::Pending
+            } else {
+                let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+                self.base += 1;
+                self.polled = false;
+                Poll::Ready(Some(DataStream {
+                    polled: false,
+                    data,
+                    wake_immediately: self.wake_immediately && self.base % 2 == 0,
+                }))
+            }
+        }
+    }
+
+    // basic behaviour
+    {
+        block_on(async {
+            let st = stream::iter(vec![
+                stream::iter(0..=4u8),
+                stream::iter(6..=10),
+                stream::iter(10..=12),
+            ]);
+
+            let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+            assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+        });
+
+        block_on(async {
+            let st = stream::iter(vec![
+                stream::iter(0..=4u8),
+                stream::iter(6..=10),
+                stream::iter(0..=2),
+            ]);
+
+            let mut fm_unordered = st
+                .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+        });
+    }
+
+    // wake up immediately
+    {
+        block_on(async {
+            let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .map(|s| s.map(identity))
+                .flatten_unordered(10)
+                .collect::<Vec<_>>()
+                .await;
+
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // wake up after delay
+    {
+        block_on(async {
+            let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+                .take(10)
+                .map(|s| s.map(identity))
+                .flatten_unordered(10)
+                .collect::<Vec<_>>()
+                .await;
+
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+                Interchanger { polled: false, base: 0, wake_immediately: false }
+                    .take(10)
+                    .flat_map_unordered(10, |s| s.map(identity))
+                    .collect::<Vec<_>>(),
+                Interchanger { polled: false, base: 0, wake_immediately: false }
+                    .take(10)
+                    .map(|s| s.map(identity))
+                    .flatten_unordered(10)
+                    .collect::<Vec<_>>()
+            );
+
+            fm_unordered.sort_unstable();
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, fl_unordered);
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // waker panics
+    {
+        let stream = Arc::new(Mutex::new(
+            Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity)),
+        ));
+
+        struct PanicWaker;
+
+        impl ArcWake for PanicWaker {
+            fn wake_by_ref(_arc_self: &Arc<Self>) {
+                panic!("WAKE UP");
+            }
+        }
+
+        std::thread::spawn({
+            let stream = stream.clone();
+            move || {
+                let mut st = poll_fn(|cx| {
+                    let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+                    let panic_waker = waker(Arc::new(PanicWaker));
+                    let mut panic_cx = Context::from_waker(&panic_waker);
+                    let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+                    Poll::Ready(Some(()))
+                });
+
+                block_on(st.next())
+            }
+        })
+        .join()
+        .unwrap_err();
+
+        block_on(async move {
+            let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+            values.sort_unstable();
+
+            assert_eq!(values, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // stream panics
+    {
+        let st = stream::iter(iter::once(
+            once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+        ))
+        .chain(
+            Interchanger { polled: false, base: 0, wake_immediately: true }
+                .map(|stream| stream.right_stream())
+                .take(10),
+        );
+
+        let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+        std::thread::spawn({
+            let stream = stream.clone();
+            move || {
+                let mut st = poll_fn(|cx| {
+                    let mut lock = ready!(stream.lock().poll_unpin(cx));
+                    let data = ready!(lock.poll_next_unpin(cx));
+
+                    Poll::Ready(data)
+                });
+
+                block_on(st.next())
+            }
+        })
+        .join()
+        .unwrap_err();
+
+        block_on(async move {
+            let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+            values.sort_unstable();
+
+            assert_eq!(values, (0..60).collect::<Vec<u8>>());
+        });
+    }
+}
+
+#[test]
+fn take_until() {
     fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
         let mut i = 0;
         future::poll_fn(move |_cx| {
@@ -71,7 +333,7 @@
         })
     }
 
-    futures::executor::block_on(async {
+    block_on(async {
         // Verify stopping works:
         let stream = stream::iter(1u32..=10);
         let stop_fut = make_stop_fut(5);
@@ -123,10 +385,15 @@
 
 #[test]
 #[should_panic]
-fn ready_chunks_panic_on_cap_zero() {
-    use futures::channel::mpsc;
-    use futures::stream::StreamExt;
+fn chunks_panic_on_cap_zero() {
+    let (_, rx1) = mpsc::channel::<()>(1);
 
+    let _ = rx1.chunks(0);
+}
+
+#[test]
+#[should_panic]
+fn ready_chunks_panic_on_cap_zero() {
     let (_, rx1) = mpsc::channel::<()>(1);
 
     let _ = rx1.ready_chunks(0);
@@ -134,12 +401,6 @@
 
 #[test]
 fn ready_chunks() {
-    use futures::channel::mpsc;
-    use futures::stream::StreamExt;
-    use futures::sink::SinkExt;
-    use futures::FutureExt;
-    use futures_test::task::noop_context;
-
     let (mut tx, rx1) = mpsc::channel::<i32>(16);
 
     let mut s = rx1.ready_chunks(2);
@@ -147,14 +408,14 @@
     let mut cx = noop_context();
     assert!(s.next().poll_unpin(&mut cx).is_pending());
 
-    futures::executor::block_on(async {
+    block_on(async {
         tx.send(1).await.unwrap();
 
         assert_eq!(s.next().await.unwrap(), vec![1]);
         tx.send(2).await.unwrap();
         tx.send(3).await.unwrap();
         tx.send(4).await.unwrap();
-        assert_eq!(s.next().await.unwrap(), vec![2,3]);
+        assert_eq!(s.next().await.unwrap(), vec![2, 3]);
         assert_eq!(s.next().await.unwrap(), vec![4]);
     });
 }
diff --git a/tests/stream_abortable.rs b/tests/stream_abortable.rs
new file mode 100644
index 0000000..2339dd0
--- /dev/null
+++ b/tests/stream_abortable.rs
@@ -0,0 +1,46 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::stream::{abortable, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use futures::SinkExt;
+use futures_test::task::new_count_waker;
+use std::pin::Pin;
+
+#[test]
+fn abortable_works() { // Abort before the first poll, then check the stream's state and output.
+    let (_tx, a_rx) = mpsc::channel::<()>(1); // `_tx` is never used to send anything
+    let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+    abort_handle.abort();
+    assert!(abortable_rx.is_aborted());
+    assert_eq!(None, block_on(abortable_rx.next())); // the aborted stream yields `None`
+}
+
+#[test]
+fn abortable_awakens() { // Checks that `abort()` wakes the waker registered by an earlier pending poll.
+    let (_tx, a_rx) = mpsc::channel::<()>(1); // `_tx` is never used to send anything
+    let (mut abortable_rx, abort_handle) = abortable(a_rx);
+
+    let (waker, counter) = new_count_waker(); // `counter` is compared against the expected wake count below
+    let mut cx = Context::from_waker(&waker);
+
+    assert_eq!(counter, 0);
+    assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); // no item available yet
+    assert_eq!(counter, 0); // a pending poll alone must not trigger a wake
+
+    abort_handle.abort();
+    assert_eq!(counter, 1); // exactly one wake-up caused by the abort
+    assert!(abortable_rx.is_aborted());
+    assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); // stream is finished
+}
+
+#[test]
+fn abortable_resolves() { // Without an abort, items sent on the channel pass through unchanged.
+    let (mut tx, a_rx) = mpsc::channel::<()>(1);
+    let (mut abortable_rx, _abort_handle) = abortable(a_rx); // handle is bound but `abort()` is never called
+
+    block_on(tx.send(())).unwrap();
+
+    assert!(!abortable_rx.is_aborted());
+    assert_eq!(Some(()), block_on(abortable_rx.next())); // the sent unit value is received
+}
diff --git a/tests/buffer_unordered.rs b/tests/stream_buffer_unordered.rs
similarity index 89%
rename from tests/buffer_unordered.rs
rename to tests/stream_buffer_unordered.rs
index 5c8b8bf..9a2ee17 100644
--- a/tests/buffer_unordered.rs
+++ b/tests/stream_buffer_unordered.rs
@@ -1,13 +1,13 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::{block_on, block_on_stream};
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::mpsc as std_mpsc;
+use std::thread;
+
 #[test]
 #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
 fn works() {
-    use futures::channel::{oneshot, mpsc};
-    use futures::executor::{block_on, block_on_stream};
-    use futures::sink::SinkExt;
-    use futures::stream::StreamExt;
-    use std::sync::mpsc as std_mpsc;
-    use std::thread;
-
     const N: usize = 4;
 
     let (mut tx, rx) = mpsc::channel(1);
diff --git a/tests/stream_catch_unwind.rs b/tests/stream_catch_unwind.rs
index 272558c..8b23a0a 100644
--- a/tests/stream_catch_unwind.rs
+++ b/tests/stream_catch_unwind.rs
@@ -1,8 +1,8 @@
+use futures::executor::block_on_stream;
+use futures::stream::{self, StreamExt};
+
 #[test]
 fn panic_in_the_middle_of_the_stream() {
-    use futures::executor::block_on_stream;
-    use futures::stream::{self, StreamExt};
-
     let stream = stream::iter(vec![Some(10), None, Some(11)]);
 
     // panic on second element
@@ -16,9 +16,6 @@
 
 #[test]
 fn no_panic() {
-    use futures::executor::block_on_stream;
-    use futures::stream::{self, StreamExt};
-
     let stream = stream::iter(vec![10, 11, 12]);
 
     let mut iter = block_on_stream(stream.catch_unwind());
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs
new file mode 100644
index 0000000..84e0bcc
--- /dev/null
+++ b/tests/stream_futures_ordered.rs
@@ -0,0 +1,86 @@
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt, TryFutureExt};
+use futures::stream::{FuturesOrdered, StreamExt};
+use futures_test::task::noop_context;
+use std::any::Any;
+
+#[test]
+fn works_1() { // Values are expected in insertion order (a, b, c), not in completion order.
+    let (a_tx, a_rx) = oneshot::channel::<i32>();
+    let (b_tx, b_rx) = oneshot::channel::<i32>();
+    let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+    let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();
+
+    b_tx.send(99).unwrap(); // complete the second future first
+    assert!(stream.poll_next_unpin(&mut noop_context()).is_pending()); // `a_rx` has no value yet
+
+    a_tx.send(33).unwrap();
+    c_tx.send(33).unwrap();
+
+    let mut iter = block_on_stream(stream);
+    assert_eq!(Some(Ok(33)), iter.next()); // value sent on `a_tx`
+    assert_eq!(Some(Ok(99)), iter.next()); // value sent on `b_tx`
+    assert_eq!(Some(Ok(33)), iter.next()); // value sent on `c_tx`
+    assert_eq!(None, iter.next());
+}
+
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
+#[test]
+fn works_2() { // Second queued future joins `b_rx` and `c_rx`, so it needs both values.
+    let (a_tx, a_rx) = oneshot::channel::<i32>();
+    let (b_tx, b_rx) = oneshot::channel::<i32>();
+    let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+    let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
+        .into_iter()
+        .collect::<FuturesOrdered<_>>();
+
+    let mut cx = noop_context();
+    a_tx.send(33).unwrap();
+    b_tx.send(33).unwrap();
+    assert!(stream.poll_next_unpin(&mut cx).is_ready()); // first future (`a_rx`) has its value
+    assert!(stream.poll_next_unpin(&mut cx).is_pending()); // the join still lacks the `c_rx` value
+    c_tx.send(33).unwrap();
+    assert!(stream.poll_next_unpin(&mut cx).is_ready()); // both halves of the join are now available
+}
+
+#[test]
+fn from_iterator() { // Collecting an iterator of futures into `FuturesOrdered` keeps count and order.
+    let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
+        .into_iter()
+        .collect::<FuturesOrdered<_>>();
+    assert_eq!(stream.len(), 3); // one entry per input future
+    assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]); // outputs in input order
+}
+
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
+#[test]
+fn queue_never_unblocked() { // The first queued future never gets a value, so every poll stays pending.
+    let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>(); // `_a_tx` is never sent on
+    let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+    let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();
+
+    let mut stream = vec![
+        Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>, // first in the queue
+        Box::new(
+            future::try_select(b_rx, c_rx)
+                .map_err(|e| e.factor_first().0)
+                .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
+        ) as _, // second in the queue, coerced to the same boxed future type
+    ]
+    .into_iter()
+    .collect::<FuturesOrdered<_>>();
+
+    let cx = &mut noop_context();
+    for _ in 0..10 { // repeated polling must not change the outcome
+        assert!(stream.poll_next_unpin(cx).is_pending());
+    }
+
+    b_tx.send(Box::new(())).unwrap();
+    assert!(stream.poll_next_unpin(cx).is_pending()); // still pending after `b` is sent
+    c_tx.send(Box::new(())).unwrap();
+    assert!(stream.poll_next_unpin(cx).is_pending()); // still pending after `c` is sent
+    assert!(stream.poll_next_unpin(cx).is_pending());
+}
diff --git a/tests/futures_unordered.rs b/tests/stream_futures_unordered.rs
similarity index 72%
rename from tests/futures_unordered.rs
rename to tests/stream_futures_unordered.rs
index ac0817e..f62f733 100644
--- a/tests/futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -1,17 +1,17 @@
-use futures::future::Future;
-use futures::stream::{FuturesUnordered, StreamExt};
+use futures::channel::oneshot;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, join, Future, FutureExt};
+use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
 use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
 use futures_test::task::noop_context;
+use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
 use std::iter::FromIterator;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 
 #[test]
 fn is_terminated() {
-    use futures::future;
-    use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-    use futures_test::task::noop_context;
-
     let mut cx = noop_context();
     let mut tasks = FuturesUnordered::new();
 
@@ -39,19 +39,12 @@
 
 #[test]
 fn works_1() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on_stream;
-    use futures::stream::FuturesUnordered;
-
     let (a_tx, a_rx) = oneshot::channel::<i32>();
     let (b_tx, b_rx) = oneshot::channel::<i32>();
     let (c_tx, c_rx) = oneshot::channel::<i32>();
 
-    let mut iter = block_on_stream(
-        vec![a_rx, b_rx, c_rx]
-            .into_iter()
-            .collect::<FuturesUnordered<_>>(),
-    );
+    let mut iter =
+        block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());
 
     b_tx.send(99).unwrap();
     assert_eq!(Some(Ok(99)), iter.next());
@@ -63,24 +56,16 @@
     assert_eq!(None, iter.next());
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn works_2() {
-    use futures::channel::oneshot;
-    use futures::future::{join, FutureExt};
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-    use futures_test::task::noop_context;
-
     let (a_tx, a_rx) = oneshot::channel::<i32>();
     let (b_tx, b_rx) = oneshot::channel::<i32>();
     let (c_tx, c_rx) = oneshot::channel::<i32>();
 
-    let mut stream = vec![
-        a_rx.boxed(),
-        join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
-    ]
-    .into_iter()
-    .collect::<FuturesUnordered<_>>();
+    let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
+        .into_iter()
+        .collect::<FuturesUnordered<_>>();
 
     a_tx.send(9).unwrap();
     b_tx.send(10).unwrap();
@@ -94,29 +79,16 @@
 
 #[test]
 fn from_iterator() {
-    use futures::executor::block_on;
-    use futures::future;
-    use futures::stream::{FuturesUnordered, StreamExt};
-
-    let stream = vec![
-        future::ready::<i32>(1),
-        future::ready::<i32>(2),
-        future::ready::<i32>(3),
-    ]
-    .into_iter()
-    .collect::<FuturesUnordered<_>>();
+    let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
+        .into_iter()
+        .collect::<FuturesUnordered<_>>();
     assert_eq!(stream.len(), 3);
     assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn finished_future() {
-    use std::marker::Unpin;
-    use futures::channel::oneshot;
-    use futures::future::{self, Future, FutureExt};
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures_test::task::noop_context;
-
     let (_a_tx, a_rx) = oneshot::channel::<i32>();
     let (b_tx, b_rx) = oneshot::channel::<i32>();
     let (c_tx, c_rx) = oneshot::channel::<i32>();
@@ -142,17 +114,11 @@
 
 #[test]
 fn iter_mut_cancel() {
-    use futures::channel::oneshot;
-    use futures::executor::block_on_stream;
-    use futures::stream::FuturesUnordered;
-
     let (a_tx, a_rx) = oneshot::channel::<i32>();
     let (b_tx, b_rx) = oneshot::channel::<i32>();
     let (c_tx, c_rx) = oneshot::channel::<i32>();
 
-    let mut stream = vec![a_rx, b_rx, c_rx]
-        .into_iter()
-        .collect::<FuturesUnordered<_>>();
+    let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
 
     for rx in stream.iter_mut() {
         rx.close();
@@ -172,16 +138,10 @@
 
 #[test]
 fn iter_mut_len() {
-    use futures::future;
-    use futures::stream::FuturesUnordered;
-
-    let mut stream = vec![
-        future::pending::<()>(),
-        future::pending::<()>(),
-        future::pending::<()>(),
-    ]
-    .into_iter()
-    .collect::<FuturesUnordered<_>>();
+    let mut stream =
+        vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+            .into_iter()
+            .collect::<FuturesUnordered<_>>();
 
     let mut iter_mut = stream.iter_mut();
     assert_eq!(iter_mut.len(), 3);
@@ -196,15 +156,6 @@
 
 #[test]
 fn iter_cancel() {
-    use std::marker::Unpin;
-    use std::pin::Pin;
-    use std::sync::atomic::{AtomicBool, Ordering};
-
-    use futures::executor::block_on_stream;
-    use futures::future::{self, Future, FutureExt};
-    use futures::stream::FuturesUnordered;
-    use futures::task::{Context, Poll};
-
     struct AtomicCancel<F> {
         future: F,
         cancel: AtomicBool,
@@ -250,16 +201,9 @@
 
 #[test]
 fn iter_len() {
-    use futures::future;
-    use futures::stream::FuturesUnordered;
-
-    let stream = vec![
-        future::pending::<()>(),
-        future::pending::<()>(),
-        future::pending::<()>(),
-    ]
-    .into_iter()
-    .collect::<FuturesUnordered<_>>();
+    let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+        .into_iter()
+        .collect::<FuturesUnordered<_>>();
 
     let mut iter = stream.iter();
     assert_eq!(iter.len(), 3);
@@ -273,12 +217,52 @@
 }
 
 #[test]
-fn futures_not_moved_after_poll() {
-    use futures::future;
-    use futures::stream::FuturesUnordered;
-    use futures_test::future::FutureTestExt;
-    use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
+fn into_iter_cancel() { // Closes each receiver taken out via `into_iter()` and checks senders see the cancel.
+    let (a_tx, a_rx) = oneshot::channel::<i32>();
+    let (b_tx, b_rx) = oneshot::channel::<i32>();
+    let (c_tx, c_rx) = oneshot::channel::<i32>();
 
+    let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
+
+    let stream = stream
+        .into_iter() // take the receivers back out of the set by value
+        .map(|mut rx| {
+            rx.close();
+            rx
+        })
+        .collect::<FuturesUnordered<_>>(); // rebuild the set from the closed receivers
+
+    let mut iter = block_on_stream(stream);
+
+    assert!(a_tx.is_canceled());
+    assert!(b_tx.is_canceled());
+    assert!(c_tx.is_canceled());
+
+    assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled))); // one `Canceled` per receiver
+    assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+    assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
+    assert_eq!(iter.next(), None);
+}
+
+#[test]
+fn into_iter_len() { // `len()` of the owning iterator must drop by one per `next()` call.
+    let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
+        .into_iter()
+        .collect::<FuturesUnordered<_>>();
+
+    let mut into_iter = stream.into_iter();
+    assert_eq!(into_iter.len(), 3);
+    assert!(into_iter.next().is_some());
+    assert_eq!(into_iter.len(), 2);
+    assert!(into_iter.next().is_some());
+    assert_eq!(into_iter.len(), 1);
+    assert!(into_iter.next().is_some());
+    assert_eq!(into_iter.len(), 0);
+    assert!(into_iter.next().is_none()); // exhausted after three items
+}
+
+#[test]
+fn futures_not_moved_after_poll() {
     // Future that will be ready after being polled twice,
     // asserting that it does not move.
     let fut = future::ready(()).pending_once().assert_unmoved();
@@ -292,11 +276,6 @@
 
 #[test]
 fn len_valid_during_out_of_order_completion() {
-    use futures::channel::oneshot;
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::Poll;
-    use futures_test::task::noop_context;
-
     // Complete futures out-of-order and add new futures afterwards to ensure
     // length values remain correct.
     let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -368,3 +347,25 @@
     let mut tasks = FuturesUnordered::<F>::new();
     assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
 }
+
+#[test]
+fn clear() { // Exercises `FuturesUnordered::clear` on a partly drained, refilled and terminated set.
+    let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);
+
+    assert_eq!(block_on(tasks.next()), Some(1));
+    assert!(!tasks.is_empty()); // one future is still queued
+
+    tasks.clear();
+    assert!(tasks.is_empty());
+
+    tasks.push(future::ready(3)); // the set accepts new futures after a clear
+    assert!(!tasks.is_empty());
+
+    tasks.clear();
+    assert!(tasks.is_empty());
+
+    assert_eq!(block_on(tasks.next()), None); // polling the empty set yields `None`
+    assert!(tasks.is_terminated());
+    tasks.clear();
+    assert!(!tasks.is_terminated()); // `clear()` is expected to reset the terminated state
+}
diff --git a/tests/stream_into_async_read.rs b/tests/stream_into_async_read.rs
index 222c985..60188d3 100644
--- a/tests/stream_into_async_read.rs
+++ b/tests/stream_into_async_read.rs
@@ -1,31 +1,51 @@
-#[test]
-fn test_into_async_read() {
-    use core::pin::Pin;
-    use futures::io::AsyncRead;
-    use futures::stream::{self, TryStreamExt};
-    use futures::task::Poll;
-    use futures_test::{task::noop_context, stream::StreamTestExt};
+use core::pin::Pin;
+use futures::io::{AsyncBufRead, AsyncRead};
+use futures::stream::{self, TryStreamExt};
+use futures::task::Poll;
+use futures_test::{stream::StreamTestExt, task::noop_context};
 
-    macro_rules! assert_read {
-        ($reader:expr, $buf:expr, $item:expr) => {
-            let mut cx = noop_context();
-            loop {
-                match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
-                    Poll::Ready(Ok(x)) => {
-                        assert_eq!(x, $item);
-                        break;
-                    }
-                    Poll::Ready(Err(err)) => {
-                        panic!("assertion failed: expected value but got {}", err);
-                    }
-                    Poll::Pending => {
-                        continue;
-                    }
+macro_rules! assert_read { // Polls `$reader.poll_read` into `$buf` until ready; asserts the `Ok` value equals `$item`.
+    ($reader:expr, $buf:expr, $item:expr) => {
+        let mut cx = noop_context();
+        loop { // re-poll immediately while the reader reports `Pending`
+            match Pin::new(&mut $reader).poll_read(&mut cx, $buf) {
+                Poll::Ready(Ok(x)) => {
+                    assert_eq!(x, $item);
+                    break;
+                }
+                Poll::Ready(Err(err)) => { // any read error fails the test
+                    panic!("assertion failed: expected value but got {}", err);
+                }
+                Poll::Pending => {
+                    continue;
                 }
             }
+        }
+    };
+}
 
+macro_rules! assert_fill_buf { // Polls `$reader.poll_fill_buf` until ready; asserts the `Ok` value equals `$buf`.
+    ($reader:expr, $buf:expr) => {
+        let mut cx = noop_context();
+        loop { // re-poll immediately while the reader reports `Pending`
+            match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
+                Poll::Ready(Ok(x)) => {
+                    assert_eq!(x, $buf);
+                    break;
+                }
+                Poll::Ready(Err(err)) => { // any error fails the test
+                    panic!("assertion failed: expected value but got {}", err);
+                }
+                Poll::Pending => {
+                    continue;
+                }
+            }
+        }
+    };
+}
+
+#[test]
+fn test_into_async_read() {
     let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
     let mut reader = stream.interleave_pending().into_async_read();
     let mut buf = vec![0; 3];
@@ -53,32 +73,6 @@
 
 #[test]
 fn test_into_async_bufread() {
-    use core::pin::Pin;
-    use futures::io::AsyncBufRead;
-    use futures::stream::{self, TryStreamExt};
-    use futures::task::Poll;
-    use futures_test::{task::noop_context, stream::StreamTestExt};
-
-    macro_rules! assert_fill_buf {
-        ($reader:expr, $buf:expr) => {
-            let mut cx = noop_context();
-            loop {
-                match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
-                    Poll::Ready(Ok(x)) => {
-                        assert_eq!(x, $buf);
-                        break;
-                    }
-                    Poll::Ready(Err(err)) => {
-                        panic!("assertion failed: expected value but got {}", err);
-                    }
-                    Poll::Pending => {
-                        continue;
-                    }
-                }
-            }
-        };
-    }
-
     let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])]));
     let mut reader = stream.interleave_pending().into_async_read();
 
diff --git a/tests/stream_peekable.rs b/tests/stream_peekable.rs
index 66a7385..153fcc2 100644
--- a/tests/stream_peekable.rs
+++ b/tests/stream_peekable.rs
@@ -1,13 +1,58 @@
+use futures::executor::block_on;
+use futures::pin_mut;
+use futures::stream::{self, Peekable, StreamExt};
+
 #[test]
 fn peekable() {
-    use futures::executor::block_on;
-    use futures::pin_mut;
-    use futures::stream::{self, Peekable, StreamExt};
-
     block_on(async {
         let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable();
         pin_mut!(peekable);
         assert_eq!(peekable.as_mut().peek().await, Some(&1u8));
         assert_eq!(peekable.collect::<Vec<u8>>().await, vec![1, 2, 3]);
+
+        let s = stream::once(async { 1 }).peekable();
+        pin_mut!(s);
+        assert_eq!(s.as_mut().peek().await, Some(&1u8));
+        assert_eq!(s.collect::<Vec<u8>>().await, vec![1]);
+    });
+}
+
+#[test]
+fn peekable_mut() { // A value modified through `peek_mut()` must be what the stream later yields.
+    block_on(async {
+        let s = stream::iter(vec![1u8, 2, 3]).peekable();
+        pin_mut!(s);
+        if let Some(p) = s.as_mut().peek_mut().await {
+            if *p == 1 {
+                *p = 5; // overwrite the peeked item in place
+            }
+        }
+        assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]); // first item reflects the write
+    });
+}
+
+#[test]
+fn peekable_next_if_eq() { // `next_if_eq` yields the next item only when it equals the argument.
+    block_on(async {
+        // first, try on references
+        let s = stream::iter(vec!["Heart", "of", "Gold"]).peekable();
+        pin_mut!(s);
+        // try before `peek()`
+        assert_eq!(s.as_mut().next_if_eq(&"trillian").await, None); // mismatch: nothing is taken
+        assert_eq!(s.as_mut().next_if_eq(&"Heart").await, Some("Heart"));
+        // try after `peek()`
+        assert_eq!(s.as_mut().peek().await, Some(&"of"));
+        assert_eq!(s.as_mut().next_if_eq(&"of").await, Some("of"));
+        assert_eq!(s.as_mut().next_if_eq(&"zaphod").await, None);
+        // make sure `next()` still behaves
+        assert_eq!(s.next().await, Some("Gold"));
+
+        // make sure comparison works for owned values
+        let s = stream::iter(vec![String::from("Ludicrous"), "speed".into()]).peekable();
+        pin_mut!(s);
+        // make sure basic functionality works
+        assert_eq!(s.as_mut().next_if_eq("Ludicrous").await, Some("Ludicrous".into()));
+        assert_eq!(s.as_mut().next_if_eq("speed").await, Some("speed".into()));
+        assert_eq!(s.as_mut().next_if_eq("").await, None); // both items were already taken above
     });
 }
diff --git a/tests/stream_select_all.rs b/tests/stream_select_all.rs
index 6178412..4ae0735 100644
--- a/tests/stream_select_all.rs
+++ b/tests/stream_select_all.rs
@@ -1,10 +1,12 @@
+use futures::channel::mpsc;
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{self, FutureExt};
+use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt};
+use futures::task::Poll;
+use futures_test::task::noop_context;
+
 #[test]
 fn is_terminated() {
-    use futures::future::{self, FutureExt};
-    use futures::stream::{FusedStream, SelectAll, StreamExt};
-    use futures::task::Poll;
-    use futures_test::task::noop_context;
-
     let mut cx = noop_context();
     let mut tasks = SelectAll::new();
 
@@ -30,9 +32,6 @@
 
 #[test]
 fn issue_1626() {
-    use futures::executor::block_on_stream;
-    use futures::stream;
-
     let a = stream::iter(0..=2);
     let b = stream::iter(10..=14);
 
@@ -51,10 +50,6 @@
 
 #[test]
 fn works_1() {
-    use futures::channel::mpsc;
-    use futures::executor::block_on_stream;
-    use futures::stream::select_all;
-
     let (a_tx, a_rx) = mpsc::unbounded::<u32>();
     let (b_tx, b_rx) = mpsc::unbounded::<u32>();
     let (c_tx, c_rx) = mpsc::unbounded::<u32>();
@@ -81,3 +76,122 @@
     drop((a_tx, b_tx, c_tx));
     assert_eq!(None, stream.next());
 }
+
+#[test]
+fn clear() { // Exercises `SelectAll::clear` on a partly drained, refilled and terminated set.
+    let mut tasks =
+        select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]);
+
+    assert_eq!(block_on(tasks.next()), Some(1));
+    assert!(!tasks.is_empty()); // not everything has been removed yet
+
+    tasks.clear();
+    assert!(tasks.is_empty());
+
+    tasks.push(stream::iter(vec![3].into_iter())); // the set accepts new streams after a clear
+    assert!(!tasks.is_empty());
+
+    tasks.clear();
+    assert!(tasks.is_empty());
+
+    assert_eq!(block_on(tasks.next()), None); // polling the empty set yields `None`
+    assert!(tasks.is_terminated());
+    tasks.clear();
+    assert!(!tasks.is_terminated()); // `clear()` is expected to reset the terminated state
+}
+
+#[test]
+fn iter_mut() { // `iter_mut().len()` must track the iterator position and the streams still held.
+    let mut stream =
+        vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+            .into_iter()
+            .collect::<SelectAll<_>>();
+
+    let mut iter = stream.iter_mut();
+    assert_eq!(iter.len(), 3);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 2);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 1);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 0);
+    assert!(iter.next().is_none());
+
+    let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+        .into_iter()
+        .collect::<SelectAll<_>>(); // second scenario: one empty stream plus two one-item streams
+
+    assert_eq!(stream.len(), 3);
+    assert_eq!(block_on(stream.next()), Some(1));
+    assert_eq!(stream.len(), 2); // presumably the empty stream was polled to completion and dropped
+    let mut iter = stream.iter_mut();
+    assert_eq!(iter.len(), 2);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 1);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 0);
+    assert!(iter.next().is_none());
+
+    assert_eq!(block_on(stream.next()), Some(2));
+    assert_eq!(stream.len(), 2); // length is unchanged right after the second item
+    assert_eq!(block_on(stream.next()), None);
+    let mut iter = stream.iter_mut();
+    assert_eq!(iter.len(), 0); // nothing is left once the combined stream has ended
+    assert!(iter.next().is_none());
+}
+
+#[test]
+fn iter() { // Shared-reference counterpart of the `iter_mut` test: same length expectations via `iter()`.
+    let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+        .into_iter()
+        .collect::<SelectAll<_>>();
+
+    let mut iter = stream.iter();
+    assert_eq!(iter.len(), 3);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 2);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 1);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 0);
+    assert!(iter.next().is_none());
+
+    let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])]
+        .into_iter()
+        .collect::<SelectAll<_>>(); // second scenario: one empty stream plus two one-item streams
+
+    assert_eq!(stream.len(), 3);
+    assert_eq!(block_on(stream.next()), Some(1));
+    assert_eq!(stream.len(), 2); // presumably the empty stream was polled to completion and dropped
+    let mut iter = stream.iter();
+    assert_eq!(iter.len(), 2);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 1);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 0);
+    assert!(iter.next().is_none());
+
+    assert_eq!(block_on(stream.next()), Some(2));
+    assert_eq!(stream.len(), 2); // length is unchanged right after the second item
+    assert_eq!(block_on(stream.next()), None);
+    let mut iter = stream.iter();
+    assert_eq!(iter.len(), 0); // nothing is left once the combined stream has ended
+    assert!(iter.next().is_none());
+}
+
+#[test]
+fn into_iter() { // `len()` of the owning iterator must drop by one per `next()` call.
+    let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()]
+        .into_iter()
+        .collect::<SelectAll<_>>();
+
+    let mut iter = stream.into_iter(); // consumes the `SelectAll`
+    assert_eq!(iter.len(), 3);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 2);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 1);
+    assert!(iter.next().is_some());
+    assert_eq!(iter.len(), 0);
+    assert!(iter.next().is_none()); // exhausted after three items
+}
diff --git a/tests/stream_select_next_some.rs b/tests/stream_select_next_some.rs
index bec5262..8252ad7 100644
--- a/tests/stream_select_next_some.rs
+++ b/tests/stream_select_next_some.rs
@@ -1,11 +1,13 @@
+use futures::executor::block_on;
+use futures::future::{self, FusedFuture, FutureExt};
+use futures::select;
+use futures::stream::{FuturesUnordered, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::future::FutureTestExt;
+use futures_test::task::new_count_waker;
+
 #[test]
 fn is_terminated() {
-    use futures::future;
-    use futures::future::{FusedFuture, FutureExt};
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures::task::{Context, Poll};
-    use futures_test::task::new_count_waker;
-
     let (waker, counter) = new_count_waker();
     let mut cx = Context::from_waker(&waker);
 
@@ -30,15 +32,11 @@
 
 #[test]
 fn select() {
-    use futures::{future, select};
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures_test::future::FutureTestExt;
-
     // Checks that even though `async_tasks` will yield a `None` and return
     // `is_terminated() == true` during the first poll, it manages to toggle
     // back to having items after a future is pushed into it during the second
     // poll (after pending_once completes).
-    futures::executor::block_on(async {
+    block_on(async {
         let mut fut = future::ready(1).pending_once();
         let mut async_tasks = FuturesUnordered::new();
         let mut total = 0;
@@ -61,17 +59,13 @@
 // Check that `select!` macro does not fail when importing from `futures_util`.
 #[test]
 fn futures_util_select() {
-    use futures::future;
-    use futures::stream::{FuturesUnordered, StreamExt};
-    use futures_test::future::FutureTestExt;
-
     use futures_util::select;
 
     // Checks that even though `async_tasks` will yield a `None` and return
     // `is_terminated() == true` during the first poll, it manages to toggle
     // back to having items after a future is pushed into it during the second
     // poll (after pending_once completes).
-    futures::executor::block_on(async {
+    block_on(async {
         let mut fut = future::ready(1).pending_once();
         let mut async_tasks = FuturesUnordered::new();
         let mut total = 0;
diff --git a/tests/stream_split.rs b/tests/stream_split.rs
new file mode 100644
index 0000000..694c151
--- /dev/null
+++ b/tests/stream_split.rs
@@ -0,0 +1,57 @@
+use futures::executor::block_on;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{self, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_project::pin_project;
+use std::pin::Pin;
+
+#[test]
+fn test_split() { // Splits a combined Stream+Sink value, reunites it, splits again and pipes stream into sink.
+    #[pin_project]
+    struct Join<T, U> { // test-only adapter holding a stream half and a sink half in one value
+        #[pin]
+        stream: T,
+        #[pin]
+        sink: U,
+    }
+
+    impl<T: Stream, U> Stream for Join<T, U> { // polling is delegated to the `stream` field
+        type Item = T::Item;
+
+        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
+            self.project().stream.poll_next(cx)
+        }
+    }
+
+    impl<T, U: Sink<Item>, Item> Sink<Item> for Join<T, U> { // every method forwards to the `sink` field
+        type Error = U::Error;
+
+        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            self.project().sink.poll_ready(cx)
+        }
+
+        fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
+            self.project().sink.start_send(item)
+        }
+
+        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            self.project().sink.poll_flush(cx)
+        }
+
+        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            self.project().sink.poll_close(cx)
+        }
+    }
+
+    let mut dest: Vec<i32> = Vec::new(); // receives whatever is sent into the sink half
+    { // inner scope ends the `&mut dest` borrow before the final assertion reads `dest`
+        let join = Join { stream: stream::iter(vec![10, 20, 30]), sink: &mut dest };
+
+        let (sink, stream) = join.split();
+        let join = sink.reunite(stream).expect("test_split: reunite error"); // halves of one split must rejoin
+        let (mut sink, stream) = join.split(); // the reunited value can be split again
+        let mut stream = stream.map(Ok); // wrap each item in `Ok` before handing it to `send_all`
+        block_on(sink.send_all(&mut stream)).unwrap();
+    }
+    assert_eq!(dest, vec![10, 20, 30]); // all stream items arrived in `dest`, in order
+}
diff --git a/tests/try_stream.rs b/tests/stream_try_stream.rs
similarity index 93%
rename from tests/try_stream.rs
rename to tests/stream_try_stream.rs
index 194e74d..d83fc54 100644
--- a/tests/try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
 use futures::{
     stream::{self, StreamExt, TryStreamExt},
     task::Poll,
diff --git a/tests/unfold.rs b/tests/stream_unfold.rs
similarity index 89%
rename from tests/unfold.rs
rename to tests/stream_unfold.rs
index 95722cf..16b1081 100644
--- a/tests/unfold.rs
+++ b/tests/stream_unfold.rs
@@ -1,10 +1,7 @@
 use futures::future;
 use futures::stream;
-
 use futures_test::future::FutureTestExt;
-use futures_test::{
-    assert_stream_done, assert_stream_next, assert_stream_pending,
-};
+use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
 
 #[test]
 fn unfold1() {
diff --git a/tests/task_arc_wake.rs b/tests/task_arc_wake.rs
new file mode 100644
index 0000000..aedc15b
--- /dev/null
+++ b/tests/task_arc_wake.rs
@@ -0,0 +1,79 @@
+use futures::task::{self, ArcWake, Waker};
+use std::panic;
+use std::sync::{Arc, Mutex};
+
+struct CountingWaker {
+    nr_wake: Mutex<i32>,
+}
+
+impl CountingWaker {
+    fn new() -> Self {
+        Self { nr_wake: Mutex::new(0) }
+    }
+
+    fn wakes(&self) -> i32 {
+        *self.nr_wake.lock().unwrap()
+    }
+}
+
+impl ArcWake for CountingWaker {
+    fn wake_by_ref(arc_self: &Arc<Self>) {
+        let mut lock = arc_self.nr_wake.lock().unwrap();
+        *lock += 1;
+    }
+}
+
+#[test]
+fn create_from_arc() {
+    let some_w = Arc::new(CountingWaker::new());
+
+    let w1: Waker = task::waker(some_w.clone());
+    assert_eq!(2, Arc::strong_count(&some_w));
+    w1.wake_by_ref();
+    assert_eq!(1, some_w.wakes());
+
+    let w2 = w1.clone();
+    assert_eq!(3, Arc::strong_count(&some_w));
+
+    w2.wake_by_ref();
+    assert_eq!(2, some_w.wakes());
+
+    drop(w2);
+    assert_eq!(2, Arc::strong_count(&some_w));
+    drop(w1);
+    assert_eq!(1, Arc::strong_count(&some_w));
+}
+
+#[test]
+fn ref_wake_same() {
+    let some_w = Arc::new(CountingWaker::new());
+
+    let w1: Waker = task::waker(some_w.clone());
+    let w2 = task::waker_ref(&some_w);
+    let w3 = w2.clone();
+
+    assert!(w1.will_wake(&w2));
+    assert!(w2.will_wake(&w3));
+}
+
+#[test]
+fn proper_refcount_on_wake_panic() {
+    struct PanicWaker;
+
+    impl ArcWake for PanicWaker {
+        fn wake_by_ref(_arc_self: &Arc<Self>) {
+            panic!("WAKE UP");
+        }
+    }
+
+    let some_w = Arc::new(PanicWaker);
+
+    let w1: Waker = task::waker(some_w.clone());
+    assert_eq!(
+        "WAKE UP",
+        *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()
+    );
+    assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1
+    drop(w1);
+    assert_eq!(1, Arc::strong_count(&some_w)); // some_w
+}
diff --git a/tests/atomic_waker.rs b/tests/task_atomic_waker.rs
similarity index 81%
rename from tests/atomic_waker.rs
rename to tests/task_atomic_waker.rs
index bf15d0f..2d1612a 100644
--- a/tests/atomic_waker.rs
+++ b/tests/task_atomic_waker.rs
@@ -1,14 +1,14 @@
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::task::{AtomicWaker, Poll};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
+
+#[cfg_attr(miri, ignore)] // Miri is too slow
 #[test]
 fn basic() {
-    use std::sync::atomic::AtomicUsize;
-    use std::sync::atomic::Ordering;
-    use std::sync::Arc;
-    use std::thread;
-
-    use futures::executor::block_on;
-    use futures::future::poll_fn;
-    use futures::task::{AtomicWaker, Poll};
-
     let atomic_waker = Arc::new(AtomicWaker::new());
     let atomic_waker_copy = atomic_waker.clone();
 
diff --git a/tests/test_macro.rs b/tests/test_macro.rs
new file mode 100644
index 0000000..6adf51d
--- /dev/null
+++ b/tests/test_macro.rs
@@ -0,0 +1,20 @@
+#[futures_test::test]
+async fn it_works() {
+    let fut = async { true };
+    assert!(fut.await);
+
+    let fut = async { false };
+    assert!(!fut.await);
+}
+
+#[should_panic]
+#[futures_test::test]
+async fn it_is_being_run() {
+    let fut = async { false };
+    assert!(fut.await);
+}
+
+#[futures_test::test]
+async fn return_ty() -> Result<(), ()> {
+    Ok(())
+}
diff --git a/tests/try_join.rs b/tests/try_join.rs
index 6c6d084..0281ab8 100644
--- a/tests/try_join.rs
+++ b/tests/try_join.rs
@@ -1,9 +1,9 @@
 #![deny(unreachable_code)]
 
-use futures::{try_join, executor::block_on};
+use futures::{executor::block_on, try_join};
 
 // TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to
-// test behaviour of the `try_join!` macro with the never type before it is
+// test behavior of the `try_join!` macro with the never type before it is
 // stabilized. Once `!` is again stabilized this can be removed and replaced
 // with direct use of `!` below where `Never` is used.
 trait MyTrait {
@@ -14,7 +14,6 @@
 }
 type Never = <fn() -> ! as MyTrait>::Output;
 
-
 #[test]
 fn try_join_never_error() {
     block_on(async {
diff --git a/tests/try_join_all.rs b/tests/try_join_all.rs
deleted file mode 100644
index 8e579a2..0000000
--- a/tests/try_join_all.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-mod util {
-    use std::future::Future;
-    use futures::executor::block_on;
-    use std::fmt::Debug;
-
-    pub fn assert_done<T, F>(actual_fut: F, expected: T)
-    where
-        T: PartialEq + Debug,
-        F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
-    {
-        let output = block_on(actual_fut());
-        assert_eq!(output, expected);
-    }
-}
-
-#[test]
-fn collect_collects() {
-    use futures_util::future::{err, ok, try_join_all};
-
-    use util::assert_done;
-
-    assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
-    assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
-    assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
-    // REVIEW: should this be implemented?
-    // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
-
-    // TODO: needs more tests
-}
-
-#[test]
-fn try_join_all_iter_lifetime() {
-    use futures_util::future::{ok, try_join_all};
-    use std::future::Future;
-
-    use util::assert_done;
-
-    // In futures-rs version 0.1, this function would fail to typecheck due to an overly
-    // conservative type parameterization of `TryJoinAll`.
-    fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
-        let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
-        Box::new(try_join_all(iter))
-    }
-
-    assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
-}
-
-#[test]
-fn try_join_all_from_iter() {
-    use futures_util::future::{ok, TryJoinAll};
-
-    use util::assert_done;
-
-    assert_done(
-        || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
-        Ok::<_, usize>(vec![1, 2]),
-    )
-}
diff --git a/tests_disabled/all.rs b/tests_disabled/all.rs
index 6c7e11c..a7a5710 100644
--- a/tests_disabled/all.rs
+++ b/tests_disabled/all.rs
@@ -1,27 +1,27 @@
-use futures::future;
-use futures::executor::block_on;
 use futures::channel::oneshot::{self, Canceled};
+use futures::executor::block_on;
+use futures::future;
 use std::sync::mpsc::{channel, TryRecvError};
 
-mod support;
-use support::*;
+// mod support;
+// use support::*;
 
 fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
     match r {
-        Ok(Either::Left((t, _))) |
-        Ok(Either::Right((t, _))) => Ok(t),
-        Err(Either::Left((e, _))) |
-        Err(Either::Right((e, _))) => Err(e),
+        Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => Ok(t),
+        Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => Err(e),
     }
 }
 
 #[test]
 fn result_smoke() {
     fn is_future_v<A, B, C>(_: C)
-        where A: Send + 'static,
-              B: Send + 'static,
-              C: Future<Item=A, Error=B>
-    {}
+    where
+        A: Send + 'static,
+        B: Send + 'static,
+        C: Future<Item = A, Error = B>,
+    {
+    }
 
     is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
     is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
@@ -64,7 +64,9 @@
 
 #[test]
 fn test_empty() {
-    fn empty() -> Empty<i32, u32> { future::empty() }
+    fn empty() -> Empty<i32, u32> {
+        future::empty()
+    }
 
     assert_empty(|| empty());
     assert_empty(|| empty().select(empty()));
@@ -105,16 +107,22 @@
 
 #[test]
 fn smoke_oneshot() {
-    assert_done(|| {
-        let (c, p) = oneshot::channel();
-        c.send(1).unwrap();
-        p
-    }, Ok(1));
-    assert_done(|| {
-        let (c, p) = oneshot::channel::<i32>();
-        drop(c);
-        p
-    }, Err(Canceled));
+    assert_done(
+        || {
+            let (c, p) = oneshot::channel();
+            c.send(1).unwrap();
+            p
+        },
+        Ok(1),
+    );
+    assert_done(
+        || {
+            let (c, p) = oneshot::channel::<i32>();
+            drop(c);
+            p
+        },
+        Err(Canceled),
+    );
     let mut completes = Vec::new();
     assert_empty(|| {
         let (a, b) = oneshot::channel::<i32>();
@@ -129,9 +137,7 @@
     let (c, p) = oneshot::channel::<i32>();
     drop(c);
     let (tx, rx) = channel();
-    p.then(move |_| {
-        tx.send(())
-    }).forget();
+    p.then(move |_| tx.send(())).forget();
     rx.recv().unwrap();
 }
 
@@ -139,8 +145,14 @@
 fn select_cancels() {
     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
     let ((btx, brx), (dtx, drx)) = (channel(), channel());
-    let b = b.map(move |b| { btx.send(b).unwrap(); b });
-    let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+    let b = b.map(move |b| {
+        btx.send(b).unwrap();
+        b
+    });
+    let d = d.map(move |d| {
+        dtx.send(d).unwrap();
+        d
+    });
 
     let mut f = b.select(d).then(unselect);
     // assert!(f.poll(&mut Task::new()).is_pending());
@@ -156,8 +168,14 @@
 
         let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
         let ((btx, _brx), (dtx, drx)) = (channel(), channel());
-        let b = b.map(move |b| { btx.send(b).unwrap(); b });
-        let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+        let b = b.map(move |b| {
+            btx.send(b).unwrap();
+            b
+        });
+        let d = d.map(move |d| {
+            dtx.send(d).unwrap();
+            d
+        });
 
         let mut f = b.select(d).then(unselect);
         assert!(f.poll(lw).ok().unwrap().is_pending());
@@ -173,8 +191,14 @@
 fn join_cancels() {
     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
     let ((btx, _brx), (dtx, drx)) = (channel(), channel());
-    let b = b.map(move |b| { btx.send(b).unwrap(); b });
-    let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+    let b = b.map(move |b| {
+        btx.send(b).unwrap();
+        b
+    });
+    let d = d.map(move |d| {
+        dtx.send(d).unwrap();
+        d
+    });
 
     let mut f = b.join(d);
     drop(a);
@@ -185,8 +209,14 @@
 
     let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
     let ((btx, _brx), (dtx, drx)) = (channel(), channel());
-    let b = b.map(move |b| { btx.send(b).unwrap(); b });
-    let d = d.map(move |d| { dtx.send(d).unwrap(); d });
+    let b = b.map(move |b| {
+        btx.send(b).unwrap();
+        b
+    });
+    let d = d.map(move |d| {
+        dtx.send(d).unwrap();
+        d
+    });
 
     let (tx, rx) = channel();
     let f = b.join(d);
@@ -194,7 +224,8 @@
         tx.send(()).unwrap();
         let res: Result<(), ()> = Ok(());
         res
-    }).forget();
+    })
+    .forget();
     assert!(rx.try_recv().is_err());
     drop(a);
     rx.recv().unwrap();
@@ -243,7 +274,6 @@
     })
 }
 
-
 #[test]
 fn select2() {
     assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
@@ -251,14 +281,15 @@
     assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
     assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
 
-    assert_done(|| {
-        f_ok(1).select(f_ok(2))
-               .map_err(|_| 0)
-               .and_then(|either_tup| {
-                   let (a, b) = either_tup.into_inner();
-                   b.map(move |b| a + b)
-               })
-    }, Ok(3));
+    assert_done(
+        || {
+            f_ok(1).select(f_ok(2)).map_err(|_| 0).and_then(|either_tup| {
+                let (a, b) = either_tup.into_inner();
+                b.map(move |b| a + b)
+            })
+        },
+        Ok(3),
+    );
 
     // Finish one half of a select and then fail the second, ensuring that we
     // get the notification of the second one.
@@ -297,8 +328,14 @@
     {
         let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
         let ((btx, brx), (dtx, drx)) = (channel(), channel());
-        let b = b.map(move |v| { btx.send(v).unwrap(); v });
-        let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+        let b = b.map(move |v| {
+            btx.send(v).unwrap();
+            v
+        });
+        let d = d.map(move |v| {
+            dtx.send(v).unwrap();
+            v
+        });
         let f = b.select(d);
         drop(f);
         assert!(drx.recv().is_err());
@@ -309,8 +346,14 @@
     {
         let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
         let ((btx, brx), (dtx, drx)) = (channel(), channel());
-        let b = b.map(move |v| { btx.send(v).unwrap(); v });
-        let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+        let b = b.map(move |v| {
+            btx.send(v).unwrap();
+            v
+        });
+        let d = d.map(move |v| {
+            dtx.send(v).unwrap();
+            v
+        });
         let mut f = b.select(d);
         let _res = noop_waker_lw(|lw| f.poll(lw));
         drop(f);
@@ -322,8 +365,14 @@
     {
         let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
         let ((btx, brx), (dtx, drx)) = (channel(), channel());
-        let b = b.map(move |v| { btx.send(v).unwrap(); v });
-        let d = d.map(move |v| { dtx.send(v).unwrap(); v });
+        let b = b.map(move |v| {
+            btx.send(v).unwrap();
+            v
+        });
+        let d = d.map(move |v| {
+            dtx.send(v).unwrap();
+            v
+        });
         let (tx, rx) = channel();
         b.select(d).map(move |_| tx.send(()).unwrap()).forget();
         drop(a);
diff --git a/tests_disabled/bilock.rs b/tests_disabled/bilock.rs
index c1bc33f..0166ca4 100644
--- a/tests_disabled/bilock.rs
+++ b/tests_disabled/bilock.rs
@@ -1,11 +1,11 @@
-use futures::task;
-use futures::stream;
 use futures::future;
+use futures::stream;
+use futures::task;
 use futures_util::lock::BiLock;
 use std::thread;
 
-mod support;
-use support::*;
+// mod support;
+// use support::*;
 
 #[test]
 fn smoke() {
@@ -41,9 +41,9 @@
     });
 
     assert!(task::spawn(future)
-                .poll_future_notify(&notify_noop(), 0)
-                .expect("failure in poll")
-                .is_ready());
+        .poll_future_notify(&notify_noop(), 0)
+        .expect("failure in poll")
+        .is_ready());
 }
 
 #[test]
@@ -51,10 +51,7 @@
     const N: usize = 10000;
     let (a, b) = BiLock::new(0);
 
-    let a = Increment {
-        a: Some(a),
-        remaining: N,
-    };
+    let a = Increment { a: Some(a), remaining: N };
     let b = stream::iter_ok(0..N).fold(b, |b, _n| {
         b.lock().map(|mut b| {
             *b += 1;
@@ -89,7 +86,7 @@
         fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
             loop {
                 if self.remaining == 0 {
-                    return Ok(self.a.take().unwrap().into())
+                    return Ok(self.a.take().unwrap().into());
                 }
 
                 let a = self.a.as_ref().unwrap();
diff --git a/tests_disabled/stream.rs b/tests_disabled/stream.rs
index ef0676d..854dbad 100644
--- a/tests_disabled/stream.rs
+++ b/tests_disabled/stream.rs
@@ -1,26 +1,26 @@
+use futures::channel::mpsc;
+use futures::channel::oneshot;
 use futures::executor::{block_on, block_on_stream};
 use futures::future::{err, ok};
 use futures::stream::{empty, iter_ok, poll_fn, Peekable};
-use futures::channel::oneshot;
-use futures::channel::mpsc;
 
-mod support;
-use support::*;
+// mod support;
+// use support::*;
 
 pub struct Iter<I> {
     iter: I,
 }
 
 pub fn iter<J, T, E>(i: J) -> Iter<J::IntoIter>
-    where J: IntoIterator<Item=Result<T, E>>,
+where
+    J: IntoIterator<Item = Result<T, E>>,
 {
-    Iter {
-        iter: i.into_iter(),
-    }
+    Iter { iter: i.into_iter() }
 }
 
 impl<I, T, E> Stream for Iter<I>
-    where I: Iterator<Item=Result<T, E>>,
+where
+    I: Iterator<Item = Result<T, E>>,
 {
     type Item = T;
     type Error = E;
@@ -34,21 +34,15 @@
     }
 }
 
-fn list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+fn list() -> Box<Stream<Item = i32, Error = u32> + Send> {
     let (tx, rx) = mpsc::channel(1);
-    tx.send(Ok(1))
-      .and_then(|tx| tx.send(Ok(2)))
-      .and_then(|tx| tx.send(Ok(3)))
-      .forget();
+    tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Ok(3))).forget();
     Box::new(rx.then(|r| r.unwrap()))
 }
 
-fn err_list() -> Box<Stream<Item=i32, Error=u32> + Send> {
+fn err_list() -> Box<Stream<Item = i32, Error = u32> + Send> {
     let (tx, rx) = mpsc::channel(1);
-    tx.send(Ok(1))
-      .and_then(|tx| tx.send(Ok(2)))
-      .and_then(|tx| tx.send(Err(3)))
-      .forget();
+    tx.send(Ok(1)).and_then(|tx| tx.send(Ok(2))).and_then(|tx| tx.send(Err(3))).forget();
     Box::new(rx.then(|r| r.unwrap()))
 }
 
@@ -89,40 +83,31 @@
 
 #[test]
 fn filter_map() {
-    assert_done(|| list().filter_map(|x| {
-        ok(if x % 2 == 0 {
-            Some(x + 10)
-        } else {
-            None
-        })
-    }).collect(), Ok(vec![12]));
+    assert_done(
+        || list().filter_map(|x| ok(if x % 2 == 0 { Some(x + 10) } else { None })).collect(),
+        Ok(vec![12]),
+    );
 }
 
 #[test]
 fn and_then() {
     assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4]));
-    assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(),
-                Err(1));
+    assert_done(|| list().and_then(|a| err::<i32, u32>(a as u32)).collect::<Vec<_>>(), Err(1));
 }
 
 #[test]
 fn then() {
     assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4]));
-
 }
 
 #[test]
 fn or_else() {
-    assert_done(|| err_list().or_else(|a| {
-        ok::<i32, u32>(a as i32)
-    }).collect(), Ok(vec![1, 2, 3]));
+    assert_done(|| err_list().or_else(|a| ok::<i32, u32>(a as i32)).collect(), Ok(vec![1, 2, 3]));
 }
 
 #[test]
 fn flatten() {
-    assert_done(|| list().map(|_| list()).flatten().collect(),
-                Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
-
+    assert_done(|| list().map(|_| list()).flatten().collect(), Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3]));
 }
 
 #[test]
@@ -132,9 +117,7 @@
 
 #[test]
 fn skip_passes_errors_through() {
-    let mut s = block_on_stream(
-        iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1)
-    );
+    let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1));
     assert_eq!(s.next(), Some(Err(1)));
     assert_eq!(s.next(), Some(Err(2)));
     assert_eq!(s.next(), Some(Ok(4)));
@@ -144,8 +127,7 @@
 
 #[test]
 fn skip_while() {
-    assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
-                Ok(vec![2, 3]));
+    assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), Ok(vec![2, 3]));
 }
 #[test]
 fn take() {
@@ -154,8 +136,7 @@
 
 #[test]
 fn take_while() {
-    assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
-                Ok(vec![1, 2]));
+    assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), Ok(vec![1, 2]));
 }
 
 #[test]
@@ -193,9 +174,9 @@
     let (a, b) = oneshot::channel::<u32>();
     let (c, d) = oneshot::channel::<u32>();
 
-    tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
-      .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
-      .forget();
+    tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+        .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+        .forget();
 
     let mut rx = rx.buffered(2);
     sassert_empty(&mut rx);
@@ -211,9 +192,9 @@
     let (a, b) = oneshot::channel::<u32>();
     let (c, d) = oneshot::channel::<u32>();
 
-    tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item=_, Error=_> + Send>)
-      .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
-      .forget();
+    tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
+        .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!()))))
+        .forget();
 
     let mut rx = rx.buffered(1);
     sassert_empty(&mut rx);
@@ -233,8 +214,8 @@
     let (c, d) = oneshot::channel::<u32>();
 
     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
-      .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
-      .forget();
+        .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+        .forget();
 
     let mut rx = rx.buffer_unordered(2);
     sassert_empty(&mut rx);
@@ -250,8 +231,8 @@
     let (c, d) = oneshot::channel::<u32>();
 
     tx.send(Box::new(b.recover(|_| panic!())) as Box<Future<Item = _, Error = _> + Send>)
-      .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
-      .forget();
+        .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!()))))
+        .forget();
 
     // We don't even get to see `c` until `a` completes.
     let mut rx = rx.buffer_unordered(1);
@@ -267,21 +248,17 @@
 
 #[test]
 fn zip() {
-    assert_done(|| list().zip(list()).collect(),
-                Ok(vec![(1, 1), (2, 2), (3, 3)]));
-    assert_done(|| list().zip(list().take(2)).collect(),
-                Ok(vec![(1, 1), (2, 2)]));
-    assert_done(|| list().take(2).zip(list()).collect(),
-                Ok(vec![(1, 1), (2, 2)]));
+    assert_done(|| list().zip(list()).collect(), Ok(vec![(1, 1), (2, 2), (3, 3)]));
+    assert_done(|| list().zip(list().take(2)).collect(), Ok(vec![(1, 1), (2, 2)]));
+    assert_done(|| list().take(2).zip(list()).collect(), Ok(vec![(1, 1), (2, 2)]));
     assert_done(|| err_list().zip(list()).collect::<Vec<_>>(), Err(3));
-    assert_done(|| list().zip(list().map(|x| x + 1)).collect(),
-                Ok(vec![(1, 2), (2, 3), (3, 4)]));
+    assert_done(|| list().zip(list().map(|x| x + 1)).collect(), Ok(vec![(1, 2), (2, 3), (3, 4)]));
 }
 
 #[test]
 fn peek() {
     struct Peek {
-        inner: Peekable<Box<Stream<Item = i32, Error =u32> + Send>>
+        inner: Peekable<Box<Stream<Item = i32, Error = u32> + Send>>,
     }
 
     impl Future for Peek {
@@ -299,15 +276,12 @@
         }
     }
 
-    block_on(Peek {
-        inner: list().peekable(),
-    }).unwrap()
+    block_on(Peek { inner: list().peekable() }).unwrap()
 }
 
 #[test]
 fn wait() {
-    assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(),
-               Ok(vec![1, 2, 3]));
+    assert_eq!(block_on_stream(list()).collect::<Result<Vec<_>, _>>(), Ok(vec![1, 2, 3]));
 }
 
 #[test]
@@ -337,8 +311,10 @@
     let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1;
     assert_eq!(v, vec![0, 1, 2, 3]);
 
-    assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
-                Ok(vec![0, 1, 2, 3, 4, 5]));
+    assert_done(
+        move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s),
+        Ok(vec![0, 1, 2, 3, 4, 5]),
+    );
 }
 
 #[test]