Upgrade rust/crates/tokio-stream to 0.1.7 am: bd64f25cb3

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/tokio-stream/+/1790954

Change-Id: I0a1fa2b834256a4813632667956adba28e43aec2
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 44a2cd9..749a519 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "aaa150d211a4f5c621369746da0804a3b786b861"
+    "sha1": "5d61c997e9281e00f35c7ea426786996945f47f5"
   }
 }
diff --git a/Android.bp b/Android.bp
index 5c46ebc..b9bf77c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -41,24 +41,24 @@
 //   autocfg-1.0.1
 //   bytes-1.0.1 "default,std"
 //   cfg-if-1.0.0
-//   futures-core-0.3.15 "alloc,default,std"
-//   instant-0.1.9
-//   libc-0.2.94 "default,std"
+//   futures-core-0.3.16 "alloc,default,std"
+//   instant-0.1.10
+//   libc-0.2.98 "default,std"
 //   lock_api-0.4.4
 //   log-0.4.14
 //   memchr-2.4.0 "default,std"
-//   mio-0.7.11 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
+//   mio-0.7.13 "default,net,os-ext,os-poll,os-util,tcp,udp,uds"
 //   num_cpus-1.13.0
-//   once_cell-1.7.2 "alloc,default,race,std"
+//   once_cell-1.8.0 "alloc,default,race,std"
 //   parking_lot-0.11.1 "default"
 //   parking_lot_core-0.8.3
-//   pin-project-lite-0.2.6
-//   proc-macro2-1.0.26 "default,proc-macro"
+//   pin-project-lite-0.2.7
+//   proc-macro2-1.0.28 "default,proc-macro"
 //   quote-1.0.9 "default,proc-macro"
 //   scopeguard-1.1.0
-//   signal-hook-registry-1.3.0
+//   signal-hook-registry-1.4.0
 //   smallvec-1.6.1
-//   syn-1.0.72 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
-//   tokio-1.6.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
-//   tokio-macros-1.2.0
+//   syn-1.0.74 "clone-impls,default,derive,extra-traits,full,parsing,printing,proc-macro,quote,visit-mut"
+//   tokio-1.9.0 "bytes,default,fs,full,io-std,io-util,libc,macros,memchr,mio,net,num_cpus,once_cell,parking_lot,process,rt,rt-multi-thread,signal,signal-hook-registry,sync,test-util,time,tokio-macros,winapi"
+//   tokio-macros-1.3.0
 //   unicode-xid-0.2.2 "default"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0c59804..a0cdef0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+# 0.1.7 (July 7, 2021)
+
+### Fixed
+
+- sync: fix watch wrapper ([#3914])
+- time: fix `Timeout::size_hint` ([#3902])
+
+[#3902]: https://github.com/tokio-rs/tokio/pull/3902
+[#3914]: https://github.com/tokio-rs/tokio/pull/3914
+
 # 0.1.6 (May 14, 2021)
 
 ### Added
diff --git a/Cargo.toml b/Cargo.toml
index 10e6e4a..5a8627a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,11 +13,11 @@
 [package]
 edition = "2018"
 name = "tokio-stream"
-version = "0.1.6"
+version = "0.1.7"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 description = "Utilities to work with `Stream` and `tokio`.\n"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream"
 categories = ["asynchronous"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
@@ -31,7 +31,7 @@
 version = "0.2.0"
 
 [dependencies.tokio]
-version = "1.2.0"
+version = "1.8.0"
 features = ["sync"]
 
 [dependencies.tokio-util]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 5c95917..911657c 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,13 +6,13 @@
 #   - Cargo.toml
 # - Update CHANGELOG.md.
 # - Create "tokio-stream-0.1.x" git tag.
-version = "0.1.6"
+version = "0.1.7"
 edition = "2018"
 authors = ["Tokio Contributors <team@tokio.rs>"]
 license = "MIT"
 repository = "https://github.com/tokio-rs/tokio"
 homepage = "https://tokio.rs"
-documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream"
+documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream"
 description = """
 Utilities to work with `Stream` and `tokio`.
 """
@@ -30,7 +30,7 @@
 [dependencies]
 futures-core = { version = "0.3.0" }
 pin-project-lite = "0.2.0"
-tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
+tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
 tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }
 
 [dev-dependencies]
diff --git a/METADATA b/METADATA
index 17893f4..8693d16 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.6.crate"
+    value: "https://static.crates.io/crates/tokio-stream/tokio-stream-0.1.7.crate"
   }
-  version: "0.1.6"
+  version: "0.1.7"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 5
-    day: 19
+    month: 8
+    day: 9
   }
 }
diff --git a/src/lib.rs b/src/lib.rs
index af99488..b7f232f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,7 @@
     unreachable_pub
 )]
 #![cfg_attr(docsrs, feature(doc_cfg))]
-#![cfg_attr(docsrs, deny(broken_intra_doc_links))]
+#![cfg_attr(docsrs, deny(rustdoc::broken_intra_doc_links))]
 #![doc(test(
     no_crate_inject,
     attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
diff --git a/src/stream_ext.rs b/src/stream_ext.rs
index 51532ee..1157c9e 100644
--- a/src/stream_ext.rs
+++ b/src/stream_ext.rs
@@ -515,7 +515,7 @@
     /// Skip elements from the underlying stream while the provided predicate
     /// resolves to `true`.
     ///
-    /// This function, like [`Iterator::skip_while`], will ignore elemets from the
+    /// This function, like [`Iterator::skip_while`], will ignore elements from the
     /// stream until the predicate `f` resolves to `false`. Once one element
     /// returns false, the rest of the elements will be yielded.
     ///
diff --git a/src/stream_ext/collect.rs b/src/stream_ext/collect.rs
index 23f48b0..a33a6d6 100644
--- a/src/stream_ext/collect.rs
+++ b/src/stream_ext/collect.rs
@@ -113,7 +113,7 @@
     }
 
     fn finalize(_: sealed::Internal, collection: &mut String) -> String {
-        mem::replace(collection, String::new())
+        mem::take(collection)
     }
 }
 
@@ -132,7 +132,7 @@
     }
 
     fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
-        mem::replace(collection, vec![])
+        mem::take(collection)
     }
 }
 
diff --git a/src/stream_ext/timeout.rs b/src/stream_ext/timeout.rs
index de17dc0..98d7cd5 100644
--- a/src/stream_ext/timeout.rs
+++ b/src/stream_ext/timeout.rs
@@ -69,7 +69,18 @@
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
+        let (lower, upper) = self.stream.size_hint();
+
+        // The timeout stream may insert an error before and after each message
+        // from the underlying stream, but no more than one error between each
+        // message. Hence the upper bound is computed as 2x+1.
+
+        // Using a helper function to enable use of question mark operator.
+        fn twice_plus_one(value: Option<usize>) -> Option<usize> {
+            value?.checked_mul(2)?.checked_add(1)
+        }
+
+        (lower, twice_plus_one(upper))
     }
 }
 
diff --git a/src/stream_map.rs b/src/stream_map.rs
index 7fc136f..9dc529a 100644
--- a/src/stream_map.rs
+++ b/src/stream_map.rs
@@ -364,11 +364,11 @@
     /// # Examples
     ///
     /// ```
-    /// use std::collections::HashMap;
+    /// use tokio_stream::{StreamMap, pending};
     ///
-    /// let mut a = HashMap::new();
+    /// let mut a = StreamMap::new();
     /// assert!(a.is_empty());
-    /// a.insert(1, "a");
+    /// a.insert(1, pending::<i32>());
     /// assert!(!a.is_empty());
     /// ```
     pub fn is_empty(&self) -> bool {
diff --git a/src/wrappers/watch.rs b/src/wrappers/watch.rs
index 0ffd1b8..1daca10 100644
--- a/src/wrappers/watch.rs
+++ b/src/wrappers/watch.rs
@@ -11,7 +11,7 @@
 /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
 ///
 /// This stream will always start by yielding the current value when the WatchStream is polled,
-/// regardles of whether it was the initial value or sent afterwards.
+/// regardless of whether it was the initial value or sent afterwards.
 ///
 /// # Examples
 ///
@@ -72,10 +72,10 @@
     type Item = T;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        let (result, rx) = ready!(self.inner.poll(cx));
+        let (result, mut rx) = ready!(self.inner.poll(cx));
         match result {
             Ok(_) => {
-                let received = (*rx.borrow()).clone();
+                let received = (*rx.borrow_and_update()).clone();
                 self.inner.set(make_future(rx));
                 Poll::Ready(Some(received))
             }
diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs
index c06bebd..f1c8b4e 100644
--- a/tests/async_send_sync.rs
+++ b/tests/async_send_sync.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::diverging_sub_expression)]
+
 use std::rc::Rc;
 
 #[allow(dead_code)]
diff --git a/tests/stream_chain.rs b/tests/stream_chain.rs
index 759de30..f3b7edb 100644
--- a/tests/stream_chain.rs
+++ b/tests/stream_chain.rs
@@ -89,12 +89,12 @@
         }
 
         fn size_hint(&self) -> (usize, Option<usize>) {
-            (usize::max_value(), Some(usize::max_value()))
+            (usize::MAX, Some(usize::MAX))
         }
     }
 
     let m1 = Monster;
     let m2 = Monster;
     let m = m1.chain(m2);
-    assert_eq!(m.size_hint(), (usize::max_value(), None));
+    assert_eq!(m.size_hint(), (usize::MAX, None));
 }
diff --git a/tests/stream_merge.rs b/tests/stream_merge.rs
index 69cd568..f603bcc 100644
--- a/tests/stream_merge.rs
+++ b/tests/stream_merge.rs
@@ -72,12 +72,12 @@
         }
 
         fn size_hint(&self) -> (usize, Option<usize>) {
-            (usize::max_value(), Some(usize::max_value()))
+            (usize::MAX, Some(usize::MAX))
         }
     }
 
     let m1 = Monster;
     let m2 = Monster;
     let m = m1.merge(m2);
-    assert_eq!(m.size_hint(), (usize::max_value(), None));
+    assert_eq!(m.size_hint(), (usize::MAX, None));
 }
diff --git a/tests/watch.rs b/tests/watch.rs
new file mode 100644
index 0000000..92bcdf4
--- /dev/null
+++ b/tests/watch.rs
@@ -0,0 +1,27 @@
+use tokio::sync::watch;
+use tokio_stream::wrappers::WatchStream;
+use tokio_stream::StreamExt;
+
+#[tokio::test]
+async fn message_not_twice() {
+    let (tx, rx) = watch::channel("hello");
+
+    let mut counter = 0;
+    let mut stream = WatchStream::new(rx).map(move |payload| {
+        println!("{}", payload);
+        if payload == "goodbye" {
+            counter += 1;
+        }
+        if counter >= 2 {
+            panic!("too many goodbyes");
+        }
+    });
+
+    let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
+
+    // Send goodbye just once
+    tx.send("goodbye").unwrap();
+
+    drop(tx);
+    task.await.unwrap();
+}