Upgrade rust/crates/tokio-stream to 0.1.3 am: cd448d6d4d

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1582319

MUST ONLY BE SUBMITTED BY AUTOMERGER

Change-Id: I133a7e897f9e504b33e040bfa07d1790a84f7b96
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644
index 0000000..4cca143
--- /dev/null
+++ b/.cargo_vcs_info.json
@@ -0,0 +1,5 @@
+{
+  "git": {
+    "sha1": "23fdc2b3c4b33300c3e8b44dcb126057d90f4935"
+  }
+}
diff --git a/Android.bp b/Android.bp
index 221394c..46c372a 100644
--- a/Android.bp
+++ b/Android.bp
@@ -22,13 +22,12 @@
 // dependent_library ["feature_list"]
 //   autocfg-1.0.1
 //   bytes-1.0.1 "default,std"
-//   cfg-if-0.1.10
 //   cfg-if-1.0.0
 //   futures-core-0.3.12 "alloc,default,std"
 //   instant-0.1.9
-//   libc-0.2.82 "align,default,std"
+//   libc-0.2.86 "align,default,std"
 //   lock_api-0.4.2
-//   log-0.4.13
+//   log-0.4.14
 //   memchr-2.3.4 "default,std"
 //   mio-0.7.7 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
 //   num_cpus-1.13.0
@@ -42,6 +41,6 @@
 //   signal-hook-registry-1.3.0
 //   smallvec-1.6.1
 //   syn-1.0.60 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-//   tokio-1.1.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-//   tokio-macros-1.0.0
+//   tokio-1.2.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+//   tokio-macros-1.1.0
 //   unicode-xid-0.2.1 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bbb7d8c..34de724 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,19 @@
+# 0.1.3 (February 5, 2021)
+
+Added
+
+ - sync: add wrappers for broadcast and watch ([#3384], [#3504])
+
+[#3384]: https://github.com/tokio-rs/tokio/pull/3384
+[#3504]: https://github.com/tokio-rs/tokio/pull/3504
+
 # 0.1.2 (January 12, 2021)
 
 Fixed
 
- - docs: fix some wrappers missing in documentation (#3378)
+ - docs: fix some wrappers missing in documentation ([#3378])
+
+[#3378]: https://github.com/tokio-rs/tokio/pull/3378
 
 # 0.1.1 (January 4, 2021)
 
diff --git a/Cargo.toml b/Cargo.toml
index 443228a..75ea961 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
 [package]
 edition = "2018"
 name = "tokio-stream"
-version = "0.1.2"
+version = "0.1.3"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 description = "Utilities to work with `Stream` and `tokio`.\n"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
 categories = ["asynchronous"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
@@ -33,6 +33,10 @@
 [dependencies.tokio]
 version = "1.0"
 features = ["sync"]
+
+[dependencies.tokio-util]
+version = "0.6.3"
+optional = true
 [dev-dependencies.async-stream]
 version = "0.3"
 
@@ -52,4 +56,5 @@
 fs = ["tokio/fs"]
 io-util = ["tokio/io-util"]
 net = ["tokio/net"]
+sync = ["tokio/sync", "tokio-util"]
 time = ["tokio/time"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index d662c38..6fd9032 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -7,13 +7,13 @@
 #   - Cargo.toml
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.2"
+version = "0.1.3"
 edition = "2018"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.2/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.3/tokio_stream"
 description = """
 Utilities to work with `Stream` and `tokio`.
 """
@@ -25,16 +25,18 @@
 net = ["tokio/net"]
 io-util = ["tokio/io-util"]
 fs = ["tokio/fs"]
+sync = ["tokio/sync", "tokio-util"]
 
 [dependencies]
 futures-core = { version = "0.3.0" }
 pin-project-lite = "0.2.0"
 tokio = { version = "1.0", features = ["sync"] }
+tokio-util = { version = "0.6.3", optional = true }
 
 [dev-dependencies]
 tokio = { version = "1.0", features = ["full", "test-util"] }
-tokio-test = { path = "../tokio-test" }
 async-stream = "0.3"
+tokio-test = { path = "../tokio-test" }
 futures = { version = "0.3", default-features = false }
 
 proptest = "0.10.0"
diff --git a/METADATA b/METADATA
index f0d384f..bb02dba 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.2.crate"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.3.crate"
   }
-  version: "0.1.2"
+  version: "0.1.3"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 1
-    day: 21
+    month: 2
+    day: 9
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 307a839..731e0e5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.2")]
+#![doc(html_root_url = "https://docs.rs/tokio-stream/0.1.3")]
 #![allow(
     clippy::cognitive_complexity,
     clippy::large_enum_variant,
diff --git a/src/macros.rs b/src/macros.rs
index 39ad86c..d4a72c8 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -38,6 +38,16 @@
     }
 }
 
+macro_rules! cfg_sync {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "sync")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+            $item
+        )*
+    }
+}
+
 macro_rules! ready {
     ($e:expr $(,)?) => {
         match $e {
diff --git a/src/wrappers.rs b/src/wrappers.rs
index c0ffb23..0e8ebdf 100644
--- a/src/wrappers.rs
+++ b/src/wrappers.rs
@@ -1,11 +1,26 @@
 //! Wrappers for Tokio types that implement `Stream`.
 
+/// Error types for the wrappers.
+pub mod errors {
+    cfg_sync! {
+        pub use crate::wrappers::broadcast::BroadcastStreamRecvError;
+    }
+}
+
 mod mpsc_bounded;
 pub use mpsc_bounded::ReceiverStream;
 
 mod mpsc_unbounded;
 pub use mpsc_unbounded::UnboundedReceiverStream;
 
+cfg_sync! {
+    mod broadcast;
+    pub use broadcast::BroadcastStream;
+
+    mod watch;
+    pub use watch::WatchStream;
+}
+
 cfg_time! {
     mod interval;
     pub use interval::IntervalStream;
diff --git a/src/wrappers/broadcast.rs b/src/wrappers/broadcast.rs
new file mode 100644
index 0000000..06a982d
--- /dev/null
+++ b/src/wrappers/broadcast.rs
@@ -0,0 +1,63 @@
+use std::pin::Pin;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::broadcast::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+
+/// A wrapper around [`tokio::sync::broadcast::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct BroadcastStream<T> {
+    inner: ReusableBoxFuture<(Result<T, RecvError>, Receiver<T>)>,
+}
+
+/// An error yielded by a [`BroadcastStream`] when its underlying receiver reports a failure.
+#[derive(Debug, PartialEq)]
+pub enum BroadcastStreamRecvError {
+    /// The receiver lagged too far behind. Attempting to receive again will
+    /// return the oldest message still retained by the channel.
+    ///
+    /// Includes the number of skipped messages.
+    Lagged(u64),
+}
+
+async fn make_future<T: Clone>(mut rx: Receiver<T>) -> (Result<T, RecvError>, Receiver<T>) {
+    let result = rx.recv().await;
+    (result, rx)
+}
+
+impl<T: 'static + Clone + Send> BroadcastStream<T> {
+    /// Create a new `BroadcastStream`.
+    pub fn new(rx: Receiver<T>) -> Self {
+        Self {
+            inner: ReusableBoxFuture::new(make_future(rx)),
+        }
+    }
+}
+
+impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
+    type Item = Result<T, BroadcastStreamRecvError>;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let (result, rx) = ready!(self.inner.poll(cx));
+        self.inner.set(make_future(rx));
+        match result {
+            Ok(item) => Poll::Ready(Some(Ok(item))),
+            Err(RecvError::Closed) => Poll::Ready(None),
+            Err(RecvError::Lagged(n)) => {
+                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
+            }
+        }
+    }
+}
+
+impl<T> fmt::Debug for BroadcastStream<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("BroadcastStream").finish()
+    }
+}
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
new file mode 100644
index 0000000..a98a72c
--- /dev/null
+++ b/src/wrappers/watch.rs
@@ -0,0 +1,61 @@
+use std::pin::Pin;
+use tokio::sync::watch::Receiver;
+
+use futures_core::Stream;
+use tokio_util::sync::ReusableBoxFuture;
+
+use std::fmt;
+use std::task::{Context, Poll};
+use tokio::sync::watch::error::RecvError;
+
+/// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
+///
+/// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
+/// [`Stream`]: trait@crate::Stream
+#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
+pub struct WatchStream<T> {
+    inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
+}
+
+async fn make_future<T: Clone + Send + Sync>(
+    mut rx: Receiver<T>,
+) -> (Result<(), RecvError>, Receiver<T>) {
+    let result = rx.changed().await;
+    (result, rx)
+}
+
+impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
+    /// Create a new `WatchStream`.
+    pub fn new(rx: Receiver<T>) -> Self {
+        Self {
+            inner: ReusableBoxFuture::new(make_future(rx)),
+        }
+    }
+}
+
+impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
+    type Item = T;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let (result, rx) = ready!(self.inner.poll(cx));
+        match result {
+            Ok(_) => {
+                let received = (*rx.borrow()).clone();
+                self.inner.set(make_future(rx));
+                Poll::Ready(Some(received))
+            }
+            Err(_) => {
+                self.inner.set(make_future(rx));
+                Poll::Ready(None)
+            }
+        }
+    }
+}
+
+impl<T> Unpin for WatchStream<T> {}
+
+impl<T> fmt::Debug for WatchStream<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("WatchStream").finish()
+    }
+}