Upgrade rust/crates/futures-channel to 0.3.13 am: 7980c2c7f6 am: 55a47278c9 am: c3055f4268

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-channel/+/1662920
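
This pulls in the upstream 0.3.13 changes to the mpsc channel: the
receiver's end-of-stream check is rewritten around a new
State::is_closed() helper, and draining on receiver drop now handles a
Poll::Pending result by spinning with thread::yield_now() until the
in-flight message lands or the channel is observed closed. The new
stress test in tests/mpsc-close.rs exercises try_send() racing a
receiver drop. A minimal sketch of the close semantics involved
(illustrative only, not taken from this patch; assumes the `futures`
facade crate, already a dev-dependency of this crate):

    use futures::channel::mpsc;
    use futures::executor::block_on;
    use futures::stream::StreamExt;

    fn main() {
        let (mut tx, mut rx) = mpsc::channel::<u32>(1);
        tx.try_send(7).expect("buffered send should succeed");
        drop(tx); // last sender gone: the channel is flagged closed
        block_on(async {
            // Messages buffered before the close are still delivered...
            assert_eq!(rx.next().await, Some(7));
            // ...and only then does the stream signal end of stream.
            assert_eq!(rx.next().await, None);
        });
    }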

Change-Id: I79dac153ca76e3c5d5dabe79f80eade45669a4b0
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 4fd4ba3..f3ad3ab 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "1d53a29ec16ccd5b094fb205edb73591455eb4b6"
+    "sha1": "c91f8691672c7401b1923ab00bf138975c99391a"
   }
 }
diff --git a/Android.bp b/Android.bp
index 355f9d8..eb938c1 100644
--- a/Android.bp
+++ b/Android.bp
@@ -1,4 +1,5 @@
 // This file is generated by cargo2android.py --run --dependencies --device --patch=patches/Android.bp.patch.
+// Do not modify this file as changes will be overridden on upgrade.
 
 package {
     default_applicable_licenses: [
@@ -61,4 +62,4 @@
 }
 
 // dependent_library ["feature_list"]
-//   futures-core-0.3.12 "alloc,std"
+//   futures-core-0.3.13 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
index 494deb5..30ee771 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,7 +13,7 @@
 [package]
 edition = "2018"
 name = "futures-channel"
-version = "0.3.12"
+version = "0.3.13"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 description = "Channels for asynchronous communication using futures-rs.\n"
 homepage = "https://rust-lang.github.io/futures-rs"
@@ -24,11 +24,11 @@
 all-features = true
 rustdoc-args = ["--cfg", "docsrs"]
 [dependencies.futures-core]
-version = "0.3.12"
+version = "0.3.13"
 default-features = false
 
 [dependencies.futures-sink]
-version = "0.3.12"
+version = "0.3.13"
 optional = true
 default-features = false
 
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index 5b4c639..9a33320 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -1,7 +1,7 @@
 [package]
 name = "futures-channel"
 edition = "2018"
-version = "0.3.12"
+version = "0.3.13"
 authors = ["Alex Crichton <alex@alexcrichton.com>"]
 license = "MIT OR Apache-2.0"
 repository = "https://github.com/rust-lang/futures-rs"
@@ -24,8 +24,8 @@
 cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
 
 [dependencies]
-futures-core = { path = "../futures-core", version = "0.3.12", default-features = false }
-futures-sink = { path = "../futures-sink", version = "0.3.12", default-features = false, optional = true }
+futures-core = { path = "../futures-core", version = "0.3.13", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.13", default-features = false, optional = true }
 
 [dev-dependencies]
 futures = { path = "../futures", default-features = true }
diff --git a/METADATA b/METADATA
index 7481020..b53d840 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.12.crate"
+    value: "https://static.crates.io/crates/futures-channel/futures-channel-0.3.13.crate"
   }
-  version: "0.3.12"
+  version: "0.3.13"
   license_type: NOTICE
   last_upgrade_date {
     year: 2021
-    month: 2
-    day: 9
+    month: 4
+    day: 1
   }
 }
diff --git a/TEST_MAPPING b/TEST_MAPPING
index 7e10dd0..6798806 100644
--- a/TEST_MAPPING
+++ b/TEST_MAPPING
@@ -2,7 +2,55 @@
 {
   "presubmit": [
     {
+      "name": "anyhow_device_test_tests_test_boxed"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_convert"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_ffi"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_repr"
+    },
+    {
+      "name": "tokio-test_device_test_tests_block_on"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_chain"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_source"
+    },
+    {
+      "name": "tokio-test_device_test_tests_io"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_autotrait"
+    },
+    {
+      "name": "anyhow_device_test_src_lib"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_context"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_downcast"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_macros"
+    },
+    {
       "name": "futures-util_device_test_src_lib"
+    },
+    {
+      "name": "anyhow_device_test_tests_test_fmt"
+    },
+    {
+      "name": "tokio-test_device_test_tests_macros"
+    },
+    {
+      "name": "tokio-test_device_test_src_lib"
     }
   ]
 }
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
index 494c97b..dd50343 100644
--- a/src/mpsc/mod.rs
+++ b/src/mpsc/mod.rs
@@ -86,6 +86,7 @@
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering::SeqCst;
+use std::thread;
 
 use crate::mpsc::queue::Queue;
 
@@ -1047,7 +1048,12 @@
             }
             None => {
                 let state = decode_state(inner.state.load(SeqCst));
-                if state.is_open || state.num_messages != 0 {
+                if state.is_closed() {
+                    // If closed flag is set AND there are no pending messages
+                    // it means end of stream
+                    self.inner = None;
+                    Poll::Ready(None)
+                } else {
                     // If queue is open, we need to return Pending
                     // to be woken up when new messages arrive.
                     // If queue is closed but num_messages is non-zero,
@@ -1056,11 +1062,6 @@
                     // so we need to park until sender unparks the task
                     // after queueing the message.
                     Poll::Pending
-                } else {
-                    // If closed flag is set AND there are no pending messages
-                    // it means end of stream
-                    self.inner = None;
-                    Poll::Ready(None)
                 }
             }
         }
@@ -1126,8 +1127,26 @@
         // Drain the channel of all pending messages
         self.close();
         if self.inner.is_some() {
-            while let Poll::Ready(Some(..)) = self.next_message() {
-                // ...
+            loop {
+                match self.next_message() {
+                    Poll::Ready(Some(_)) => {}
+                    Poll::Ready(None) => break,
+                    Poll::Pending => {
+                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+                        // If the channel is closed, then there is no need to park.
+                        if state.is_closed() {
+                            break;
+                        }
+
+                        // TODO: Spinning isn't ideal, it might be worth
+                        // investigating using a condvar or some other strategy
+                        // here. That said, if this case is hit, then another thread
+                        // is about to push the value into the queue and this isn't
+                        // the only spinlock in the impl right now.
+                        thread::yield_now();
+                    }
+                }
             }
         }
     }
@@ -1173,7 +1192,12 @@
             }
             None => {
                 let state = decode_state(inner.state.load(SeqCst));
-                if state.is_open || state.num_messages != 0 {
+                if state.is_closed() {
+                    // If closed flag is set AND there are no pending messages
+                    // it means end of stream
+                    self.inner = None;
+                    Poll::Ready(None)
+                } else {
                     // If queue is open, we need to return Pending
                     // to be woken up when new messages arrive.
                     // If queue is closed but num_messages is non-zero,
@@ -1182,11 +1206,6 @@
                     // so we need to park until sender unparks the task
                     // after queueing the message.
                     Poll::Pending
-                } else {
-                    // If closed flag is set AND there are no pending messages
-                    // it means end of stream
-                    self.inner = None;
-                    Poll::Ready(None)
                 }
             }
         }
@@ -1240,8 +1259,26 @@
         // Drain the channel of all pending messages
         self.close();
         if self.inner.is_some() {
-            while let Poll::Ready(Some(..)) = self.next_message() {
-                // ...
+            loop {
+                match self.next_message() {
+                    Poll::Ready(Some(_)) => {}
+                    Poll::Ready(None) => break,
+                    Poll::Pending => {
+                        let state = decode_state(self.inner.as_ref().unwrap().state.load(SeqCst));
+
+                        // If the channel is closed, then there is no need to park.
+                        if state.is_closed() {
+                            break;
+                        }
+
+                        // TODO: Spinning isn't ideal, it might be worth
+                        // investigating using a condvar or some other strategy
+                        // here. That said, if this case is hit, then another thread
+                        // is about to push the value into the queue and this isn't
+                        // the only spinlock in the impl right now.
+                        thread::yield_now();
+                    }
+                }
             }
         }
     }
@@ -1289,6 +1326,12 @@
 unsafe impl<T: Send> Send for BoundedInner<T> {}
 unsafe impl<T: Send> Sync for BoundedInner<T> {}
 
+impl State {
+    fn is_closed(&self) -> bool {
+        !self.is_open && self.num_messages == 0
+    }
+}
+
 /*
  *
  * ===== Helpers =====
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
index 50852eb..9eb5296 100644
--- a/tests/mpsc-close.rs
+++ b/tests/mpsc-close.rs
@@ -1,9 +1,13 @@
 use futures::channel::mpsc;
 use futures::executor::block_on;
+use futures::future::Future;
 use futures::sink::SinkExt;
 use futures::stream::StreamExt;
-use std::sync::Arc;
+use futures::task::{Context, Poll};
+use std::pin::Pin;
+use std::sync::{Arc, Weak};
 use std::thread;
+use std::time::{Duration, Instant};
 
 #[test]
 fn smoke() {
@@ -142,3 +146,133 @@
         assert!(sender.is_closed());
     }
 }
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[test]
+fn stress_try_send_as_receiver_closes() {
+    const AMT: usize = 10000;
+    // To provide variable timing characteristics (in the hopes of
+    // reproducing the collision that leads to a race), we busy-re-poll
+    // the test MPSC receiver a variable number of times before actually
+    // stopping.  We vary this countdown between 1 and the following
+    // value.
+    const MAX_COUNTDOWN: usize = 20;
+    // When we detect that a successfully sent item is still in the
+    // queue after a disconnect, we spin for up to SPIN_TIMEOUT_S seconds
+    // to confirm that it is a persistent condition and not a concurrency illusion.
+    const SPIN_TIMEOUT_S: u64 = 10;
+    const SPIN_SLEEP_MS: u64 = 10;
+    struct TestRx {
+        rx: mpsc::Receiver<Arc<()>>,
+        // The number of times to query `rx` before dropping it.
+        poll_count: usize
+    }
+    struct TestTask {
+        command_rx: mpsc::Receiver<TestRx>,
+        test_rx: Option<mpsc::Receiver<Arc<()>>>,
+        countdown: usize,
+    }
+    impl TestTask {
+        /// Create a new TestTask
+        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
+            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
+            (
+                TestTask {
+                    command_rx,
+                    test_rx: None,
+                    countdown: 0, // 0 means no countdown is in progress.
+                },
+                command_tx,
+            )
+        }
+    }
+    impl Future for TestTask {
+        type Output = ();
+
+        fn poll(
+            mut self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+        ) -> Poll<Self::Output> {
+            // Poll the test channel, if one is present.
+            if let Some(rx) = &mut self.test_rx {
+                if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
+                    let _ = v.expect("test finished unexpectedly!");
+                }
+                self.countdown -= 1;
+                // Busy-poll until the countdown is finished.
+                cx.waker().wake_by_ref();
+            }
+            // Accept any newly submitted MPSC channels for testing.
+            match self.command_rx.poll_next_unpin(cx) {
+                Poll::Ready(Some(TestRx { rx, poll_count })) => {
+                    self.test_rx = Some(rx);
+                    self.countdown = poll_count;
+                    cx.waker().wake_by_ref();
+                },
+                Poll::Ready(None) => return Poll::Ready(()),
+                Poll::Pending => {},
+            }
+            if self.countdown == 0 {
+                // Countdown complete -- drop the Receiver.
+                self.test_rx = None;
+            }
+            Poll::Pending
+        }
+    }
+    let (f, mut cmd_tx) = TestTask::new();
+    let bg = thread::spawn(move || block_on(f));
+    for i in 0..AMT {
+        let (mut test_tx, rx) = mpsc::channel(0);
+        let poll_count = i % MAX_COUNTDOWN;
+        cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
+        let mut prev_weak: Option<Weak<()>> = None;
+        let mut attempted_sends = 0;
+        let mut successful_sends = 0;
+        loop {
+            // Create a test item.
+            let item = Arc::new(());
+            let weak = Arc::downgrade(&item);
+            match test_tx.try_send(item) {
+                Ok(_) => {
+                    prev_weak = Some(weak);
+                    successful_sends += 1;
+                }
+                Err(ref e) if e.is_full() => {}
+                Err(ref e) if e.is_disconnected() => {
+                    // Test for evidence of the race condition.
+                    if let Some(prev_weak) = prev_weak {
+                        if prev_weak.upgrade().is_some() {
+                            // The previously sent item is still allocated.
+                            // However, there appears to be some aspect of the
+                            // concurrency that can legitimately cause the Arc
+                            // to be momentarily valid.  Spin for up to
+                            // SPIN_TIMEOUT_S seconds waiting for the
+                            // previously sent item to be dropped.
+                            let t0 = Instant::now();
+                            let mut spins = 0;
+                            loop {
+                                if prev_weak.upgrade().is_none() {
+                                    break;
+                                }
+                                assert!(t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+                                    "item not dropped on iteration {} after \
+                                     {} sends ({} successful). spin=({})",
+                                    i, attempted_sends, successful_sends, spins
+                                );
+                                spins += 1;
+                                thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
+                            }
+                        }
+                    }
+                    break;
+                }
+                Err(ref e) => panic!("unexpected error: {}", e),
+            }
+            attempted_sends += 1;
+        }
+    }
+    drop(cmd_tx);
+    bg.join()
+        .expect("background thread join");
+}
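
The stress test above guards the invariant that an item whose
try_send() succeeded does not remain allocated once the receiver has
been dropped and its Drop impl has drained the queue. A single-threaded
sketch of that invariant (outside the patch; assumes the `futures`
facade crate):

    use futures::channel::mpsc;
    use std::sync::{Arc, Weak};

    fn main() {
        // Bounded channel with capacity 1: the first try_send() is buffered.
        let (mut tx, rx) = mpsc::channel::<Arc<()>>(1);
        let item = Arc::new(());
        let weak: Weak<()> = Arc::downgrade(&item);
        tx.try_send(item).expect("first send should be buffered");
        // Dropping the receiver closes the channel and drains queued
        // messages, so the queued Arc is freed by the time drop() returns.
        drop(rx);
        assert!(weak.upgrade().is_none());
    }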