Merge remote-tracking branch 'origin/upstream' am: 93e24a1529 am: 3c3c2a56ab

Original change: undetermined

Change-Id: I96ea8b8846bfad4cb15a1906407f7a5fcae6da48
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/Android.bp b/Android.bp
new file mode 100644
index 0000000..a2f60c6
--- /dev/null
+++ b/Android.bp
@@ -0,0 +1,32 @@
+// This file is generated by cargo_embargo.
+// Do not modify this file after the first "rust_*" or "genrule" module
+// because the changes will be overridden on upgrade.
+// Content before the first "rust_*" or "genrule" module is preserved.
+
+package {
+    default_applicable_licenses: ["external_rust_crates_libwant_license"],
+}
+
+license {
+    name: "external_rust_crates_libwant_license",
+    visibility: [":__subpackages__"],
+    license_kinds: ["SPDX-license-identifier-MIT"],
+    license_text: ["LICENSE"],
+}
+
+rust_library {
+    name: "libwant",
+    host_supported: true,
+    crate_name: "want",
+    cargo_env_compat: true,
+    cargo_pkg_version: "0.3.1",
+    crate_root: "src/lib.rs",
+    edition: "2018",
+    rustlibs: ["libtry_lock"],
+    apex_available: [
+        "//apex_available:platform",
+        "//apex_available:anyapex",
+    ],
+    product_available: true,
+    vendor_available: true,
+}
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..7559f79
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,35 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+name = "want"
+version = "0.3.1"
+authors = ["Sean McArthur <sean@seanmonstar.com>"]
+description = "Detect when another Future wants a result."
+documentation = "https://docs.rs/want"
+readme = "README.md"
+keywords = [
+    "futures",
+    "channel",
+    "async",
+]
+license = "MIT"
+repository = "https://github.com/seanmonstar/want"
+
+[dependencies.try-lock]
+version = "0.2.4"
+
+[dev-dependencies.tokio-executor]
+version = "0.2.0-alpha.2"
+
+[dev-dependencies.tokio-sync]
+version = "0.2.0-alpha.2"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..0e33387
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,17 @@
+[package]
+name = "want"
+version = "0.3.1" # remember to update html_root_url
+description = "Detect when another Future wants a result."
+keywords = ["futures", "channel", "async"]
+authors = ["Sean McArthur <sean@seanmonstar.com>"]
+license = "MIT"
+repository = "https://github.com/seanmonstar/want"
+documentation = "https://docs.rs/want"
+edition = "2018"
+
+[dependencies]
+try-lock = "0.2.4"
+
+[dev-dependencies]
+tokio-executor = "0.2.0-alpha.2"
+tokio-sync = "0.2.0-alpha.2"
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e0f0f8a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2018-2019 Sean McArthur
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..9ac579f
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,20 @@
+name: "want"
+description: "Detect when another Future wants a result."
+third_party {
+  identifier {
+    type: "crates.io"
+    value: "want"
+  }
+  identifier {
+    type: "Archive"
+    value: "https://static.crates.io/crates/want/want-0.3.1.crate"
+    primary_source: true
+  }
+  version: "0.3.1"
+  license_type: NOTICE
+  last_upgrade_date {
+    year: 2024
+    month: 5
+    day: 28
+  }
+}
diff --git a/MODULE_LICENSE_MIT b/MODULE_LICENSE_MIT
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_MIT
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..48bea6e
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1,2 @@
+# Bug component: 688011
+include platform/prebuilts/rust:main:/OWNERS
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bd71b52
--- /dev/null
+++ b/README.md
@@ -0,0 +1,24 @@
+# Want
+
+- [Crates.io](https://crates.io/crates/want)
+- [Docs](https://docs.rs/want)
+
+A `Future`s channel-like utility to signal when a value is wanted.
+
+Futures are supposed to be lazy, only starting work when `Future::poll`
+is called. The same is true of `Stream`s, but when using a channel as
+a `Stream`, it can be hard to know if the receiver is ready for the next
+value.
+
+Put another way, given a `(tx, rx)` from `futures::channel::mpsc::channel()`,
+how can the sender (`tx`) know when the receiver (`rx`) actually wants more
+work to be produced? Just because there is room in the channel buffer
+doesn't mean the work would be used by the receiver.
+
+This is where something like `want` comes in. Paired with a channel, it lets
+you make sure that the `tx` only creates and sends the message once the `rx`
+has called `poll()` for it and the buffer is empty.
+
+## License
+
+`want` is provided under the MIT license. See [LICENSE](LICENSE).
diff --git a/benches/throughput.rs b/benches/throughput.rs
new file mode 100644
index 0000000..5acca6b
--- /dev/null
+++ b/benches/throughput.rs
@@ -0,0 +1,14 @@
+#![feature(test)]
+extern crate test;
+extern crate want;
+
+/// Measures one `Taker::want()` signal followed by a `Giver::poll_want()` that must be ready.
+#[bench]
+fn throughput(b: &mut test::Bencher) {
+    let (mut gv, mut tk) = want::new();
+    let mut cx = std::task::Context::from_waker(std::task::Waker::noop()); // poll never parks here
+    b.iter(move || {
+        tk.want();
+        assert!(matches!(gv.poll_want(&mut cx), std::task::Poll::Ready(Ok(()))));
+    });
+}
diff --git a/cargo_embargo.json b/cargo_embargo.json
new file mode 100644
index 0000000..cb908d7
--- /dev/null
+++ b/cargo_embargo.json
@@ -0,0 +1,3 @@
+{
+  "run_cargo": false
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..96b2920
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,579 @@
+#![doc(html_root_url = "https://docs.rs/want/0.3.1")]
+#![deny(warnings)]
+#![deny(missing_docs)]
+#![deny(missing_debug_implementations)]
+
+//! A Futures channel-like utility to signal when a value is wanted.
+//!
+//! Futures are supposed to be lazy, only starting work when `Future::poll`
+//! is called. The same is true of `Stream`s, but when using a channel as
+//! a `Stream`, it can be hard to know if the receiver is ready for the next
+//! value.
+//!
+//! Put another way, given a `(tx, rx)` from `futures::channel::mpsc::channel()`,
+//! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
+//! work to be produced? Just because there is room in the channel buffer
+//! doesn't mean the work would be used by the receiver.
+//!
+//! This is where something like `want` comes in. Paired with a channel, it lets
+//! you make sure that the `tx` only creates and sends the message once the `rx`
+//! has called `poll()` for it and the buffer is empty.
+//!
+//! # Example
+//!
+//! ```nightly
+//! # //#![feature(async_await)]
+//! extern crate want;
+//!
+//! # fn spawn<T>(_t: T) {}
+//! # fn we_still_want_message() -> bool { true }
+//! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
+//! # struct Tx;
+//! # impl Tx { fn send<T>(&mut self, _: T) {} }
+//! # struct Rx;
+//! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
+//!
+//! // Some message that is expensive to produce.
+//! struct Expensive;
+//!
+//! // Some futures-aware MPSC channel...
+//! let (mut tx, mut rx) = mpsc_channel();
+//!
+//! // And our `want` channel!
+//! let (mut gv, mut tk) = want::new();
+//!
+//!
+//! // Our receiving task...
+//! spawn(async move {
+//!     // Maybe something comes up that prevents us from ever
+//!     // using the expensive message.
+//!     //
+//!     // Without `want`, the "send" task may have started to
+//!     // produce the expensive message even though we wouldn't
+//!     // be able to use it.
+//!     if !we_still_want_message() {
+//!         return;
+//!     }
+//!
+//!     // But we can use it! So tell the `want` channel.
+//!     tk.want();
+//!
+//!     match rx.recv().await {
+//!         Some(_msg) => println!("got a message"),
+//!         None => println!("DONE"),
+//!     }
+//! });
+//!
+//! // Our sending task
+//! spawn(async move {
+//!     // It's expensive to create a new message, so we wait until the
+//!     // receiving end truly *wants* the message.
+//!     if let Err(_closed) = gv.want().await {
+//!         // Looks like they will never want it...
+//!         return;
+//!     }
+//!
+//!     // They want it, let's go!
+//!     tx.send(Expensive);
+//! });
+//!
+//! # fn main() {}
+//! ```
+
+use std::fmt;
+use std::future::Future;
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+// SeqCst is the only ordering used, to ensure that accesses to the state
+// and to the TryLock are never re-ordered.
+use std::sync::atomic::Ordering::SeqCst;
+use std::task::{self, Poll, Waker};
+
+
+use try_lock::TryLock;
+
+/// Creates a linked `Giver`/`Taker` pair sharing one `want` channel.
+///
+/// The channel starts in the idle state with no waker registered.
+pub fn new() -> (Giver, Taker) {
+    let shared = Arc::new(Inner {
+        task: TryLock::new(None),
+        state: AtomicUsize::new(usize::from(State::Idle)),
+    });
+    // Both halves hold a handle to the same shared state.
+    let giver = Giver {
+        inner: Arc::clone(&shared),
+    };
+    let taker = Taker { inner: shared };
+
+    (giver, taker)
+}
+
+/// The giving half of a `want` channel: it gives a value once the `Taker` wants one.
+pub struct Giver {
+    inner: Arc<Inner>, // state shared with the paired `Taker`
+}
+
+/// The taking half of a `want` channel: it signals when a value is wanted or canceled.
+pub struct Taker {
+    inner: Arc<Inner>, // state shared with the paired `Giver`
+}
+
+/// A cloneable `Giver`, created with `Giver::shared`.
+///
+/// Unlike `Giver`, it cannot poll for `want`; it only offers the non-parking
+/// `is_wanting` and `is_canceled` checks, e.g. to watch for cancellation.
+#[derive(Clone)]
+pub struct SharedGiver {
+    inner: Arc<Inner>,
+}
+
+/// Error value reporting that the `Taker` has canceled its interest in a value.
+pub struct Closed {
+    _inner: (), // private field, so code outside this module cannot construct a `Closed`
+}
+
+#[derive(Clone, Copy, Debug)]
+enum State {
+    Idle, // initial state; also restored by `Giver::give`
+    Want, // set by `Taker::want`
+    Give, // set by `Giver::poll_want` when it parks its waker
+    Closed, // set by `Taker::cancel` or by dropping the `Taker`
+}
+
+impl From<State> for usize {
+    fn from(state: State) -> Self { // integer encoding stored in the atomic
+        match state {
+            State::Closed => 3,
+            State::Give => 2,
+            State::Want => 1,
+            State::Idle => 0,
+        }
+    }
+}
+
+impl From<usize> for State {
+    fn from(raw: usize) -> Self { // decodes the integer stored in the atomic
+        match raw {
+            3 => Self::Closed,
+            2 => Self::Give,
+            1 => Self::Want,
+            0 => Self::Idle,
+            other => unreachable!("unknown state: {}", other),
+        }
+    }
+}
+
+struct Inner {
+    state: AtomicUsize, // a `State` encoded as `usize`
+    task: TryLock<Option<Waker>>, // waker stored by `Giver::poll_want` when it parks
+}
+
+// ===== impl Giver ======
+
+impl Giver {
+    /// Returns a `Future` resolving to `Ok(())` once the `Taker` wants a value, or `Err(Closed)`.
+    pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
+        Want(self)
+    }
+
+    /// Poll whether the `Taker` has registered interest in another value.
+    ///
+    /// - If the `Taker` has called `want()`, this returns `Poll::Ready(Ok(()))`.
+    /// - If the `Taker` is not currently wanting, this returns `Poll::Pending`
+    ///   and parks the current task (stores its waker) to be notified when
+    ///   the `Taker` does call `want()`.
+    /// - If the `Taker` has canceled (or dropped), this returns `Poll::Ready(Err(Closed))`.
+    ///
+    /// After knowing that the Taker is wanting, the state can be reset by
+    /// calling [`give`](Giver::give).
+    pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
+        loop {
+            let state = self.inner.state.load(SeqCst).into();
+            match state {
+                State::Want => {
+                    return Poll::Ready(Ok(()));
+                },
+                State::Closed => {
+                    return Poll::Ready(Err(Closed { _inner: () }));
+                },
+                State::Idle | State::Give => {
+                    // Taker doesn't want anything yet, so park.
+                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
+
+                        // While we have the lock, try to set to GIVE.
+                        let old = self.inner.state.compare_exchange(
+                            state.into(),
+                            State::Give.into(),
+                            SeqCst,
+                            SeqCst,
+                        );
+                        // If the exchange succeeded (state was still Idle or Give), park current task.
+                        if old == Ok(state.into()) {
+                            let park = locked.as_ref()
+                                .map(|w| !w.will_wake(cx.waker()))
+                                .unwrap_or(true); // no waker stored yet, so we must park
+                            if park {
+                                let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
+                                drop(locked); // release the lock before waking the previous task
+                                if let Some(prev_task) = old {
+                                    // there was an old task parked here.
+                                    // it might be waiting to be notified,
+                                    // so poke it before dropping.
+                                    prev_task.wake();
+                                };
+                            }
+                            return Poll::Pending;
+                        }
+                        // Otherwise, the state changed under us! Go around the loop again.
+                    } else {
+                        // if we couldn't take the lock, then a Taker has it.
+                        // The *ONLY* reason is because it is in the process of notifying us
+                        // of its want.
+                        //
+                        // We need to loop again to see what state it was changed to.
+                    }
+                },
+            }
+        }
+    }
+
+    /// Mark the state as idle, if the Taker currently is wanting.
+    ///
+    /// Returns true if Taker was wanting, false otherwise.
+    #[inline]
+    pub fn give(&self) -> bool {
+        // only set to IDLE if it is still Want
+        let old = self.inner.state.compare_exchange(
+            State::Want.into(),
+            State::Idle.into(),
+            SeqCst,
+            SeqCst);
+        old == Ok(State::Want.into())
+    }
+
+    /// Check if the `Taker` has called `want()` without parking a task.
+    ///
+    /// This is safe to call outside of a futures task context, but other
+    /// means of being notified are left to the user.
+    #[inline]
+    pub fn is_wanting(&self) -> bool {
+        self.inner.state.load(SeqCst) == State::Want.into()
+    }
+
+
+    /// Check if the `Taker` has canceled interest without parking a task.
+    #[inline]
+    pub fn is_canceled(&self) -> bool {
+        self.inner.state.load(SeqCst) == State::Closed.into()
+    }
+
+    /// Converts this into a cloneable `SharedGiver` backed by the same channel state.
+    #[inline]
+    pub fn shared(self) -> SharedGiver {
+        SharedGiver {
+            inner: self.inner,
+        }
+    }
+}
+
+impl fmt::Debug for Giver {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let current = self.inner.state(); // snapshot of the shared state
+        let mut out = f.debug_struct("Giver");
+        out.field("state", &current).finish()
+    }
+}
+
+// ===== impl SharedGiver ======
+
+impl SharedGiver {
+    /// Reports whether the `Taker` has called `want()`, without parking a task.
+    ///
+    /// Usable outside a futures task context; other means of notification are up to the user.
+    #[inline]
+    pub fn is_wanting(&self) -> bool {
+        let wanted: usize = State::Want.into();
+        wanted == self.inner.state.load(SeqCst)
+    }
+
+    /// Reports whether the `Taker` has canceled interest, without parking a task.
+    #[inline]
+    pub fn is_canceled(&self) -> bool {
+        let closed: usize = State::Closed.into();
+        closed == self.inner.state.load(SeqCst)
+    }
+}
+
+impl fmt::Debug for SharedGiver {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let state = self.inner.state(); // snapshot of the shared state
+        let mut dbg = f.debug_struct("SharedGiver");
+        dbg.field("state", &state).finish()
+    }
+}
+
+// ===== impl Taker ======
+
+impl Taker {
+    /// Signal to the `Giver` that the want is canceled.
+    ///
+    /// This is useful to tell the `Giver` that the channel is closed when you
+    /// cannot drop the `Taker` yet.
+    #[inline]
+    pub fn cancel(&mut self) {
+        self.signal(State::Closed)
+    }
+
+    /// Signal to the `Giver` that a value is wanted (debug builds assert no prior `cancel`).
+    #[inline]
+    pub fn want(&mut self) {
+        debug_assert!(
+            self.inner.state.load(SeqCst) != State::Closed.into(),
+            "want called after cancel"
+        );
+        self.signal(State::Want)
+    }
+    /// Stores `state`; if the `Giver` had parked (old state `Give`), wakes its stored task.
+    #[inline]
+    fn signal(&mut self, state: State) {
+        let old_state = self.inner.state.swap(state.into(), SeqCst).into();
+        match old_state {
+            State::Idle | State::Want | State::Closed => (), // Giver was not parked: nothing to wake
+            State::Give => {
+                loop {
+                    if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
+                        if let Some(task) = locked.take() {
+                            drop(locked); // release the lock before waking
+                            task.wake();
+                        }
+                        return;
+                    } else {
+                        // if we couldn't take the lock, then a Giver has it.
+                        // The *ONLY* reason is because it is in the process of parking.
+                        //
+                        // We need to loop and take the lock so we can notify this task.
+                    }
+                }
+            },
+        }
+    }
+}
+
+impl Drop for Taker {
+    #[inline]
+    fn drop(&mut self) {
+        self.cancel(); // dropping closes the channel exactly like an explicit cancel
+    }
+}
+
+impl fmt::Debug for Taker {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let snapshot = self.inner.state(); // current shared state
+        let mut builder = f.debug_struct("Taker");
+        builder.field("state", &snapshot).finish()
+    }
+}
+
+// ===== impl Closed ======
+
+impl fmt::Debug for Closed {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // The private unit field is deliberately left out of the output.
+        f.debug_struct("Closed").finish()
+    }
+}
+
+// ===== impl Inner ======
+
+impl Inner {
+    #[inline]
+    fn state(&self) -> State {
+        State::from(self.state.load(SeqCst)) // decode the current atomic value
+    }
+}
+
+// ===== impl Want ======
+
+/// Future returned by `Giver::want`; every poll is forwarded to `Giver::poll_want`.
+struct Want<'a>(&'a mut Giver);
+
+impl Future for Want<'_> {
+    type Output = Result<(), Closed>;
+    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+        // `Want` only holds a `&mut Giver`, so it is `Unpin` and the pin can be unwrapped.
+        self.get_mut().0.poll_want(cx)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::thread;
+    use tokio_sync::oneshot;
+    use super::*;
+
+    fn block_on<F: Future>(f: F) -> F::Output { // test helper returning `f`'s output
+        tokio_executor::enter()
+            .expect("block_on enter")
+            .block_on(f) // presumably drives `f` to completion on this thread; confirm in tokio-executor docs
+    }
+
+    #[test]
+    fn want_ready() {
+        let (mut giver, mut taker) = new();
+        taker.want(); // interest is signalled before the giver ever polls
+        // The want future must therefore resolve with `Ok` on its first poll.
+        let outcome = block_on(giver.want());
+        outcome.unwrap();
+    }
+
+    #[test]
+    fn want_notify_0() {
+        let (mut gv, mut tk) = new();
+        let (tx, rx) = oneshot::channel();
+
+        thread::spawn(move || {
+            tk.want();
+            // Block on the oneshot so `tk` stays alive (dropping it would close
+            // the channel) until the main thread has finished its assertions.
+            block_on(rx).expect("rx");
+        });
+
+        block_on(gv.want()).expect("want");
+
+        assert!(gv.is_wanting(), "still wanting after poll_want success");
+        assert!(gv.give(), "give is true when wanting");
+
+        assert!(!gv.is_wanting(), "no longer wanting after give");
+        assert!(!gv.is_canceled(), "give doesn't cancel");
+
+        assert!(!gv.give(), "give is false if not wanting");
+
+        tx.send(()).expect("tx"); // release the spawned thread
+    }
+
+    /*
+    /// This tests that if the Giver moves tasks after parking,
+    /// it will still wake up the correct task.
+    #[test]
+    fn want_notify_moving_tasks() {
+        use std::sync::Arc;
+        use futures::executor::{spawn, Notify, NotifyHandle};
+
+        struct WantNotify;
+
+        impl Notify for WantNotify {
+            fn notify(&self, _id: usize) {
+            }
+        }
+
+        fn n() -> NotifyHandle {
+            Arc::new(WantNotify).into()
+        }
+
+        let (mut gv, mut tk) = new();
+
+        let mut s = spawn(poll_fn(move || {
+            gv.poll_want()
+        }));
+
+        // Register with t1 as the task::current()
+        let t1 = n();
+        assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
+
+        thread::spawn(move || {
+            thread::sleep(::std::time::Duration::from_millis(100));
+            tk.want();
+        });
+
+        // And now, move to a ThreadNotify task.
+        s.into_inner().wait().expect("poll_want");
+    }
+    */
+
+    #[test]
+    fn cancel() {
+        // explicit: `cancel()` closes the channel
+        let (mut gv, mut tk) = new();
+
+        assert!(!gv.is_canceled());
+
+        tk.cancel();
+
+        assert!(gv.is_canceled());
+        block_on(gv.want()).unwrap_err();
+
+        // implicit: dropping the `Taker` closes the channel
+        let (mut gv, tk) = new();
+
+        assert!(!gv.is_canceled());
+
+        drop(tk);
+
+        assert!(gv.is_canceled());
+        block_on(gv.want()).unwrap_err();
+
+        // notifies: a `Taker` dropped on another thread still yields `Err`
+        let (mut gv, tk) = new();
+
+        thread::spawn(move || {
+            let _tk = tk;
+            // `_tk` is dropped when this closure returns
+        });
+
+        block_on(gv.want()).unwrap_err();
+    }
+
+    /*
+    #[test]
+    fn stress() {
+        let nthreads = 5;
+        let nwants = 100;
+
+        for _ in 0..nthreads {
+            let (mut gv, mut tk) = new();
+            let (mut tx, mut rx) = mpsc::channel(0);
+
+            // rx thread
+            thread::spawn(move || {
+                let mut cnt = 0;
+                poll_fn(move || {
+                    while cnt < nwants {
+                        let n = match rx.poll().expect("rx poll") {
+                            Async::Ready(n) => n.expect("rx opt"),
+                            Async::NotReady => {
+                                tk.want();
+                                return Ok(Async::NotReady);
+                            },
+                        };
+                        assert_eq!(cnt, n);
+                        cnt += 1;
+                    }
+                    Ok::<_, ()>(Async::Ready(()))
+                }).wait().expect("rx wait");
+            });
+
+            // tx thread
+            thread::spawn(move || {
+                let mut cnt = 0;
+                let nsent = poll_fn(move || {
+                    loop {
+                        while let Ok(()) = tx.try_send(cnt) {
+                            cnt += 1;
+                        }
+                        match gv.poll_want() {
+                            Ok(Async::Ready(_)) => (),
+                            Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
+                            Err(_) => return Ok(Async::Ready(cnt)),
+                        }
+                    }
+                }).wait().expect("tx wait");
+
+                assert_eq!(nsent, nwants);
+            }).join().expect("thread join");
+        }
+    }
+    */
+}