Upgrade rust/crates/tokio-stream to 0.1.6 am: 6ba6072f3d

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1712600

Change-Id: I745c6471da63178118e2486705c1b2c164b4615f
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 77b1c80..44a2cd9 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "b4918adbd8194e1ed7fd0c7fbe635b039b70c584"
+    "sha1": "aaa150d211a4f5c621369746da0804a3b786b861"
   }
 }
diff --git a/Android.bp b/Android.bp
index 4948062..5c46ebc 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,12 +41,12 @@
 //   autocfg-1.0.1
 //   bytes-1.0.1 "default,std"
 //   cfg-if-1.0.0
-//   futures-core-0.3.13 "alloc,default,std"
+//   futures-core-0.3.15 "alloc,default,std"
 //   instant-0.1.9
-//   libc-0.2.92 "default,std"
-//   lock_api-0.4.2
+//   libc-0.2.94 "default,std"
+//   lock_api-0.4.4
 //   log-0.4.14
-//   memchr-2.3.4 "default,std"
+//   memchr-2.4.0 "default,std"
 //   mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
 //   num_cpus-1.13.0
 //   once_cell-1.7.2 "alloc,default,race,std"
@@ -58,7 +58,7 @@
 //   scopeguard-1.1.0
 //   signal-hook-registry-1.3.0
 //   smallvec-1.6.1
-//   syn-1.0.68 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-//   tokio-1.4.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-//   tokio-macros-1.1.0
-//   unicode-xid-0.2.1 "default"
+//   syn-1.0.72 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+//   tokio-1.6.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+//   tokio-macros-1.2.0
+//   unicode-xid-0.2.2 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 59bb0fa..0c59804 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,16 @@
+# 0.1.6 (May 14, 2021)
+
+### Added
+
+- stream: implement `Error` and `Display` for `BroadcastStreamRecvError` ([#3745])
+
+### Fixed
+
+- stream: avoid yielding in `AllFuture` and `AnyFuture` ([#3625])
+
+[#3745]: https://github.com/tokio-rs/tokio/pull/3745
+[#3625]: https://github.com/tokio-rs/tokio/pull/3625
+
 # 0.1.5 (March 20, 2021)
 
 ### Fixed
diff --git a/Cargo.toml b/Cargo.toml
index c734987..10e6e4a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
 [package]
 edition = "2018"
 name = "tokio-stream"
-version = "0.1.5"
+version = "0.1.6"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 description = "Utilities to work with `Stream` and `tokio`.\n"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
 categories = ["asynchronous"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
@@ -45,7 +45,7 @@
 default-features = false
 
 [dev-dependencies.proptest]
-version = "0.10.0"
+version = "1"
 
 [dev-dependencies.tokio]
 version = "1.2.0"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 8588325..5c95917 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,13 +6,13 @@
 #   - Cargo.toml
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.5"
+version = "0.1.6"
 edition = "2018"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.5/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
 description = """
 Utilities to work with `Stream` and `tokio`.
 """
@@ -31,7 +31,7 @@
 futures-core = { version = "0.3.0" }
 pin-project-lite = "0.2.0"
 tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
-tokio-util = { version = "0.6.3", optional = true }
+tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
 
 [dev-dependencies]
 tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
@@ -39,7 +39,7 @@
 tokio-test = { path = "../tokio-test" }
 futures = { version = "0.3", default-features = false }
 
-proptest = "0.10.0"
+proptest = "1"
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/LICENSE b/LICENSE
index 243fcd6..ffa38bb 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2020 Tokio Contributors
+Copyright (c) 2021 Tokio Contributors
 
 Permission is hereby granted, free of charge, to any
 person obtaining a copy of this software and associated
diff --git a/METADATA b/METADATA
index 2dc18b1..17893f4 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.5.crate"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.6.crate"
   }
-  version: "0.1.5"
+  version: "0.1.6"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 4
-    day: 2
+    month: 5
+    day: 19
   }
 }
diff --git a/src/stream_ext/all.rs b/src/stream_ext/all.rs
index 11573f9..b4dbc1e 100644
--- a/src/stream_ext/all.rs
+++ b/src/stream_ext/all.rs
@@ -38,18 +38,21 @@
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let me = self.project();
-        let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+        let mut stream = Pin::new(me.stream);
 
-        match next {
-            Some(v) => {
-                if !(me.f)(v) {
-                    Poll::Ready(false)
-                } else {
-                    cx.waker().wake_by_ref();
-                    Poll::Pending
+        // Take a maximum of 32 items from the stream before yielding.
+        for _ in 0..32 {
+            match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+                Some(v) => {
+                    if !(me.f)(v) {
+                        return Poll::Ready(false);
+                    }
                 }
+                None => return Poll::Ready(true),
             }
-            None => Poll::Ready(true),
         }
+
+        cx.waker().wake_by_ref();
+        Poll::Pending
     }
 }
diff --git a/src/stream_ext/any.rs b/src/stream_ext/any.rs
index 4c4c593..31394f2 100644
--- a/src/stream_ext/any.rs
+++ b/src/stream_ext/any.rs
@@ -38,18 +38,21 @@
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let me = self.project();
-        let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
+        let mut stream = Pin::new(me.stream);
 
-        match next {
-            Some(v) => {
-                if (me.f)(v) {
-                    Poll::Ready(true)
-                } else {
-                    cx.waker().wake_by_ref();
-                    Poll::Pending
+        // Take a maximum of 32 items from the stream before yielding.
+        for _ in 0..32 {
+            match futures_core::ready!(stream.as_mut().poll_next(cx)) {
+                Some(v) => {
+                    if (me.f)(v) {
+                        return Poll::Ready(true);
+                    }
                 }
+                None => return Poll::Ready(false),
             }
-            None => Poll::Ready(false),
         }
+
+        cx.waker().wake_by_ref();
+        Poll::Pending
     }
 }
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 85b60cf..7fc136f 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -605,10 +605,10 @@
     /// Fast random number generate
     ///
     /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
-    /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's
-    /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
+    /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
+    /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
     /// This generator passes the SmallCrush suite, part of TestU01 framework:
-    /// http://simul.iro.umontreal.ca/testu01/tu01.html
+    /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
     #[derive(Debug)]
     pub(crate) struct FastRand {
         one: Cell<u32>,
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
index 06a982d..3bddbb7 100644
--- a/src/wrappers/broadcast.rs
+++ b/src/wrappers/broadcast.rs
@@ -27,6 +27,16 @@
     Lagged(u64),
 }
 
+impl fmt::Display for BroadcastStreamRecvError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            BroadcastStreamRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
+        }
+    }
+}
+
+impl std::error::Error for BroadcastStreamRecvError {}
+
 async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
     let result = rx.recv().await;
     (result, rx)