Snap for 8512216 from 3049d36a1d1b1613a52f86adf1ea8408e5e3c837 to tm-frc-neuralnetworks-release

Change-Id: I353a55f4ec14b64d435dd03c96609fc57b18bac1
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index ffd4f55..e483977 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,6 @@
 {
   "git": {
-    "sha1": "7caefa51304e78fd5018cd5d2a03f3b9089cc010"
-  }
-}
+    "sha1": "fc1e3250219170e31cddb8857a276cba7dd08d44"
+  },
+  "path_in_vcs": "futures"
+}
\ No newline at end of file
diff --git a/Android.bp b/Android.bp
index bc471eb..a594b53 100644
--- a/Android.bp
+++ b/Android.bp
@@ -42,7 +42,7 @@
     host_supported: true,
     crate_name: "futures",
     cargo_env_compat: true,
-    cargo_pkg_version: "0.3.17",
+    cargo_pkg_version: "0.3.21",
     srcs: ["src/lib.rs"],
     edition: "2018",
     features: [
diff --git a/Cargo.toml b/Cargo.toml
index 17c4d5f..f740f96 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,53 +11,72 @@
 
 [package]
 edition = "2018"
+rust-version = "1.45"
 name = "futures"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
-description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+version = "0.3.21"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
 readme = "../README.md"
-keywords = ["futures", "async", "future"]
+keywords = [
+    "futures",
+    "async",
+    "future",
+]
 categories = ["asynchronous"]
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
+
 [package.metadata.docs.rs]
 all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+    "--cfg",
+    "docsrs",
+]
 
 [package.metadata.playground]
-features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+features = [
+    "std",
+    "async-await",
+    "compat",
+    "io-compat",
+    "executor",
+    "thread-pool",
+]
+
 [dependencies.futures-channel]
-version = "0.3.17"
+version = "0.3.21"
 features = ["sink"]
 default-features = false
 
 [dependencies.futures-core]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-executor]
-version = "0.3.17"
+version = "0.3.21"
 optional = true
 default-features = false
 
 [dependencies.futures-io]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-task]
-version = "0.3.17"
+version = "0.3.21"
 default-features = false
 
 [dependencies.futures-util]
-version = "0.3.17"
+version = "0.3.21"
 features = ["sink"]
 default-features = false
+
 [dev-dependencies.assert_matches]
 version = "1.3.0"
 
@@ -74,16 +93,55 @@
 version = "0.1.11"
 
 [features]
-alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
-async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+alloc = [
+    "futures-core/alloc",
+    "futures-task/alloc",
+    "futures-sink/alloc",
+    "futures-channel/alloc",
+    "futures-util/alloc",
+]
+async-await = [
+    "futures-util/async-await",
+    "futures-util/async-await-macro",
+]
 bilock = ["futures-util/bilock"]
 cfg-target-has-atomic = []
-compat = ["std", "futures-util/compat"]
-default = ["std", "async-await", "executor"]
-executor = ["std", "futures-executor/std"]
-io-compat = ["compat", "futures-util/io-compat"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
-std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
-thread-pool = ["executor", "futures-executor/thread-pool"]
-unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+compat = [
+    "std",
+    "futures-util/compat",
+]
+default = [
+    "std",
+    "async-await",
+    "executor",
+]
+executor = [
+    "std",
+    "futures-executor/std",
+]
+io-compat = [
+    "compat",
+    "futures-util/io-compat",
+]
+std = [
+    "alloc",
+    "futures-core/std",
+    "futures-task/std",
+    "futures-io/std",
+    "futures-sink/std",
+    "futures-util/std",
+    "futures-util/io",
+    "futures-util/channel",
+]
+thread-pool = [
+    "executor",
+    "futures-executor/thread-pool",
+]
+unstable = [
+    "futures-core/unstable",
+    "futures-task/unstable",
+    "futures-channel/unstable",
+    "futures-io/unstable",
+    "futures-util/unstable",
+]
 write-all-vectored = ["futures-util/write-all-vectored"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index b01b12e..6871f47 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,14 +1,13 @@
 [package]
 name = "futures"
+version = "0.3.21"
 edition = "2018"
-version = "0.3.17"
-authors = ["Alex Crichton <alex@alexcrichton.com>"]
+rust-version = "1.45"
 license = "MIT OR Apache-2.0"
 readme = "../README.md"
 keywords = ["futures", "async", "future"]
 repository = "https://github.com/rust-lang/futures-rs"
 homepage = "https://rust-lang.github.io/futures-rs"
-documentation = "https://docs.rs/futures/0.3"
 description = """
 An implementation of futures and streams featuring zero allocations,
 composability, and iterator-like interfaces.
@@ -16,13 +15,13 @@
 categories = ["asynchronous"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.17", default-features = false }
-futures-task = { path = "../futures-task", version = "0.3.17", default-features = false }
-futures-channel = { path = "../futures-channel", version = "0.3.17", default-features = false, features = ["sink"] }
-futures-executor = { path = "../futures-executor", version = "0.3.17", default-features = false, optional = true }
-futures-io = { path = "../futures-io", version = "0.3.17", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.17", default-features = false }
-futures-util = { path = "../futures-util", version = "0.3.17", default-features = false, features = ["sink"] }
+futures-core = { path = "../futures-core", version = "0.3.21", default-features = false }
+futures-task = { path = "../futures-task", version = "0.3.21", default-features = false }
+futures-channel = { path = "../futures-channel", version = "0.3.21", default-features = false, features = ["sink"] }
+futures-executor = { path = "../futures-executor", version = "0.3.21", default-features = false, optional = true }
+futures-io = { path = "../futures-io", version = "0.3.21", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.21", default-features = false }
+futures-util = { path = "../futures-util", version = "0.3.21", default-features = false, features = ["sink"] }
 
 [dev-dependencies]
 futures-executor = { path = "../futures-executor", features = ["thread-pool"] }
@@ -48,7 +47,6 @@
 # `unstable` feature as an explicit opt-in to unstable API.
 unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
 bilock = ["futures-util/bilock"]
-read-initializer = ["futures-io/read-initializer", "futures-util/read-initializer"]
 write-all-vectored = ["futures-util/write-all-vectored"]
 
 # These features are no longer used.
diff --git a/METADATA b/METADATA
index ba70d3e..554f4df 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures/futures-0.3.17.crate"
+    value: "https://static.crates.io/crates/futures/futures-0.3.21.crate"
   }
-  version: "0.3.17"
+  version: "0.3.21"
   license_type: NOTICE
   last_upgrade_date {
-    year: 2021
-    month: 9
-    day: 22
+    year: 2022
+    month: 3
+    day: 1
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 362aa3c..b8ebc61 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@
 //! within macros and keywords such as async and await!.
 //!
 //! ```rust
+//! # if cfg!(miri) { return; } // https://github.com/rust-lang/miri/issues/1038
 //! # use futures::channel::mpsc;
 //! # use futures::executor; ///standard executors to provide a context for futures and streams
 //! # use futures::executor::ThreadPool;
@@ -78,7 +79,6 @@
 //! The majority of examples and code snippets in this crate assume that they are
 //! inside an async block as written above.
 
-#![cfg_attr(feature = "read-initializer", feature(read_initializer))]
 #![cfg_attr(not(feature = "std"), no_std)]
 #![warn(
     missing_debug_implementations,
@@ -99,9 +99,6 @@
 #[cfg(all(feature = "bilock", not(feature = "unstable")))]
 compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
 
-#[cfg(all(feature = "read-initializer", not(feature = "unstable")))]
-compile_error!("The `read-initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");
-
 #[doc(no_inline)]
 pub use futures_core::future::{Future, TryFuture};
 #[doc(no_inline)]
@@ -154,13 +151,73 @@
 
 #[cfg(feature = "executor")]
 #[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
-#[doc(inline)]
-pub use futures_executor as executor;
+pub mod executor {
+    //! Built-in executors and related tools.
+    //!
+    //! All asynchronous computation occurs within an executor, which is
+    //! capable of spawning futures as tasks. This module provides several
+    //! built-in executors, as well as tools for building your own.
+    //!
+    //!
+    //! This module is only available when the `executor` feature of this
+    //! library is activated.
+    //!
+    //! # Using a thread pool (M:N task scheduling)
+    //!
+    //! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+    //! A small set of worker threads can handle a very large set of spawned tasks
+    //! (which are much lighter weight than threads). Tasks spawned onto the pool
+    //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+    //! the created threads.
+    //!
+    //! # Spawning additional tasks
+    //!
+    //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+    //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+    //! instead.
+    //!
+    //! # Single-threaded execution
+    //!
+    //! In addition to thread pools, it's possible to run a task (and the tasks
+    //! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+    //! Aside from cutting down on synchronization costs, this executor also makes
+    //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+    //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+    //! little work between I/O operations.
+    //!
+    //! There is also a convenience function [`block_on`] for simply running a
+    //! future to completion on the current thread.
+    //!
+    //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+    //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+    pub use futures_executor::{
+        block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
+        LocalSpawner,
+    };
+
+    #[cfg(feature = "thread-pool")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+    pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
 
 #[cfg(feature = "compat")]
 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-#[doc(inline)]
-pub use futures_util::compat;
+pub mod compat {
+    //! Interop between `futures` 0.1 and 0.3.
+    //!
+    //! This module is only available when the `compat` feature of this
+    //! library is activated.
+
+    pub use futures_util::compat::{
+        Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
+        Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
+    };
+
+    #[cfg(feature = "io-compat")]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+    pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+}
 
 pub mod prelude {
     //! A "prelude" for crates using the `futures` crate.
@@ -181,10 +238,12 @@
     pub use crate::stream::{self, Stream, TryStream};
 
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::future::{FutureExt as _, TryFutureExt as _};
     #[doc(no_inline)]
     pub use crate::sink::SinkExt as _;
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::stream::{StreamExt as _, TryStreamExt as _};
 
     #[cfg(feature = "std")]
@@ -192,6 +251,7 @@
 
     #[cfg(feature = "std")]
     #[doc(no_inline)]
+    #[allow(unreachable_pub)]
     pub use crate::io::{
         AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
     };
diff --git a/tests/compat.rs b/tests/compat.rs
index c4125d8..ac04a95 100644
--- a/tests/compat.rs
+++ b/tests/compat.rs
@@ -1,4 +1,5 @@
 #![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
 
 use futures::compat::Future01CompatExt;
 use futures::prelude::*;
diff --git a/tests/eventual.rs b/tests/eventual.rs
index bff000d..3461380 100644
--- a/tests/eventual.rs
+++ b/tests/eventual.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
 use futures::channel::oneshot;
 use futures::executor::ThreadPool;
 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
diff --git a/tests/future_join_all.rs b/tests/future_join_all.rs
index ae05a21..44486e1 100644
--- a/tests/future_join_all.rs
+++ b/tests/future_join_all.rs
@@ -1,22 +1,24 @@
 use futures::executor::block_on;
 use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
 use std::fmt::Debug;
 
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
 where
     T: PartialEq + Debug,
-    F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
 {
-    let output = block_on(actual_fut());
+    pin_mut!(actual_fut);
+    let output = block_on(actual_fut);
     assert_eq!(output, expected);
 }
 
 #[test]
 fn collect_collects() {
-    assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
-    assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+    assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+    assert_done(join_all(vec![ready(1)]), vec![1]);
     // REVIEW: should this be implemented?
-    // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+    // assert_done(join_all(Vec::<i32>::new()), vec![]);
 
     // TODO: needs more tests
 }
@@ -25,18 +27,15 @@
 fn join_all_iter_lifetime() {
     // In futures-rs version 0.1, this function would fail to typecheck due to an overly
     // conservative type parameterization of `JoinAll`.
-    fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+    fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
         let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
-        Box::new(join_all(iter))
+        join_all(iter)
     }
 
-    assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+    assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
 }
 
 #[test]
 fn join_all_from_iter() {
-    assert_done(
-        || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
-        vec![1, 2],
-    )
+    assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
 }
diff --git a/tests/future_shared.rs b/tests/future_shared.rs
index 718d6c4..3ceaebb 100644
--- a/tests/future_shared.rs
+++ b/tests/future_shared.rs
@@ -96,6 +96,7 @@
     assert_eq!(block_on(future1), 1);
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn peek() {
     let mut local_pool = LocalPool::new();
diff --git a/tests/future_try_join_all.rs b/tests/future_try_join_all.rs
index a4b3bb7..9a82487 100644
--- a/tests/future_try_join_all.rs
+++ b/tests/future_try_join_all.rs
@@ -1,24 +1,26 @@
 use futures::executor::block_on;
+use futures::pin_mut;
 use futures_util::future::{err, ok, try_join_all, TryJoinAll};
 use std::fmt::Debug;
 use std::future::Future;
 
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
 where
     T: PartialEq + Debug,
-    F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
 {
-    let output = block_on(actual_fut());
+    pin_mut!(actual_fut);
+    let output = block_on(actual_fut);
     assert_eq!(output, expected);
 }
 
 #[test]
 fn collect_collects() {
-    assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
-    assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
-    assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+    assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+    assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+    assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
     // REVIEW: should this be implemented?
-    // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+    // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
 
     // TODO: needs more tests
 }
@@ -27,18 +29,18 @@
 fn try_join_all_iter_lifetime() {
     // In futures-rs version 0.1, this function would fail to typecheck due to an overly
     // conservative type parameterization of `TryJoinAll`.
-    fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+    fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
         let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
-        Box::new(try_join_all(iter))
+        try_join_all(iter)
     }
 
-    assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+    assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
 }
 
 #[test]
 fn try_join_all_from_iter() {
     assert_done(
-        || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+        vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
         Ok::<_, usize>(vec![1, 2]),
     )
 }
diff --git a/tests/io_line_writer.rs b/tests/io_line_writer.rs
new file mode 100644
index 0000000..b483e0f
--- /dev/null
+++ b/tests/io_line_writer.rs
@@ -0,0 +1,73 @@
+use futures::executor::block_on;
+use futures::io::{AsyncWriteExt, LineWriter};
+use std::io;
+
+#[test]
+fn line_writer() {
+    let mut writer = LineWriter::new(Vec::new());
+
+    block_on(writer.write(&[0])).unwrap();
+    assert_eq!(*writer.get_ref(), []);
+
+    block_on(writer.write(&[1])).unwrap();
+    assert_eq!(*writer.get_ref(), []);
+
+    block_on(writer.flush()).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1]);
+
+    block_on(writer.write(&[0, b'\n', 1, b'\n', 2])).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']);
+
+    block_on(writer.flush()).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]);
+
+    block_on(writer.write(&[3, b'\n'])).unwrap();
+    assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']);
+}
+
+#[test]
+fn line_vectored() {
+    let mut line_writer = LineWriter::new(Vec::new());
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"\n"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"a"),
+        ]))
+        .unwrap(),
+        2
+    );
+    assert_eq!(line_writer.get_ref(), b"\n");
+
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"b"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"a"),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(b"c"),
+        ]))
+        .unwrap(),
+        3
+    );
+    assert_eq!(line_writer.get_ref(), b"\n");
+    block_on(line_writer.flush()).unwrap();
+    assert_eq!(line_writer.get_ref(), b"\nabac");
+    assert_eq!(block_on(line_writer.write_vectored(&[])).unwrap(), 0);
+
+    assert_eq!(
+        block_on(line_writer.write_vectored(&[
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+            io::IoSlice::new(&[]),
+        ]))
+        .unwrap(),
+        0
+    );
+
+    assert_eq!(block_on(line_writer.write_vectored(&[io::IoSlice::new(b"a\nb")])).unwrap(), 3);
+    assert_eq!(line_writer.get_ref(), b"\nabaca\nb");
+}
diff --git a/tests/lock_mutex.rs b/tests/lock_mutex.rs
index 7c33864..c92ef50 100644
--- a/tests/lock_mutex.rs
+++ b/tests/lock_mutex.rs
@@ -34,6 +34,7 @@
     assert!(waiter.poll_unpin(&mut panic_context()).is_ready());
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn mutex_contested() {
     let (tx, mut rx) = mpsc::unbounded();
diff --git a/tests/macro_comma_support.rs b/tests/macro_comma_support.rs
index 85871e9..3b082d2 100644
--- a/tests/macro_comma_support.rs
+++ b/tests/macro_comma_support.rs
@@ -14,6 +14,7 @@
     }))
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn poll() {
     use futures::poll;
diff --git a/tests/ready_queue.rs b/tests/ready_queue.rs
index 8290132..afba8f2 100644
--- a/tests/ready_queue.rs
+++ b/tests/ready_queue.rs
@@ -93,6 +93,9 @@
 
 #[test]
 fn stress() {
+    #[cfg(miri)]
+    const ITER: usize = 30;
+    #[cfg(not(miri))]
     const ITER: usize = 300;
 
     for i in 0..ITER {
diff --git a/tests/recurse.rs b/tests/recurse.rs
index d81753c..f06524f 100644
--- a/tests/recurse.rs
+++ b/tests/recurse.rs
@@ -3,6 +3,7 @@
 use std::sync::mpsc;
 use std::thread;
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn lots() {
     #[cfg(not(futures_sanitizer))]
diff --git a/tests/sink.rs b/tests/sink.rs
index f3cf11b..dc826bd 100644
--- a/tests/sink.rs
+++ b/tests/sink.rs
@@ -288,6 +288,7 @@
 
 // test `flush` by using `with` to make the first insertion into a sink block
 // until a oneshot is completed
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn with_flush() {
     let (tx, rx) = oneshot::channel();
diff --git a/tests/stream.rs b/tests/stream.rs
index 0d453d1..71ec654 100644
--- a/tests/stream.rs
+++ b/tests/stream.rs
@@ -1,10 +1,14 @@
+use std::iter;
+use std::sync::Arc;
+
 use futures::channel::mpsc;
 use futures::executor::block_on;
 use futures::future::{self, Future};
+use futures::lock::Mutex;
 use futures::sink::SinkExt;
 use futures::stream::{self, StreamExt};
 use futures::task::Poll;
-use futures::FutureExt;
+use futures::{ready, FutureExt};
 use futures_test::task::noop_context;
 
 #[test]
@@ -50,6 +54,272 @@
 }
 
 #[test]
+fn flatten_unordered() {
+    use futures::executor::block_on;
+    use futures::stream::*;
+    use futures::task::*;
+    use std::convert::identity;
+    use std::pin::Pin;
+    use std::thread;
+    use std::time::Duration;
+
+    struct DataStream {
+        data: Vec<u8>,
+        polled: bool,
+        wake_immediately: bool,
+    }
+
+    impl Stream for DataStream {
+        type Item = u8;
+
+        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+            if !self.polled {
+                if !self.wake_immediately {
+                    let waker = ctx.waker().clone();
+                    let sleep_time =
+                        Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+                    thread::spawn(move || {
+                        thread::sleep(sleep_time);
+                        waker.wake_by_ref();
+                    });
+                } else {
+                    ctx.waker().wake_by_ref();
+                }
+                self.polled = true;
+                Poll::Pending
+            } else {
+                self.polled = false;
+                Poll::Ready(self.data.pop())
+            }
+        }
+    }
+
+    struct Interchanger {
+        polled: bool,
+        base: u8,
+        wake_immediately: bool,
+    }
+
+    impl Stream for Interchanger {
+        type Item = DataStream;
+
+        fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+            if !self.polled {
+                self.polled = true;
+                if !self.wake_immediately {
+                    let waker = ctx.waker().clone();
+                    let sleep_time = Duration::from_millis(self.base as u64);
+                    thread::spawn(move || {
+                        thread::sleep(sleep_time);
+                        waker.wake_by_ref();
+                    });
+                } else {
+                    ctx.waker().wake_by_ref();
+                }
+                Poll::Pending
+            } else {
+                let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+                self.base += 1;
+                self.polled = false;
+                Poll::Ready(Some(DataStream {
+                    polled: false,
+                    data,
+                    wake_immediately: self.wake_immediately && self.base % 2 == 0,
+                }))
+            }
+        }
+    }
+
+    // basic behaviour
+    {
+        block_on(async {
+            let st = stream::iter(vec![
+                stream::iter(0..=4u8),
+                stream::iter(6..=10),
+                stream::iter(10..=12),
+            ]);
+
+            let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+            assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+        });
+
+        block_on(async {
+            let st = stream::iter(vec![
+                stream::iter(0..=4u8),
+                stream::iter(6..=10),
+                stream::iter(0..=2),
+            ]);
+
+            let mut fm_unordered = st
+                .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+        });
+    }
+
+    // wake up immediately
+    {
+        block_on(async {
+            let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .map(|s| s.map(identity))
+                .flatten_unordered(10)
+                .collect::<Vec<_>>()
+                .await;
+
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // wake up after delay
+    {
+        block_on(async {
+            let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+                .take(10)
+                .map(|s| s.map(identity))
+                .flatten_unordered(10)
+                .collect::<Vec<_>>()
+                .await;
+
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity))
+                .collect::<Vec<_>>()
+                .await;
+
+            fm_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+
+        block_on(async {
+            let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+                Interchanger { polled: false, base: 0, wake_immediately: false }
+                    .take(10)
+                    .flat_map_unordered(10, |s| s.map(identity))
+                    .collect::<Vec<_>>(),
+                Interchanger { polled: false, base: 0, wake_immediately: false }
+                    .take(10)
+                    .map(|s| s.map(identity))
+                    .flatten_unordered(10)
+                    .collect::<Vec<_>>()
+            );
+
+            fm_unordered.sort_unstable();
+            fl_unordered.sort_unstable();
+
+            assert_eq!(fm_unordered, fl_unordered);
+            assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // waker panics
+    {
+        let stream = Arc::new(Mutex::new(
+            Interchanger { polled: false, base: 0, wake_immediately: true }
+                .take(10)
+                .flat_map_unordered(10, |s| s.map(identity)),
+        ));
+
+        struct PanicWaker;
+
+        impl ArcWake for PanicWaker {
+            fn wake_by_ref(_arc_self: &Arc<Self>) {
+                panic!("WAKE UP");
+            }
+        }
+
+        std::thread::spawn({
+            let stream = stream.clone();
+            move || {
+                let mut st = poll_fn(|cx| {
+                    let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+                    let panic_waker = waker(Arc::new(PanicWaker));
+                    let mut panic_cx = Context::from_waker(&panic_waker);
+                    let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+                    Poll::Ready(Some(()))
+                });
+
+                block_on(st.next())
+            }
+        })
+        .join()
+        .unwrap_err();
+
+        block_on(async move {
+            let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+            values.sort_unstable();
+
+            assert_eq!(values, (0..60).collect::<Vec<u8>>());
+        });
+    }
+
+    // stream panics
+    {
+        let st = stream::iter(iter::once(
+            once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+        ))
+        .chain(
+            Interchanger { polled: false, base: 0, wake_immediately: true }
+                .map(|stream| stream.right_stream())
+                .take(10),
+        );
+
+        let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+        std::thread::spawn({
+            let stream = stream.clone();
+            move || {
+                let mut st = poll_fn(|cx| {
+                    let mut lock = ready!(stream.lock().poll_unpin(cx));
+                    let data = ready!(lock.poll_next_unpin(cx));
+
+                    Poll::Ready(data)
+                });
+
+                block_on(st.next())
+            }
+        })
+        .join()
+        .unwrap_err();
+
+        block_on(async move {
+            let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+            values.sort_unstable();
+
+            assert_eq!(values, (0..60).collect::<Vec<u8>>());
+        });
+    }
+}
+
+#[test]
 fn take_until() {
     fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
         let mut i = 0;
diff --git a/tests/stream_futures_ordered.rs b/tests/stream_futures_ordered.rs
index 7506c65..84e0bcc 100644
--- a/tests/stream_futures_ordered.rs
+++ b/tests/stream_futures_ordered.rs
@@ -26,6 +26,7 @@
     assert_eq!(None, iter.next());
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn works_2() {
     let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -54,6 +55,7 @@
     assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn queue_never_unblocked() {
     let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
diff --git a/tests/stream_futures_unordered.rs b/tests/stream_futures_unordered.rs
index 4b9afcc..f62f733 100644
--- a/tests/stream_futures_unordered.rs
+++ b/tests/stream_futures_unordered.rs
@@ -56,6 +56,7 @@
     assert_eq!(None, iter.next());
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn works_2() {
     let (a_tx, a_rx) = oneshot::channel::<i32>();
@@ -85,6 +86,7 @@
     assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
 }
 
+#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
 #[test]
 fn finished_future() {
     let (_a_tx, a_rx) = oneshot::channel::<i32>();
diff --git a/tests/stream_try_stream.rs b/tests/stream_try_stream.rs
index 194e74d..d83fc54 100644
--- a/tests/stream_try_stream.rs
+++ b/tests/stream_try_stream.rs
@@ -1,3 +1,5 @@
+#![cfg(not(miri))] // https://github.com/rust-lang/miri/issues/1038
+
 use futures::{
     stream::{self, StreamExt, TryStreamExt},
     task::Poll,
diff --git a/tests/task_atomic_waker.rs b/tests/task_atomic_waker.rs
index cec3db2..2d1612a 100644
--- a/tests/task_atomic_waker.rs
+++ b/tests/task_atomic_waker.rs
@@ -6,6 +6,7 @@
 use std::sync::Arc;
 use std::thread;
 
+#[cfg_attr(miri, ignore)] // Miri is too slow
 #[test]
 fn basic() {
     let atomic_waker = Arc::new(AtomicWaker::new());