Snap for 6435660 from ee6d02b6c48b7119b81a913a760092450cf32df2 to sdk-release

Change-Id: I36f7cfb5e1f840901fd5366e9efedceaafd47168
diff --git a/Android.bp b/Android.bp
new file mode 100644
index 0000000..522f8d6
--- /dev/null
+++ b/Android.bp
@@ -0,0 +1,19 @@
+// This file is generated by cargo2android.py.
+
+rust_library_host_rlib {
+    name: "libfutures_channel",
+    crate_name: "futures_channel",
+    srcs: ["src/lib.rs"],
+    edition: "2018",
+    features: [
+        "alloc",
+        "default",
+        "std",
+    ],
+    rlibs: [
+        "libfutures_core",
+    ],
+}
+
+// dependent_library ["feature_list"]
+//   futures-core-0.3.4 "alloc,std"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..846095c
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,40 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+edition = "2018"
+name = "futures-channel"
+version = "0.3.4"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+description = "Channels for asynchronous communication using futures-rs.\n"
+homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures-channel/0.3.0"
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+[package.metadata.docs.rs]
+all-features = true
+[dependencies.futures-core]
+version = "0.3.4"
+default-features = false
+
+[dependencies.futures-sink]
+version = "0.3.4"
+optional = true
+default-features = false
+
+[features]
+alloc = ["futures-core/alloc"]
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+default = ["std"]
+sink = ["futures-sink"]
+std = ["alloc", "futures-core/std"]
+unstable = ["futures-core/unstable"]
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644
index 0000000..55f292d
--- /dev/null
+++ b/Cargo.toml.orig
@@ -0,0 +1,31 @@
+[package]
+name = "futures-channel"
+edition = "2018"
+version = "0.3.4"
+authors = ["Alex Crichton <alex@alexcrichton.com>"]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/rust-lang/futures-rs"
+homepage = "https://rust-lang.github.io/futures-rs"
+documentation = "https://docs.rs/futures-channel/0.3.0"
+description = """
+Channels for asynchronous communication using futures-rs.
+"""
+
+[features]
+default = ["std"]
+std = ["alloc", "futures-core/std"]
+alloc = ["futures-core/alloc"]
+sink = ["futures-sink"]
+
+# Unstable features
+# These features are outside of the normal semver guarantees and require the
+# `unstable` feature as an explicit opt-in to unstable API.
+unstable = ["futures-core/unstable"]
+cfg-target-has-atomic = ["futures-core/cfg-target-has-atomic"]
+
+[dependencies]
+futures-core = { path = "../futures-core", version = "0.3.4", default-features = false }
+futures-sink = { path = "../futures-sink", version = "0.3.4", default-features = false, optional = true }
+
+[package.metadata.docs.rs]
+all-features = true
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..3fb36a4
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,235 @@
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright (c) 2016 Alex Crichton
+Copyright (c) 2017 The Tokio Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+	http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+
+
+
+For src/mpsc/queue.rs:
+
+Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+   1. Redistributions of source code must retain the above copyright notice,
+      this list of conditions and the following disclaimer.
+
+   2. Redistributions in binary form must reproduce the above copyright
+      notice, this list of conditions and the following disclaimer in the
+      documentation and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+The views and conclusions contained in the software and documentation are
+those of the authors and should not be interpreted as representing official
+policies, either expressed or implied, of Dmitry Vyukov.
+
diff --git a/METADATA b/METADATA
new file mode 100644
index 0000000..4fcad10
--- /dev/null
+++ b/METADATA
@@ -0,0 +1,17 @@
+name: "futures-channel"
+description:
+    "Channels for asynchronous communication using futures-rs."
+
+third_party {
+  url {
+    type: HOMEPAGE
+    value: "https://crates.io/crates/futures-channel"
+  }
+  url {
+    type: GIT
+    value: "https://github.com/rust-lang/futures-rs"
+  }
+  version: "0.3.4"
+  last_upgrade_date { year: 2020 month: 3 day: 17 }
+  license_type: NOTICE
+}
diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/MODULE_LICENSE_APACHE2
diff --git a/OWNERS b/OWNERS
new file mode 100644
index 0000000..46fc303
--- /dev/null
+++ b/OWNERS
@@ -0,0 +1 @@
+include platform/prebuilts/rust:/OWNERS
diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs
new file mode 100644
index 0000000..e22fe60
--- /dev/null
+++ b/benches/sync_mpsc.rs
@@ -0,0 +1,144 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use {
+    futures::{
+        channel::mpsc::{self, Sender, UnboundedSender},
+        ready,
+        stream::{Stream, StreamExt},
+        sink::Sink,
+        task::{Context, Poll},
+    },
+    futures_test::task::noop_context,
+    std::pin::Pin,
+};
+
+/// Single producer, single consumer
+#[bench]
+fn unbounded_1_tx(b: &mut Bencher) {
+    let mut cx = noop_context();
+    b.iter(|| {
+        let (tx, mut rx) = mpsc::unbounded();
+
+        // 1000 iterations to avoid measuring overhead of initialization
+        // Result should be divided by 1000
+        for i in 0..1000 {
+
+            // Poll, not ready, park
+            assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
+
+            UnboundedSender::unbounded_send(&tx, i).unwrap();
+
+            // Now poll ready
+            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+        }
+    })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn unbounded_100_tx(b: &mut Bencher) {
+    let mut cx = noop_context();
+    b.iter(|| {
+        let (tx, mut rx) = mpsc::unbounded();
+
+        let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
+
+        // 1000 send/recv operations total, result should be divided by 1000
+        for _ in 0..10 {
+            for (i, x) in tx.iter().enumerate() {
+                assert_eq!(Poll::Pending, rx.poll_next_unpin(&mut cx));
+
+                UnboundedSender::unbounded_send(x, i).unwrap();
+
+                assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+            }
+        }
+    })
+}
+
+#[bench]
+fn unbounded_uncontended(b: &mut Bencher) {
+    let mut cx = noop_context();
+    b.iter(|| {
+        let (tx, mut rx) = mpsc::unbounded();
+
+        for i in 0..1000 {
+            UnboundedSender::unbounded_send(&tx, i).expect("send");
+            // No need to create a task, because poll is not going to park.
+            assert_eq!(Poll::Ready(Some(i)), rx.poll_next_unpin(&mut cx));
+        }
+    })
+}
+
+
+/// A Stream that continuously sends incrementing number of the queue
+struct TestSender {
+    tx: Sender<u32>,
+    last: u32, // Last number sent
+}
+
+// Could be a Future, it doesn't matter
+impl Stream for TestSender {
+    type Item = u32;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
+        -> Poll<Option<Self::Item>>
+    {
+        let this = &mut *self;
+        let mut tx = Pin::new(&mut this.tx);
+
+        ready!(tx.as_mut().poll_ready(cx)).unwrap();
+        tx.as_mut().start_send(this.last + 1).unwrap();
+        this.last += 1;
+        assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
+        Poll::Ready(Some(this.last))
+    }
+}
+
+/// Single producers, single consumer
+#[bench]
+fn bounded_1_tx(b: &mut Bencher) {
+    let mut cx = noop_context();
+    b.iter(|| {
+        let (tx, mut rx) = mpsc::channel(0);
+
+        let mut tx = TestSender { tx, last: 0 };
+
+        for i in 0..1000 {
+            assert_eq!(Poll::Ready(Some(i + 1)), tx.poll_next_unpin(&mut cx));
+            assert_eq!(Poll::Pending, tx.poll_next_unpin(&mut cx));
+            assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
+        }
+    })
+}
+
+/// 100 producers, single consumer
+#[bench]
+fn bounded_100_tx(b: &mut Bencher) {
+    let mut cx = noop_context();
+    b.iter(|| {
+        // Each sender can send one item after specified capacity
+        let (tx, mut rx) = mpsc::channel(0);
+
+        let mut tx: Vec<_> = (0..100).map(|_| {
+            TestSender {
+                tx: tx.clone(),
+                last: 0
+            }
+        }).collect();
+
+        for i in 0..10 {
+            for x in &mut tx {
+                // Send an item
+                assert_eq!(Poll::Ready(Some(i + 1)), x.poll_next_unpin(&mut cx));
+                // Then block
+                assert_eq!(Poll::Pending, x.poll_next_unpin(&mut cx));
+                // Recv the item
+                assert_eq!(Poll::Ready(Some(i + 1)), rx.poll_next_unpin(&mut cx));
+            }
+        }
+    })
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..d33c919
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,42 @@
+//! Asynchronous channels.
+//!
+//! This crate provides channels that can be used to communicate between
+//! asynchronous tasks.
+//!
+//! All items of this library are only available when the `std` or `alloc` feature of this
+//! library is activated, and it is activated by default.
+
+#![cfg_attr(feature = "cfg-target-has-atomic", feature(cfg_target_has_atomic))]
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
+// It cannot be included in the published code because this lints have false positives in the minimum required version.
+#![cfg_attr(test, warn(single_use_lifetimes))]
+#![warn(clippy::all)]
+
+#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))]
+
+#![doc(html_root_url = "https://docs.rs/futures-channel/0.3.0")]
+
+#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
+compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");
+
+macro_rules! cfg_target_has_atomic {
+    ($($item:item)*) => {$(
+        #[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
+        $item
+    )*};
+}
+
+cfg_target_has_atomic! {
+    #[cfg(feature = "alloc")]
+    extern crate alloc;
+
+    #[cfg(feature = "alloc")]
+    mod lock;
+    #[cfg(feature = "std")]
+    pub mod mpsc;
+    #[cfg(feature = "alloc")]
+    pub mod oneshot;
+}
diff --git a/src/lock.rs b/src/lock.rs
new file mode 100644
index 0000000..e477369
--- /dev/null
+++ b/src/lock.rs
@@ -0,0 +1,105 @@
+//! A "mutex" which only supports `try_lock`
+//!
+//! As a futures library the eventual call to an event loop should be the only
+//! thing that ever blocks, so this is assisted with a fast user-space
+//! implementation of a lock that can only have a `try_lock` operation.
+
+use core::cell::UnsafeCell;
+use core::ops::{Deref, DerefMut};
+use core::sync::atomic::Ordering::SeqCst;
+use core::sync::atomic::AtomicBool;
+
+/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
+///
+/// This lock only supports the `try_lock` operation, however, and does not
+/// implement poisoning.
+#[derive(Debug)]
+pub(crate) struct Lock<T> {
+    locked: AtomicBool,
+    data: UnsafeCell<T>,
+}
+
+/// Sentinel representing an acquired lock through which the data can be
+/// accessed.
+pub(crate) struct TryLock<'a, T> {
+    __ptr: &'a Lock<T>,
+}
+
+// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
+// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
+//
+// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
+// across threads to be `Sync` because it allows mutable access from multiple
+// threads.
+unsafe impl<T: Send> Send for Lock<T> {}
+unsafe impl<T: Send> Sync for Lock<T> {}
+
+impl<T> Lock<T> {
+    /// Creates a new lock around the given value.
+    pub(crate) fn new(t: T) -> Lock<T> {
+        Lock {
+            locked: AtomicBool::new(false),
+            data: UnsafeCell::new(t),
+        }
+    }
+
+    /// Attempts to acquire this lock, returning whether the lock was acquired or
+    /// not.
+    ///
+    /// If `Some` is returned then the data this lock protects can be accessed
+    /// through the sentinel. This sentinel allows both mutable and immutable
+    /// access.
+    ///
+    /// If `None` is returned then the lock is already locked, either elsewhere
+    /// on this thread or on another thread.
+    pub(crate) fn try_lock(&self) -> Option<TryLock<'_, T>> {
+        if !self.locked.swap(true, SeqCst) {
+            Some(TryLock { __ptr: self })
+        } else {
+            None
+        }
+    }
+}
+
+impl<T> Deref for TryLock<'_, T> {
+    type Target = T;
+    fn deref(&self) -> &T {
+        // The existence of `TryLock` represents that we own the lock, so we
+        // can safely access the data here.
+        unsafe { &*self.__ptr.data.get() }
+    }
+}
+
+impl<T> DerefMut for TryLock<'_, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        // The existence of `TryLock` represents that we own the lock, so we
+        // can safely access the data here.
+        //
+        // Additionally, we're the *only* `TryLock` in existence so mutable
+        // access should be ok.
+        unsafe { &mut *self.__ptr.data.get() }
+    }
+}
+
+impl<T> Drop for TryLock<'_, T> {
+    fn drop(&mut self) {
+        self.__ptr.locked.store(false, SeqCst);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Lock;
+
+    #[test]
+    fn smoke() {
+        let a = Lock::new(1);
+        let mut a1 = a.try_lock().unwrap();
+        assert!(a.try_lock().is_none());
+        assert_eq!(*a1, 1);
+        *a1 = 2;
+        drop(a1);
+        assert_eq!(*a.try_lock().unwrap(), 2);
+        assert_eq!(*a.try_lock().unwrap(), 2);
+    }
+}
diff --git a/src/mpsc/mod.rs b/src/mpsc/mod.rs
new file mode 100644
index 0000000..f69f440
--- /dev/null
+++ b/src/mpsc/mod.rs
@@ -0,0 +1,1289 @@
+//! A multi-producer, single-consumer queue for sending values across
+//! asynchronous tasks.
+//!
+//! Similarly to the `std`, channel creation provides [`Receiver`] and
+//! [`Sender`] handles. [`Receiver`] implements [`Stream`] and allows a task to
+//! read values out of the channel. If there is no message to read from the
+//! channel, the current task will be notified when a new value is sent.
+//! [`Sender`] implements the `Sink` trait and allows a task to send messages into
+//! the channel. If the channel is at capacity, the send will be rejected and
+//! the task will be notified when additional capacity is available. In other
+//! words, the channel provides backpressure.
+//!
+//! Unbounded channels are also available using the `unbounded` constructor.
+//!
+//! # Disconnection
+//!
+//! When all [`Sender`] handles have been dropped, it is no longer
+//! possible to send values into the channel. This is considered the termination
+//! event of the stream. As such, [`Receiver::poll_next`]
+//! will return `Ok(Ready(None))`.
+//!
+//! If the [`Receiver`] handle is dropped, then messages can no longer
+//! be read out of the channel. In this case, all further attempts to send will
+//! result in an error.
+//!
+//! # Clean Shutdown
+//!
+//! If the [`Receiver`] is simply dropped, then it is possible for
+//! there to be messages still in the channel that will not be processed. As
+//! such, it is usually desirable to perform a "clean" shutdown. To do this, the
+//! receiver will first call `close`, which will prevent any further messages to
+//! be sent into the channel. Then, the receiver consumes the channel to
+//! completion, at which point the receiver can be dropped.
+//!
+//! [`Sender`]: struct.Sender.html
+//! [`Receiver`]: struct.Receiver.html
+//! [`Stream`]: ../../futures_core/stream/trait.Stream.html
+//! [`Receiver::poll_next`]:
+//!     ../../futures_core/stream/trait.Stream.html#tymethod.poll_next
+
+// At the core, the channel uses an atomic FIFO queue for message passing. This
+// queue is used as the primary coordination primitive. In order to enforce
+// capacity limits and handle back pressure, a secondary FIFO queue is used to
+// send parked task handles.
+//
+// The general idea is that the channel is created with a `buffer` size of `n`.
+// The channel capacity is `n + num-senders`. Each sender gets one "guaranteed"
+// slot to hold a message. This allows `Sender` to know for a fact that a send
+// will succeed *before* starting to do the actual work of sending the value.
+// Since most of this work is lock-free, once the work starts, it is impossible
+// to safely revert.
+//
+// If the sender is unable to process a send operation, then the current
+// task is parked and the handle is sent on the parked task queue.
+//
+// Note that the implementation guarantees that the channel capacity will never
+// exceed the configured limit, however there is no *strict* guarantee that the
+// receiver will wake up a parked task *immediately* when a slot becomes
+// available. However, it will almost always unpark a task when a slot becomes
+// available and it is *guaranteed* that a sender will be unparked when the
+// message that caused the sender to become parked is read out of the channel.
+//
+// The steps for sending a message are roughly:
+//
+// 1) Increment the channel message count
+// 2) If the channel is at capacity, push the task handle onto the wait queue
+// 3) Push the message onto the message queue.
+//
+// The steps for receiving a message are roughly:
+//
+// 1) Pop a message from the message queue
+// 2) Pop a task handle from the wait queue
+// 3) Decrement the channel message count.
+//
+// It's important for the order of operations on lock-free structures to happen
+// in reverse order between the sender and receiver. This makes the message
+// queue the primary coordination structure and establishes the necessary
+// happens-before semantics required for the acquire / release semantics used
+// by the queue structure.
+
+use futures_core::stream::{FusedStream, Stream};
+use futures_core::task::{Context, Poll, Waker};
+use futures_core::task::__internal::AtomicWaker;
+use std::fmt;
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::SeqCst;
+
+use crate::mpsc::queue::Queue;
+
+mod queue;
+#[cfg(feature = "sink")]
+mod sink_impl;
+
+#[derive(Debug)]
+struct UnboundedSenderInner<T> {
+    // Channel state shared between the sender and receiver.
+    inner: Arc<UnboundedInner<T>>,
+}
+
+#[derive(Debug)]
+struct BoundedSenderInner<T> {
+    // Channel state shared between the sender and receiver.
+    inner: Arc<BoundedInner<T>>,
+
+    // Handle to the task that is blocked on this sender. This handle is sent
+    // to the receiver half in order to be notified when the sender becomes
+    // unblocked.
+    sender_task: Arc<Mutex<SenderTask>>,
+
+    // True if the sender might be blocked. This is an optimization to avoid
+    // having to lock the mutex most of the time.
+    maybe_parked: bool,
+}
+
+// We never project Pin<&mut SenderInner> to `Pin<&mut T>`
+impl<T> Unpin for UnboundedSenderInner<T> {}
+impl<T> Unpin for BoundedSenderInner<T> {}
+
+/// The transmission end of a bounded mpsc channel.
+///
+/// This value is created by the [`channel`](channel) function.
+#[derive(Debug)]
+pub struct Sender<T>(Option<BoundedSenderInner<T>>);
+
+/// The transmission end of an unbounded mpsc channel.
+///
+/// This value is created by the [`unbounded`](unbounded) function.
+#[derive(Debug)]
+pub struct UnboundedSender<T>(Option<UnboundedSenderInner<T>>);
+
+trait AssertKinds: Send + Sync + Clone {}
+impl AssertKinds for UnboundedSender<u32> {}
+
+/// The receiving end of a bounded mpsc channel.
+///
+/// This value is created by the [`channel`](channel) function.
+#[derive(Debug)]
+pub struct Receiver<T> {
+    inner: Option<Arc<BoundedInner<T>>>,
+}
+
+/// The receiving end of an unbounded mpsc channel.
+///
+/// This value is created by the [`unbounded`](unbounded) function.
+#[derive(Debug)]
+pub struct UnboundedReceiver<T> {
+    inner: Option<Arc<UnboundedInner<T>>>,
+}
+
+// `Pin<&mut UnboundedReceiver<T>>` is never projected to `Pin<&mut T>`
+impl<T> Unpin for UnboundedReceiver<T> {}
+
+/// The error type for [`Sender`s](Sender) used as `Sink`s.
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct SendError {
+    kind: SendErrorKind,
+}
+
+/// The error type returned from [`try_send`](Sender::try_send).
+#[derive(Clone, PartialEq, Eq)]
+pub struct TrySendError<T> {
+    err: SendError,
+    val: T,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+enum SendErrorKind {
+    Full,
+    Disconnected,
+}
+
+/// The error type returned from [`try_next`](Receiver::try_next).
+pub struct TryRecvError {
+    _priv: (),
+}
+
+impl fmt::Display for SendError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if self.is_full() {
+            write!(f, "send failed because channel is full")
+        } else {
+            write!(f, "send failed because receiver is gone")
+        }
+    }
+}
+
+impl std::error::Error for SendError {}
+
+impl SendError {
+    /// Returns true if this error is a result of the channel being full.
+    pub fn is_full(&self) -> bool {
+        match self.kind {
+            SendErrorKind::Full => true,
+            _ => false,
+        }
+    }
+
+    /// Returns true if this error is a result of the receiver being dropped.
+    pub fn is_disconnected(&self) -> bool {
+        match self.kind {
+            SendErrorKind::Disconnected => true,
+            _ => false,
+        }
+    }
+}
+
+impl<T> fmt::Debug for TrySendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TrySendError")
+            .field("kind", &self.err.kind)
+            .finish()
+    }
+}
+
+impl<T> fmt::Display for TrySendError<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if self.is_full() {
+            write!(f, "send failed because channel is full")
+        } else {
+            write!(f, "send failed because receiver is gone")
+        }
+    }
+}
+
+impl<T: core::any::Any> std::error::Error for TrySendError<T> {}
+
+impl<T> TrySendError<T> {
+    /// Returns true if this error is a result of the channel being full.
+    pub fn is_full(&self) -> bool {
+        self.err.is_full()
+    }
+
+    /// Returns true if this error is a result of the receiver being dropped.
+    pub fn is_disconnected(&self) -> bool {
+        self.err.is_disconnected()
+    }
+
+    /// Returns the message that was attempted to be sent but failed.
+    pub fn into_inner(self) -> T {
+        self.val
+    }
+
+    /// Drops the message and converts into a `SendError`.
+    pub fn into_send_error(self) -> SendError {
+        self.err
+    }
+}
+
+impl fmt::Debug for TryRecvError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_tuple("TryRecvError")
+            .finish()
+    }
+}
+
+impl fmt::Display for TryRecvError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "receiver channel is empty")
+    }
+}
+
+impl std::error::Error for TryRecvError {}
+
+#[derive(Debug)]
+struct UnboundedInner<T> {
+    // Internal channel state. Consists of the number of messages stored in the
+    // channel as well as a flag signalling that the channel is closed.
+    state: AtomicUsize,
+
+    // Atomic, FIFO queue used to send messages to the receiver
+    message_queue: Queue<T>,
+
+    // Number of senders in existence
+    num_senders: AtomicUsize,
+
+    // Handle to the receiver's task.
+    recv_task: AtomicWaker,
+}
+
+#[derive(Debug)]
+struct BoundedInner<T> {
+    // Max buffer size of the channel. If `None` then the channel is unbounded.
+    buffer: usize,
+
+    // Internal channel state. Consists of the number of messages stored in the
+    // channel as well as a flag signalling that the channel is closed.
+    state: AtomicUsize,
+
+    // Atomic, FIFO queue used to send messages to the receiver
+    message_queue: Queue<T>,
+
+    // Atomic, FIFO queue used to send parked task handles to the receiver.
+    parked_queue: Queue<Arc<Mutex<SenderTask>>>,
+
+    // Number of senders in existence
+    num_senders: AtomicUsize,
+
+    // Handle to the receiver's task.
+    recv_task: AtomicWaker,
+}
+
+// Struct representation of `Inner::state`.
+#[derive(Debug, Clone, Copy)]
+struct State {
+    // `true` when the channel is open
+    is_open: bool,
+
+    // Number of messages in the channel
+    num_messages: usize,
+}
+
+// The `is_open` flag is stored in the left-most bit of `Inner::state`
+const OPEN_MASK: usize = usize::max_value() - (usize::max_value() >> 1);
+
+// When a new channel is created, it is created in the open state with no
+// pending messages.
+const INIT_STATE: usize = OPEN_MASK;
+
+// The maximum number of messages that a channel can track is `usize::max_value() >> 1`
+const MAX_CAPACITY: usize = !(OPEN_MASK);
+
+// The maximum requested buffer size must be less than the maximum capacity of
+// a channel. This is because each sender gets a guaranteed slot.
+const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
+
+// Sent to the consumer to wake up blocked producers
+#[derive(Debug)]
+struct SenderTask {
+    task: Option<Waker>,
+    is_parked: bool,
+}
+
+impl SenderTask {
+    fn new() -> Self {
+        SenderTask {
+            task: None,
+            is_parked: false,
+        }
+    }
+
+    fn notify(&mut self) {
+        self.is_parked = false;
+
+        if let Some(task) = self.task.take() {
+            task.wake();
+        }
+    }
+}
+
+/// Creates a bounded mpsc channel for communicating between asynchronous tasks.
+///
+/// Being bounded, this channel provides backpressure to ensure that the sender
+/// outpaces the receiver by only a limited amount. The channel's capacity is
+/// equal to `buffer + num-senders`. In other words, each sender gets a
+/// guaranteed slot in the channel capacity, and on top of that there are
+/// `buffer` "first come, first serve" slots available to all senders.
+///
+/// The [`Receiver`](Receiver) returned implements the
+/// [`Stream`](futures_core::stream::Stream) trait, while [`Sender`](Sender) implements
+/// `Sink`.
+pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
+    // Check that the requested buffer size does not exceed the maximum buffer
+    // size permitted by the system.
+    assert!(buffer < MAX_BUFFER, "requested buffer size too large");
+
+    let inner = Arc::new(BoundedInner {
+        buffer,
+        state: AtomicUsize::new(INIT_STATE),
+        message_queue: Queue::new(),
+        parked_queue: Queue::new(),
+        num_senders: AtomicUsize::new(1),
+        recv_task: AtomicWaker::new(),
+    });
+
+    let tx = BoundedSenderInner {
+        inner: inner.clone(),
+        sender_task: Arc::new(Mutex::new(SenderTask::new())),
+        maybe_parked: false,
+    };
+
+    let rx = Receiver {
+        inner: Some(inner),
+    };
+
+    (Sender(Some(tx)), rx)
+}
+
+/// Creates an unbounded mpsc channel for communicating between asynchronous
+/// tasks.
+///
+/// A `send` on this channel will always succeed as long as the receive half has
+/// not been closed. If the receiver falls behind, messages will be arbitrarily
+/// buffered.
+///
+/// **Note** that the amount of available system memory is an implicit bound to
+/// the channel. Using an `unbounded` channel has the ability of causing the
+/// process to run out of memory. In this case, the process will be aborted.
+pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
+
+    let inner = Arc::new(UnboundedInner {
+        state: AtomicUsize::new(INIT_STATE),
+        message_queue: Queue::new(),
+        num_senders: AtomicUsize::new(1),
+        recv_task: AtomicWaker::new(),
+    });
+
+    let tx = UnboundedSenderInner {
+        inner: inner.clone(),
+    };
+
+    let rx = UnboundedReceiver {
+        inner: Some(inner),
+    };
+
+    (UnboundedSender(Some(tx)), rx)
+}
+
+/*
+ *
+ * ===== impl Sender =====
+ *
+ */
+
+impl<T> UnboundedSenderInner<T> {
+    fn poll_ready_nb(&self) -> Poll<Result<(), SendError>> {
+        let state = decode_state(self.inner.state.load(SeqCst));
+        if state.is_open {
+            Poll::Ready(Ok(()))
+        } else {
+            Poll::Ready(Err(SendError {
+                kind: SendErrorKind::Disconnected,
+            }))
+        }
+    }
+
+
+    // Push message to the queue and signal to the receiver
+    fn queue_push_and_signal(&self, msg: T) {
+        // Push the message onto the message queue
+        self.inner.message_queue.push(msg);
+
+        // Signal to the receiver that a message has been enqueued. If the
+        // receiver is parked, this will unpark the task.
+        self.inner.recv_task.wake();
+    }
+
+    // Increment the number of queued messages. Returns the resulting number.
+    fn inc_num_messages(&self) -> Option<usize> {
+        let mut curr = self.inner.state.load(SeqCst);
+
+        loop {
+            let mut state = decode_state(curr);
+
+            // The receiver end closed the channel.
+            if !state.is_open {
+                return None;
+            }
+
+            // This probably is never hit? Odds are the process will run out of
+            // memory first. It may be worth to return something else in this
+            // case?
+            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
+                    exhausted; sending this messages would overflow the state");
+
+            state.num_messages += 1;
+
+            let next = encode_state(&state);
+            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+                Ok(_) => {
+                    return Some(state.num_messages)
+                }
+                Err(actual) => curr = actual,
+            }
+        }
+    }
+
+    /// Returns whether the senders send to the same receiver.
+    fn same_receiver(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.inner, &other.inner)
+    }
+
+    /// Returns pointer to the Arc containing sender
+    ///
+    /// The returned pointer is not referenced and should be only used for hashing!
+    fn ptr(&self) -> *const UnboundedInner<T> {
+        &*self.inner
+    }
+
+    /// Returns whether this channel is closed without needing a context.
+    fn is_closed(&self) -> bool {
+        !decode_state(self.inner.state.load(SeqCst)).is_open
+    }
+
+    /// Closes this channel from the sender side, preventing any new messages.
+    fn close_channel(&self) {
+        // There's no need to park this sender, its dropping,
+        // and we don't want to check for capacity, so skip
+        // that stuff from `do_send`.
+
+        self.inner.set_closed();
+        self.inner.recv_task.wake();
+    }
+}
+
+impl<T> BoundedSenderInner<T> {
+    /// Attempts to send a message on this `Sender`, returning the message
+    /// if there was an error.
+    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+        // If the sender is currently blocked, reject the message
+        if !self.poll_unparked(None).is_ready() {
+            return Err(TrySendError {
+                err: SendError {
+                    kind: SendErrorKind::Full,
+                },
+                val: msg,
+            });
+        }
+
+        // The channel has capacity to accept the message, so send it
+        self.do_send_b(msg)
+    }
+
+    // Do the send without failing.
+    // Can be called only by bounded sender.
+    #[allow(clippy::debug_assert_with_mut_call)]
+    fn do_send_b(&mut self, msg: T)
+        -> Result<(), TrySendError<T>>
+    {
+        // Anyone callig do_send *should* make sure there is room first,
+        // but assert here for tests as a sanity check.
+        debug_assert!(self.poll_unparked(None).is_ready());
+
+        // First, increment the number of messages contained by the channel.
+        // This operation will also atomically determine if the sender task
+        // should be parked.
+        //
+        // None is returned in the case that the channel has been closed by the
+        // receiver. This happens when `Receiver::close` is called or the
+        // receiver is dropped.
+        let park_self = match self.inc_num_messages() {
+            Some(num_messages) => {
+                // Block if the current number of pending messages has exceeded
+                // the configured buffer size
+                num_messages > self.inner.buffer
+            }
+            None => return Err(TrySendError {
+                err: SendError {
+                    kind: SendErrorKind::Disconnected,
+                },
+                val: msg,
+            }),
+        };
+
+        // If the channel has reached capacity, then the sender task needs to
+        // be parked. This will send the task handle on the parked task queue.
+        //
+        // However, when `do_send` is called while dropping the `Sender`,
+        // `task::current()` can't be called safely. In this case, in order to
+        // maintain internal consistency, a blank message is pushed onto the
+        // parked task queue.
+        if park_self {
+            self.park();
+        }
+
+        self.queue_push_and_signal(msg);
+
+        Ok(())
+    }
+
+    // Push message to the queue and signal to the receiver
+    fn queue_push_and_signal(&self, msg: T) {
+        // Push the message onto the message queue
+        self.inner.message_queue.push(msg);
+
+        // Signal to the receiver that a message has been enqueued. If the
+        // receiver is parked, this will unpark the task.
+        self.inner.recv_task.wake();
+    }
+
+    // Increment the number of queued messages. Returns the resulting number.
+    fn inc_num_messages(&self) -> Option<usize> {
+        let mut curr = self.inner.state.load(SeqCst);
+
+        loop {
+            let mut state = decode_state(curr);
+
+            // The receiver end closed the channel.
+            if !state.is_open {
+                return None;
+            }
+
+            // This probably is never hit? Odds are the process will run out of
+            // memory first. It may be worth to return something else in this
+            // case?
+            assert!(state.num_messages < MAX_CAPACITY, "buffer space \
+                    exhausted; sending this messages would overflow the state");
+
+            state.num_messages += 1;
+
+            let next = encode_state(&state);
+            match self.inner.state.compare_exchange(curr, next, SeqCst, SeqCst) {
+                Ok(_) => {
+                    return Some(state.num_messages)
+                }
+                Err(actual) => curr = actual,
+            }
+        }
+    }
+
+    fn park(&mut self) {
+        {
+            let mut sender = self.sender_task.lock().unwrap();
+            sender.task = None;
+            sender.is_parked = true;
+        }
+
+        // Send handle over queue
+        let t = self.sender_task.clone();
+        self.inner.parked_queue.push(t);
+
+        // Check to make sure we weren't closed after we sent our task on the
+        // queue
+        let state = decode_state(self.inner.state.load(SeqCst));
+        self.maybe_parked = state.is_open;
+    }
+
+    /// Polls the channel to determine if there is guaranteed capacity to send
+    /// at least one item without waiting.
+    ///
+    /// # Return value
+    ///
+    /// This method returns:
+    ///
+    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
+    /// - `Poll::Pending` if the channel may not have
+    ///   capacity, in which case the current task is queued to be notified once
+    ///   capacity is available;
+    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+    fn poll_ready(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), SendError>> {
+        let state = decode_state(self.inner.state.load(SeqCst));
+        if !state.is_open {
+            return Poll::Ready(Err(SendError {
+                kind: SendErrorKind::Disconnected,
+            }));
+        }
+
+        self.poll_unparked(Some(cx)).map(Ok)
+    }
+
+    /// Returns whether the senders send to the same receiver.
+    fn same_receiver(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.inner, &other.inner)
+    }
+
+    /// Returns pointer to the Arc containing sender
+    ///
+    /// The returned pointer is not referenced and should be only used for hashing!
+    fn ptr(&self) -> *const BoundedInner<T> {
+        &*self.inner
+    }
+
+    /// Returns whether this channel is closed without needing a context.
+    fn is_closed(&self) -> bool {
+        !decode_state(self.inner.state.load(SeqCst)).is_open
+    }
+
+    /// Closes this channel from the sender side, preventing any new messages.
+    fn close_channel(&self) {
+        // There's no need to park this sender, its dropping,
+        // and we don't want to check for capacity, so skip
+        // that stuff from `do_send`.
+
+        self.inner.set_closed();
+        self.inner.recv_task.wake();
+    }
+
+    fn poll_unparked(&mut self, cx: Option<&mut Context<'_>>) -> Poll<()> {
+        // First check the `maybe_parked` variable. This avoids acquiring the
+        // lock in most cases
+        if self.maybe_parked {
+            // Get a lock on the task handle
+            let mut task = self.sender_task.lock().unwrap();
+
+            if !task.is_parked {
+                self.maybe_parked = false;
+                return Poll::Ready(())
+            }
+
+            // At this point, an unpark request is pending, so there will be an
+            // unpark sometime in the future. We just need to make sure that
+            // the correct task will be notified.
+            //
+            // Update the task in case the `Sender` has been moved to another
+            // task
+            task.task = cx.map(|cx| cx.waker().clone());
+
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+}
+
+impl<T> Sender<T> {
+    /// Attempts to send a message on this `Sender`, returning the message
+    /// if there was an error.
+    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
+        if let Some(inner) = &mut self.0 {
+            inner.try_send(msg)
+        } else {
+            Err(TrySendError {
+                err: SendError {
+                    kind: SendErrorKind::Disconnected,
+                },
+                val: msg,
+            })
+        }
+    }
+
+    /// Send a message on the channel.
+    ///
+    /// This function should only be called after
+    /// [`poll_ready`](Sender::poll_ready) has reported that the channel is
+    /// ready to receive a message.
+    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
+        self.try_send(msg)
+            .map_err(|e| e.err)
+    }
+
+    /// Polls the channel to determine if there is guaranteed capacity to send
+    /// at least one item without waiting.
+    ///
+    /// # Return value
+    ///
+    /// This method returns:
+    ///
+    /// - `Poll::Ready(Ok(_))` if there is sufficient capacity;
+    /// - `Poll::Pending` if the channel may not have
+    ///   capacity, in which case the current task is queued to be notified once
+    ///   capacity is available;
+    /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped.
+    pub fn poll_ready(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), SendError>> {
+        let inner = self.0.as_mut().ok_or(SendError {
+            kind: SendErrorKind::Disconnected,
+        })?;
+        inner.poll_ready(cx)
+    }
+
+    /// Returns whether this channel is closed without needing a context.
+    pub fn is_closed(&self) -> bool {
+        self.0.as_ref().map(BoundedSenderInner::is_closed).unwrap_or(true)
+    }
+
+    /// Closes this channel from the sender side, preventing any new messages.
+    pub fn close_channel(&mut self) {
+        if let Some(inner) = &mut self.0 {
+            inner.close_channel();
+        }
+    }
+
+    /// Disconnects this sender from the channel, closing it if there are no more senders left.
+    pub fn disconnect(&mut self) {
+        self.0 = None;
+    }
+
+    /// Returns whether the senders send to the same receiver.
+    pub fn same_receiver(&self, other: &Self) -> bool {
+        match (&self.0, &other.0) {
+            (Some(inner), Some(other)) => inner.same_receiver(other),
+            _ => false,
+        }
+    }
+
+    /// Hashes the receiver into the provided hasher
+    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+        use std::hash::Hash;
+
+        let ptr = self.0.as_ref().map(|inner| inner.ptr());
+        ptr.hash(hasher);
+    }
+}
+
+impl<T> UnboundedSender<T> {
+    /// Check if the channel is ready to receive a message.
+    pub fn poll_ready(
+        &self,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), SendError>> {
+        let inner = self.0.as_ref().ok_or(SendError {
+            kind: SendErrorKind::Disconnected,
+        })?;
+        inner.poll_ready_nb()
+    }
+
+    /// Returns whether this channel is closed without needing a context.
+    pub fn is_closed(&self) -> bool {
+        self.0.as_ref().map(UnboundedSenderInner::is_closed).unwrap_or(true)
+    }
+
+    /// Closes this channel from the sender side, preventing any new messages.
+    pub fn close_channel(&self) {
+        if let Some(inner) = &self.0 {
+            inner.close_channel();
+        }
+    }
+
+    /// Disconnects this sender from the channel, closing it if there are no more senders left.
+    pub fn disconnect(&mut self) {
+        self.0 = None;
+    }
+
+    // Do the send without parking current task.
+    fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
+        if let Some(inner) = &self.0 {
+            if inner.inc_num_messages().is_some() {
+                inner.queue_push_and_signal(msg);
+                return Ok(());
+            }
+        }
+
+        Err(TrySendError {
+            err: SendError {
+                kind: SendErrorKind::Disconnected,
+            },
+            val: msg,
+        })
+    }
+
+    /// Send a message on the channel.
+    ///
+    /// This method should only be called after `poll_ready` has been used to
+    /// verify that the channel is ready to receive a message.
+    pub fn start_send(&mut self, msg: T) -> Result<(), SendError> {
+        self.do_send_nb(msg)
+            .map_err(|e| e.err)
+    }
+
+    /// Sends a message along this channel.
+    ///
+    /// This is an unbounded sender, so this function differs from `Sink::send`
+    /// by ensuring the return type reflects that the channel is always ready to
+    /// receive messages.
+    pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError<T>> {
+        self.do_send_nb(msg)
+    }
+
+    /// Returns whether the senders send to the same receiver.
+    pub fn same_receiver(&self, other: &Self) -> bool {
+        match (&self.0, &other.0) {
+            (Some(inner), Some(other)) => inner.same_receiver(other),
+            _ => false,
+        }
+    }
+
+    /// Hashes the receiver into the provided hasher
+    pub fn hash_receiver<H>(&self, hasher: &mut H) where H: std::hash::Hasher {
+        use std::hash::Hash;
+
+        let ptr = self.0.as_ref().map(|inner| inner.ptr());
+        ptr.hash(hasher);
+    }
+}
+
+impl<T> Clone for Sender<T> {
+    fn clone(&self) -> Sender<T> {
+        Sender(self.0.clone())
+    }
+}
+
+impl<T> Clone for UnboundedSender<T> {
+    fn clone(&self) -> UnboundedSender<T> {
+        UnboundedSender(self.0.clone())
+    }
+}
+
+impl<T> Clone for UnboundedSenderInner<T> {
+    fn clone(&self) -> UnboundedSenderInner<T> {
+        // Since this atomic op isn't actually guarding any memory and we don't
+        // care about any orderings besides the ordering on the single atomic
+        // variable, a relaxed ordering is acceptable.
+        let mut curr = self.inner.num_senders.load(SeqCst);
+
+        loop {
+            // If the maximum number of senders has been reached, then fail
+            if curr == MAX_BUFFER {
+                panic!("cannot clone `Sender` -- too many outstanding senders");
+            }
+
+            debug_assert!(curr < MAX_BUFFER);
+
+            let next = curr + 1;
+            let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
+
+            // The ABA problem doesn't matter here. We only care that the
+            // number of senders never exceeds the maximum.
+            if actual == curr {
+                return UnboundedSenderInner {
+                    inner: self.inner.clone(),
+                };
+            }
+
+            curr = actual;
+        }
+    }
+}
+
+impl<T> Clone for BoundedSenderInner<T> {
+    fn clone(&self) -> BoundedSenderInner<T> {
+        // Since this atomic op isn't actually guarding any memory and we don't
+        // care about any orderings besides the ordering on the single atomic
+        // variable, a relaxed ordering is acceptable.
+        let mut curr = self.inner.num_senders.load(SeqCst);
+
+        loop {
+            // If the maximum number of senders has been reached, then fail
+            if curr == self.inner.max_senders() {
+                panic!("cannot clone `Sender` -- too many outstanding senders");
+            }
+
+            debug_assert!(curr < self.inner.max_senders());
+
+            let next = curr + 1;
+            let actual = self.inner.num_senders.compare_and_swap(curr, next, SeqCst);
+
+            // The ABA problem doesn't matter here. We only care that the
+            // number of senders never exceeds the maximum.
+            if actual == curr {
+                return BoundedSenderInner {
+                    inner: self.inner.clone(),
+                    sender_task: Arc::new(Mutex::new(SenderTask::new())),
+                    maybe_parked: false,
+                };
+            }
+
+            curr = actual;
+        }
+    }
+}
+
+impl<T> Drop for UnboundedSenderInner<T> {
+    fn drop(&mut self) {
+        // Ordering between variables don't matter here
+        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
+
+        if prev == 1 {
+            self.close_channel();
+        }
+    }
+}
+
+impl<T> Drop for BoundedSenderInner<T> {
+    fn drop(&mut self) {
+        // Ordering between variables don't matter here
+        let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
+
+        if prev == 1 {
+            self.close_channel();
+        }
+    }
+}
+
+/*
+ *
+ * ===== impl Receiver =====
+ *
+ */
+
+impl<T> Receiver<T> {
+    /// Closes the receiving half of a channel, without dropping it.
+    ///
+    /// This prevents any further messages from being sent on the channel while
+    /// still enabling the receiver to drain messages that are buffered.
+    pub fn close(&mut self) {
+        if let Some(inner) = &mut self.inner {
+            inner.set_closed();
+
+            // Wake up any threads waiting as they'll see that we've closed the
+            // channel and will continue on their merry way.
+            while let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
+                task.lock().unwrap().notify();
+            }
+        }
+    }
+
+    /// Tries to receive the next message without notifying a context if empty.
+    ///
+    /// It is not recommended to call this function from inside of a future,
+    /// only when you've otherwise arranged to be notified when the channel is
+    /// no longer empty.
+    ///
+    /// This function will panic if called after `try_next` or `poll_next` has
+    /// returned None.
+    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
+        match self.next_message() {
+            Poll::Ready(msg) => {
+                Ok(msg)
+            },
+            Poll::Pending => Err(TryRecvError { _priv: () }),
+        }
+    }
+
+    fn next_message(&mut self) -> Poll<Option<T>> {
+        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+        // Pop off a message
+        match unsafe { inner.message_queue.pop_spin() } {
+            Some(msg) => {
+                // If there are any parked task handles in the parked queue,
+                // pop one and unpark it.
+                self.unpark_one();
+
+                // Decrement number of messages
+                self.dec_num_messages();
+
+                Poll::Ready(Some(msg))
+            }
+            None => {
+                let state = decode_state(inner.state.load(SeqCst));
+                if state.is_open || state.num_messages != 0 {
+                    // If queue is open, we need to return Pending
+                    // to be woken up when new messages arrive.
+                    // If queue is closed but num_messages is non-zero,
+                    // it means that senders updated the state,
+                    // but didn't put message to queue yet,
+                    // so we need to park until sender unparks the task
+                    // after queueing the message.
+                    Poll::Pending
+                } else {
+                    // If closed flag is set AND there are no pending messages
+                    // it means end of stream
+                    self.inner = None;
+                    Poll::Ready(None)
+                }
+            }
+        }
+    }
+
+    // Unpark a single task handle if there is one pending in the parked queue
+    fn unpark_one(&mut self) {
+        if let Some(inner) = &mut self.inner {
+            if let Some(task) = unsafe { inner.parked_queue.pop_spin() } {
+                task.lock().unwrap().notify();
+            }
+        }
+    }
+
+    fn dec_num_messages(&self) {
+        if let Some(inner) = &self.inner {
+            // OPEN_MASK is highest bit, so it's unaffected by subtraction
+            // unless there's underflow, and we know there's no underflow
+            // because number of messages at this point is always > 0.
+            inner.state.fetch_sub(1, SeqCst);
+        }
+    }
+}
+
+// The receiver does not ever take a Pin to the inner T
+impl<T> Unpin for Receiver<T> {}
+
+impl<T> FusedStream for Receiver<T> {
+    fn is_terminated(&self) -> bool {
+        self.inner.is_none()
+    }
+}
+
+impl<T> Stream for Receiver<T> {
+    type Item = T;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<T>> {
+            // Try to read a message off of the message queue.
+        match self.next_message() {
+            Poll::Ready(msg) => {
+                if msg.is_none() {
+                    self.inner = None;
+                }
+                Poll::Ready(msg)
+            },
+            Poll::Pending => {
+                // There are no messages to read, in this case, park.
+                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
+                // Check queue again after parking to prevent race condition:
+                // a message could be added to the queue after previous `next_message`
+                // before `register` call.
+                self.next_message()
+            }
+        }
+    }
+}
+
+impl<T> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        // Drain the channel of all pending messages
+        self.close();
+        if self.inner.is_some() {
+            while let Poll::Ready(Some(..)) = self.next_message() {
+                // ...
+            }
+        }
+    }
+}
+
+impl<T> UnboundedReceiver<T> {
+    /// Closes the receiving half of a channel, without dropping it.
+    ///
+    /// This prevents any further messages from being sent on the channel while
+    /// still enabling the receiver to drain messages that are buffered.
+    pub fn close(&mut self) {
+        if let Some(inner) = &mut self.inner {
+            inner.set_closed();
+        }
+    }
+
+    /// Tries to receive the next message without notifying a context if empty.
+    ///
+    /// It is not recommended to call this function from inside of a future,
+    /// only when you've otherwise arranged to be notified when the channel is
+    /// no longer empty.
+    ///
+    /// This function will panic if called after `try_next` or `poll_next` has
+    /// returned None.
+    pub fn try_next(&mut self) -> Result<Option<T>, TryRecvError> {
+        match self.next_message() {
+            Poll::Ready(msg) => {
+                Ok(msg)
+            },
+            Poll::Pending => Err(TryRecvError { _priv: () }),
+        }
+    }
+
+    fn next_message(&mut self) -> Poll<Option<T>> {
+        let inner = self.inner.as_mut().expect("Receiver::next_message called after `None`");
+        // Pop off a message
+        match unsafe { inner.message_queue.pop_spin() } {
+            Some(msg) => {
+                // Decrement number of messages
+                self.dec_num_messages();
+
+                Poll::Ready(Some(msg))
+            }
+            None => {
+                let state = decode_state(inner.state.load(SeqCst));
+                if state.is_open || state.num_messages != 0 {
+                    // If queue is open, we need to return Pending
+                    // to be woken up when new messages arrive.
+                    // If queue is closed but num_messages is non-zero,
+                    // it means that senders updated the state,
+                    // but didn't put message to queue yet,
+                    // so we need to park until sender unparks the task
+                    // after queueing the message.
+                    Poll::Pending
+                } else {
+                    // If closed flag is set AND there are no pending messages
+                    // it means end of stream
+                    self.inner = None;
+                    Poll::Ready(None)
+                }
+            }
+        }
+    }
+
+    fn dec_num_messages(&self) {
+        if let Some(inner) = &self.inner {
+            // OPEN_MASK is highest bit, so it's unaffected by subtraction
+            // unless there's underflow, and we know there's no underflow
+            // because number of messages at this point is always > 0.
+            inner.state.fetch_sub(1, SeqCst);
+        }
+    }
+}
+
+impl<T> FusedStream for UnboundedReceiver<T> {
+    fn is_terminated(&self) -> bool {
+        self.inner.is_none()
+    }
+}
+
+impl<T> Stream for UnboundedReceiver<T> {
+    type Item = T;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<T>> {
+        // Try to read a message off of the message queue.
+        match self.next_message() {
+            Poll::Ready(msg) => {
+                if msg.is_none() {
+                    self.inner = None;
+                }
+                Poll::Ready(msg)
+            },
+            Poll::Pending => {
+                // There are no messages to read, in this case, park.
+                self.inner.as_ref().unwrap().recv_task.register(cx.waker());
+                // Check queue again after parking to prevent race condition:
+                // a message could be added to the queue after previous `next_message`
+                // before `register` call.
+                self.next_message()
+            }
+        }
+    }
+}
+
+impl<T> Drop for UnboundedReceiver<T> {
+    fn drop(&mut self) {
+        // Drain the channel of all pending messages
+        self.close();
+        if self.inner.is_some() {
+            while let Poll::Ready(Some(..)) = self.next_message() {
+                // ...
+            }
+        }
+    }
+}
+
+/*
+ *
+ * ===== impl Inner =====
+ *
+ */
+
+impl<T> UnboundedInner<T> {
+    // Clear `open` flag in the state, keep `num_messages` intact.
+    fn set_closed(&self) {
+        let curr = self.state.load(SeqCst);
+        if !decode_state(curr).is_open {
+            return;
+        }
+
+        self.state.fetch_and(!OPEN_MASK, SeqCst);
+    }
+}
+
+impl<T> BoundedInner<T> {
+    // The return value is such that the total number of messages that can be
+    // enqueued into the channel will never exceed MAX_CAPACITY
+    fn max_senders(&self) -> usize {
+        MAX_CAPACITY - self.buffer
+    }
+
+    // Clear `open` flag in the state, keep `num_messages` intact.
+    fn set_closed(&self) {
+        let curr = self.state.load(SeqCst);
+        if !decode_state(curr).is_open {
+            return;
+        }
+
+        self.state.fetch_and(!OPEN_MASK, SeqCst);
+    }
+}
+
+unsafe impl<T: Send> Send for UnboundedInner<T> {}
+unsafe impl<T: Send> Sync for UnboundedInner<T> {}
+
+unsafe impl<T: Send> Send for BoundedInner<T> {}
+unsafe impl<T: Send> Sync for BoundedInner<T> {}
+
+/*
+ *
+ * ===== Helpers =====
+ *
+ */
+
+fn decode_state(num: usize) -> State {
+    State {
+        is_open: num & OPEN_MASK == OPEN_MASK,
+        num_messages: num & MAX_CAPACITY,
+    }
+}
+
+fn encode_state(state: &State) -> usize {
+    let mut num = state.num_messages;
+
+    if state.is_open {
+        num |= OPEN_MASK;
+    }
+
+    num
+}
diff --git a/src/mpsc/queue.rs b/src/mpsc/queue.rs
new file mode 100644
index 0000000..353e75e
--- /dev/null
+++ b/src/mpsc/queue.rs
@@ -0,0 +1,178 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *    1. Redistributions of source code must retain the above copyright notice,
+ *       this list of conditions and the following disclaimer.
+ *
+ *    2. Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in the
+ *       documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue for sending
+//! messages between asynchronous tasks.
+//!
+//! The queue implementation is essentially the same one used for mpsc channels
+//! in the standard library.
+//!
+//! Note that the current implementation of this queue has a caveat of the `pop`
+//! method, and see the method for more information about it. Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+//                         /queues/non-intrusive-mpsc-node-based-queue
+
+// NOTE: this implementation is lifted from the standard library and only
+//       slightly modified
+
+pub(super) use self::PopResult::*;
+
+use std::thread;
+use std::cell::UnsafeCell;
+use std::ptr;
+use std::sync::atomic::{AtomicPtr, Ordering};
+
+/// A result of the `pop` function.
+pub(super) enum PopResult<T> {
+    /// Some data has been popped
+    Data(T),
+    /// The queue is empty
+    Empty,
+    /// The queue is in an inconsistent state. Popping data should succeed, but
+    /// some pushers have yet to make enough progress in order allow a pop to
+    /// succeed. It is recommended that a pop() occur "in the near future" in
+    /// order to see if the sender has made progress or not
+    Inconsistent,
+}
+
+#[derive(Debug)]
+struct Node<T> {
+    next: AtomicPtr<Node<T>>,
+    value: Option<T>,
+}
+
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+#[derive(Debug)]
+pub(super) struct Queue<T> {
+    head: AtomicPtr<Node<T>>,
+    tail: UnsafeCell<*mut Node<T>>,
+}
+
+unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send> Sync for Queue<T> { }
+
+impl<T> Node<T> {
+    unsafe fn new(v: Option<T>) -> *mut Node<T> {
+        Box::into_raw(Box::new(Node {
+            next: AtomicPtr::new(ptr::null_mut()),
+            value: v,
+        }))
+    }
+}
+
+impl<T> Queue<T> {
+    /// Creates a new queue that is safe to share among multiple producers and
+    /// one consumer.
+    pub(super) fn new() -> Queue<T> {
+        let stub = unsafe { Node::new(None) };
+        Queue {
+            head: AtomicPtr::new(stub),
+            tail: UnsafeCell::new(stub),
+        }
+    }
+
+    /// Pushes a new value onto this queue.
+    pub(super) fn push(&self, t: T) {
+        unsafe {
+            let n = Node::new(Some(t));
+            let prev = self.head.swap(n, Ordering::AcqRel);
+            (*prev).next.store(n, Ordering::Release);
+        }
+    }
+
+    /// Pops some data from this queue.
+    ///
+    /// Note that the current implementation means that this function cannot
+    /// return `Option<T>`. It is possible for this queue to be in an
+    /// inconsistent state where many pushes have succeeded and completely
+    /// finished, but pops cannot return `Some(t)`. This inconsistent state
+    /// happens when a pusher is preempted at an inopportune moment.
+    ///
+    /// This inconsistent state means that this queue does indeed have data, but
+    /// it does not currently have access to it at this time.
+    ///
+    /// This function is unsafe because only one thread can call it at a time.
+    pub(super) unsafe fn pop(&self) -> PopResult<T> {
+        let tail = *self.tail.get();
+        let next = (*tail).next.load(Ordering::Acquire);
+
+        if !next.is_null() {
+            *self.tail.get() = next;
+            assert!((*tail).value.is_none());
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take().unwrap();
+            drop(Box::from_raw(tail));
+            return Data(ret);
+        }
+
+        if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+    }
+
+    /// Pop an element similarly to `pop` function, but spin-wait on inconsistent
+    /// queue state instead of returning `Inconsistent`.
+    ///
+    /// This function is unsafe because only one thread can call it at a time.
+    pub(super) unsafe fn pop_spin(&self) -> Option<T> {
+        loop {
+            match self.pop() {
+                Empty => return None,
+                Data(t) => return Some(t),
+                // Inconsistent means that there will be a message to pop
+                // in a short time. This branch can only be reached if
+                // values are being produced from another thread, so there
+                // are a few ways that we can deal with this:
+                //
+                // 1) Spin
+                // 2) thread::yield_now()
+                // 3) task::current().unwrap() & return Pending
+                //
+                // For now, thread::yield_now() is used, but it would
+                // probably be better to spin a few times then yield.
+                Inconsistent => {
+                    thread::yield_now();
+                }
+            }
+        }
+    }
+}
+
+impl<T> Drop for Queue<T> {
+    fn drop(&mut self) {
+        unsafe {
+            let mut cur = *self.tail.get();
+            while !cur.is_null() {
+                let next = (*cur).next.load(Ordering::Relaxed);
+                drop(Box::from_raw(cur));
+                cur = next;
+            }
+        }
+    }
+}
diff --git a/src/mpsc/sink_impl.rs b/src/mpsc/sink_impl.rs
new file mode 100644
index 0000000..e7f5457
--- /dev/null
+++ b/src/mpsc/sink_impl.rs
@@ -0,0 +1,107 @@
+use super::{SendError, Sender, TrySendError, UnboundedSender};
+use futures_core::task::{Context, Poll};
+use futures_sink::Sink;
+use std::pin::Pin;
+
+impl<T> Sink<T> for Sender<T> {
+    type Error = SendError;
+
+    fn poll_ready(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        (*self).poll_ready(cx)
+    }
+
+    fn start_send(
+        mut self: Pin<&mut Self>,
+        msg: T,
+    ) -> Result<(), Self::Error> {
+        (*self).start_send(msg)
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        match (*self).poll_ready(cx) {
+            Poll::Ready(Err(ref e)) if e.is_disconnected() => {
+                // If the receiver disconnected, we consider the sink to be flushed.
+                Poll::Ready(Ok(()))
+            }
+            x => x,
+        }
+    }
+
+    fn poll_close(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        self.disconnect();
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl<T> Sink<T> for UnboundedSender<T> {
+    type Error = SendError;
+
+    fn poll_ready(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        UnboundedSender::poll_ready(&*self, cx)
+    }
+
+    fn start_send(
+        mut self: Pin<&mut Self>,
+        msg: T,
+    ) -> Result<(), Self::Error> {
+        UnboundedSender::start_send(&mut *self, msg)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        self.disconnect();
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl<T> Sink<T> for &UnboundedSender<T> {
+    type Error = SendError;
+
+    fn poll_ready(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        UnboundedSender::poll_ready(*self, cx)
+    }
+
+    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> {
+        self.unbounded_send(msg)
+            .map_err(TrySendError::into_send_error)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Result<(), Self::Error>> {
+        self.close_channel();
+        Poll::Ready(Ok(()))
+    }
+}
diff --git a/src/oneshot.rs b/src/oneshot.rs
new file mode 100644
index 0000000..d3be67e
--- /dev/null
+++ b/src/oneshot.rs
@@ -0,0 +1,432 @@
+//! A channel for sending a single message between asynchronous tasks.
+
+use alloc::sync::Arc;
+use core::fmt;
+use core::pin::Pin;
+use core::sync::atomic::AtomicBool;
+use core::sync::atomic::Ordering::SeqCst;
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll, Waker};
+
+use crate::lock::Lock;
+
+/// A future for a value that will be provided by another asynchronous task.
+///
+/// This is created by the [`channel`] function.
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+#[derive(Debug)]
+pub struct Receiver<T> {
+    inner: Arc<Inner<T>>,
+}
+
+/// A means of transmitting a single value to another task.
+///
+/// This is created by the [`channel`] function.
+#[derive(Debug)]
+pub struct Sender<T> {
+    inner: Arc<Inner<T>>,
+}
+
+// The channels do not ever project Pin to the inner T
+impl<T> Unpin for Receiver<T> {}
+impl<T> Unpin for Sender<T> {}
+
+/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
+/// the internal synchronization between the two for send/recv operations.
+#[derive(Debug)]
+struct Inner<T> {
+    /// Indicates whether this oneshot is complete yet. This is filled in both
+    /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
+    /// appropriately.
+    ///
+    /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
+    /// unlocked and ready to be inspected.
+    ///
+    /// For `Sender` if this is `true` then the oneshot has gone away and it
+    /// can return ready from `poll_canceled`.
+    complete: AtomicBool,
+
+    /// The actual data being transferred as part of this `Receiver`. This is
+    /// filled in by `Sender::complete` and read by `Receiver::poll`.
+    ///
+    /// Note that this is protected by `Lock`, but it is in theory safe to
+    /// replace with an `UnsafeCell` as it's actually protected by `complete`
+    /// above. I wouldn't recommend doing this, however, unless someone is
+    /// supremely confident in the various atomic orderings here and there.
+    data: Lock<Option<T>>,
+
+    /// Field to store the task which is blocked in `Receiver::poll`.
+    ///
+    /// This is filled in when a oneshot is polled but not ready yet. Note that
+    /// the `Lock` here, unlike in `data` above, is important to resolve races.
+    /// Both the `Receiver` and the `Sender` halves understand that if they
+    /// can't acquire the lock then some important interference is happening.
+    rx_task: Lock<Option<Waker>>,
+
+    /// Like `rx_task` above, except for the task blocked in
+    /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
+    tx_task: Lock<Option<Waker>>,
+}
+
+/// Creates a new one-shot channel for sending values across asynchronous tasks.
+///
+/// This function is similar to Rust's channel constructor found in the standard
+/// library. Two halves are returned, the first of which is a `Sender` handle,
+/// used to signal the end of a computation and provide its value. The second
+/// half is a `Receiver` which implements the `Future` trait, resolving to the
+/// value that was given to the `Sender` handle.
+///
+/// Each half can be separately owned and sent across tasks.
+///
+/// # Examples
+///
+/// ```
+/// use futures::channel::oneshot;
+/// use futures::future::FutureExt;
+/// use std::thread;
+///
+/// let (sender, receiver) = oneshot::channel::<i32>();
+///
+/// # let t =
+/// thread::spawn(|| {
+///     let future = receiver.map(|i| {
+///         println!("got: {:?}", i);
+///     });
+///     // ...
+/// # return future;
+/// });
+///
+/// sender.send(3).unwrap();
+/// # futures::executor::block_on(t.join().unwrap());
+/// ```
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+    let inner = Arc::new(Inner::new());
+    let receiver = Receiver {
+        inner: inner.clone(),
+    };
+    let sender = Sender {
+        inner,
+    };
+    (sender, receiver)
+}
+
+impl<T> Inner<T> {
+    fn new() -> Inner<T> {
+        Inner {
+            complete: AtomicBool::new(false),
+            data: Lock::new(None),
+            rx_task: Lock::new(None),
+            tx_task: Lock::new(None),
+        }
+    }
+
+    fn send(&self, t: T) -> Result<(), T> {
+        if self.complete.load(SeqCst) {
+            return Err(t)
+        }
+
+        // Note that this lock acquisition may fail if the receiver
+        // is closed and sets the `complete` flag to true, whereupon
+        // the receiver may call `poll()`.
+        if let Some(mut slot) = self.data.try_lock() {
+            assert!(slot.is_none());
+            *slot = Some(t);
+            drop(slot);
+
+            // If the receiver called `close()` between the check at the
+            // start of the function, and the lock being released, then
+            // the receiver may not be around to receive it, so try to
+            // pull it back out.
+            if self.complete.load(SeqCst) {
+                // If lock acquisition fails, then receiver is actually
+                // receiving it, so we're good.
+                if let Some(mut slot) = self.data.try_lock() {
+                    if let Some(t) = slot.take() {
+                        return Err(t);
+                    }
+                }
+            }
+            Ok(())
+        } else {
+            // Must have been closed
+            Err(t)
+        }
+    }
+
+    fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
+        // Fast path up first, just read the flag and see if our other half is
+        // gone. This flag is set both in our destructor and the oneshot
+        // destructor, but our destructor hasn't run yet so if it's set then the
+        // oneshot is gone.
+        if self.complete.load(SeqCst) {
+            return Poll::Ready(())
+        }
+
+        // If our other half is not gone then we need to park our current task
+        // and move it into the `tx_task` slot to get notified when it's
+        // actually gone.
+        //
+        // If `try_lock` fails, then the `Receiver` is in the process of using
+        // it, so we can deduce that it's now in the process of going away and
+        // hence we're canceled. If it succeeds then we just store our handle.
+        //
+        // Crucially we then check `complete` *again* before we return.
+        // While we were storing our handle inside `tx_task` the
+        // `Receiver` may have been dropped. The first thing it does is set the
+        // flag, and if it fails to acquire the lock it assumes that we'll see
+        // the flag later on. So... we then try to see the flag later on!
+        let handle = cx.waker().clone();
+        match self.tx_task.try_lock() {
+            Some(mut p) => *p = Some(handle),
+            None => return Poll::Ready(()),
+        }
+        if self.complete.load(SeqCst) {
+            Poll::Ready(())
+        } else {
+            Poll::Pending
+        }
+    }
+
+    fn is_canceled(&self) -> bool {
+        self.complete.load(SeqCst)
+    }
+
+    fn drop_tx(&self) {
+        // Flag that we're a completed `Sender` and try to wake up a receiver.
+        // Whether or not we actually stored any data will get picked up and
+        // translated to either an item or cancellation.
+        //
+        // Note that if we fail to acquire the `rx_task` lock then that means
+        // we're in one of two situations:
+        //
+        // 1. The receiver is trying to block in `poll`
+        // 2. The receiver is being dropped
+        //
+        // In the first case it'll check the `complete` flag after it's done
+        // blocking to see if it succeeded. In the latter case we don't need to
+        // wake up anyone anyway. So in both cases it's ok to ignore the `None`
+        // case of `try_lock` and bail out.
+        //
+        // The first case crucially depends on `Lock` using `SeqCst` ordering
+        // under the hood. If it instead used `Release` / `Acquire` ordering,
+        // then it would not necessarily synchronize with `inner.complete`
+        // and deadlock might be possible, as was observed in
+        // https://github.com/rust-lang/futures-rs/pull/219.
+        self.complete.store(true, SeqCst);
+
+        if let Some(mut slot) = self.rx_task.try_lock() {
+            if let Some(task) = slot.take() {
+                drop(slot);
+                task.wake();
+            }
+        }
+
+        // If we registered a task for cancel notification drop it to reduce
+        // spurious wakeups
+        if let Some(mut slot) = self.tx_task.try_lock() {
+            drop(slot.take());
+        }
+    }
+
+    fn close_rx(&self) {
+        // Flag our completion and then attempt to wake up the sender if it's
+        // blocked. See comments in `drop` below for more info
+        self.complete.store(true, SeqCst);
+        if let Some(mut handle) = self.tx_task.try_lock() {
+            if let Some(task) = handle.take() {
+                drop(handle);
+                task.wake()
+            }
+        }
+    }
+
+    fn try_recv(&self) -> Result<Option<T>, Canceled> {
+        // If we're complete, either `::close_rx` or `::drop_tx` was called.
+        // We can assume a successful send if data is present.
+        if self.complete.load(SeqCst) {
+            if let Some(mut slot) = self.data.try_lock() {
+                if let Some(data) = slot.take() {
+                    return Ok(Some(data));
+                }
+            }
+            Err(Canceled)
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
+        // Check to see if some data has arrived. If it hasn't then we need to
+        // block our task.
+        //
+        // Note that the acquisition of the `rx_task` lock might fail below, but
+        // the only situation where this can happen is during `Sender::drop`
+        // when we are indeed completed already. If that's happening then we
+        // know we're completed so keep going.
+        let done = if self.complete.load(SeqCst) {
+            true
+        } else {
+            let task = cx.waker().clone();
+            match self.rx_task.try_lock() {
+                Some(mut slot) => { *slot = Some(task); false },
+                None => true,
+            }
+        };
+
+        // If we're `done` via one of the paths above, then look at the data and
+        // figure out what the answer is. If, however, we stored `rx_task`
+        // successfully above we need to check again if we're completed in case
+        // a message was sent while `rx_task` was locked and couldn't notify us
+        // otherwise.
+        //
+        // If we're not done, and we're not complete, though, then we've
+        // successfully blocked our task and we return `Pending`.
+        if done || self.complete.load(SeqCst) {
+            // If taking the lock fails, the sender will realise that the we're
+            // `done` when it checks the `complete` flag on the way out, and
+            // will treat the send as a failure.
+            if let Some(mut slot) = self.data.try_lock() {
+                if let Some(data) = slot.take() {
+                    return Poll::Ready(Ok(data));
+                }
+            }
+            Poll::Ready(Err(Canceled))
+        } else {
+            Poll::Pending
+        }
+    }
+
+    fn drop_rx(&self) {
+        // Indicate to the `Sender` that we're done, so any future calls to
+        // `poll_canceled` are weeded out.
+        self.complete.store(true, SeqCst);
+
+        // If we've blocked a task then there's no need for it to stick around,
+        // so we need to drop it. If this lock acquisition fails, though, then
+        // it's just because our `Sender` is trying to take the task, so we
+        // let them take care of that.
+        if let Some(mut slot) = self.rx_task.try_lock() {
+            let task = slot.take();
+            drop(slot);
+            drop(task);
+        }
+
+        // Finally, if our `Sender` wants to get notified of us going away, it
+        // would have stored something in `tx_task`. Here we try to peel that
+        // out and unpark it.
+        //
+        // Note that the `try_lock` here may fail, but only if the `Sender` is
+        // in the process of filling in the task. If that happens then we
+        // already flagged `complete` and they'll pick that up above.
+        if let Some(mut handle) = self.tx_task.try_lock() {
+            if let Some(task) = handle.take() {
+                drop(handle);
+                task.wake()
+            }
+        }
+    }
+}
+
+impl<T> Sender<T> {
+    /// Completes this oneshot with a successful result.
+    ///
+    /// This function will consume `self` and indicate to the other end, the
+    /// [`Receiver`](Receiver), that the value provided is the result of the
+    /// computation this represents.
+    ///
+    /// If the value is successfully enqueued for the remote end to receive,
+    /// then `Ok(())` is returned. If the receiving end was dropped before
+    /// this function was called, however, then `Err` is returned with the value
+    /// provided.
+    pub fn send(self, t: T) -> Result<(), T> {
+        self.inner.send(t)
+    }
+
+    /// Polls this `Sender` half to detect whether its associated
+    /// [`Receiver`](Receiver) with has been dropped.
+    ///
+    /// # Return values
+    ///
+    /// If `Ready(())` is returned then the associated `Receiver` has been
+    /// dropped, which means any work required for sending should be canceled.
+    ///
+    /// If `Pending` is returned then the associated `Receiver` is still
+    /// alive and may be able to receive a message if sent. The current task,
+    /// however, is scheduled to receive a notification if the corresponding
+    /// `Receiver` goes away.
+    pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        self.inner.poll_canceled(cx)
+    }
+
+    /// Tests to see whether this `Sender`'s corresponding `Receiver`
+    /// has been dropped.
+    ///
+    /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
+    /// enqueue a task for wakeup upon cancellation, but merely reports the
+    /// current state, which may be subject to concurrent modification.
+    pub fn is_canceled(&self) -> bool {
+        self.inner.is_canceled()
+    }
+}
+
+impl<T> Drop for Sender<T> {
+    fn drop(&mut self) {
+        self.inner.drop_tx()
+    }
+}
+
+/// Error returned from a [`Receiver`](Receiver) when the corresponding
+/// [`Sender`](Sender) is dropped.
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+pub struct Canceled;
+
+impl fmt::Display for Canceled {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "oneshot canceled")
+    }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for Canceled {}
+
+impl<T> Receiver<T> {
+    /// Gracefully close this receiver, preventing any subsequent attempts to
+    /// send to it.
+    ///
+    /// Any `send` operation which happens after this method returns is
+    /// guaranteed to fail. After calling this method, you can use
+    /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
+    /// message had previously been sent.
+    pub fn close(&mut self) {
+        self.inner.close_rx()
+    }
+
+    /// Attempts to receive a message outside of the context of a task.
+    ///
+    /// Does not schedule a task wakeup or have any other side effects.
+    ///
+    /// A return value of `None` must be considered immediately stale (out of
+    /// date) unless [`close`](Receiver::close) has been called first.
+    ///
+    /// Returns an error if the sender was dropped.
+    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
+        self.inner.try_recv()
+    }
+}
+
+impl<T> Future for Receiver<T> {
+    type Output = Result<T, Canceled>;
+
+    fn poll(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Result<T, Canceled>> {
+        self.inner.recv(cx)
+    }
+}
+
+impl<T> Drop for Receiver<T> {
+    fn drop(&mut self) {
+        self.inner.drop_rx()
+    }
+}
diff --git a/tests/channel.rs b/tests/channel.rs
new file mode 100644
index 0000000..73dac64
--- /dev/null
+++ b/tests/channel.rs
@@ -0,0 +1,70 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::stream::StreamExt;
+use futures::sink::SinkExt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::thread;
+
+#[test]
+fn sequence() {
+    let (tx, rx) = mpsc::channel(1);
+
+    let amt = 20;
+    let t = thread::spawn(move || {
+        block_on(send_sequence(amt, tx))
+    });
+    let list: Vec<_> = block_on(rx.collect());
+    let mut list = list.into_iter();
+    for i in (1..=amt).rev() {
+        assert_eq!(list.next(), Some(i));
+    }
+    assert_eq!(list.next(), None);
+
+    t.join().unwrap();
+}
+
+async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
+    for x in 0..n {
+        sender.send(n - x).await.unwrap();
+    }
+}
+
+#[test]
+fn drop_sender() {
+    let (tx, mut rx) = mpsc::channel::<u32>(1);
+    drop(tx);
+    let f = poll_fn(|cx| {
+        rx.poll_next_unpin(cx)
+    });
+    assert_eq!(block_on(f), None)
+}
+
+#[test]
+fn drop_rx() {
+    let (mut tx, rx) = mpsc::channel::<u32>(1);
+    block_on(tx.send(1)).unwrap();
+    drop(rx);
+    assert!(block_on(tx.send(1)).is_err());
+}
+
+#[test]
+fn drop_order() {
+    static DROPS: AtomicUsize = AtomicUsize::new(0);
+    let (mut tx, rx) = mpsc::channel(1);
+
+    struct A;
+
+    impl Drop for A {
+        fn drop(&mut self) {
+            DROPS.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    block_on(tx.send(A)).unwrap();
+    assert_eq!(DROPS.load(Ordering::SeqCst), 0);
+    drop(rx);
+    assert_eq!(DROPS.load(Ordering::SeqCst), 1);
+    assert!(block_on(tx.send(A)).is_err());
+    assert_eq!(DROPS.load(Ordering::SeqCst), 2);
+}
diff --git a/tests/mpsc-close.rs b/tests/mpsc-close.rs
new file mode 100644
index 0000000..50852eb
--- /dev/null
+++ b/tests/mpsc-close.rs
@@ -0,0 +1,144 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::Arc;
+use std::thread;
+
+#[test]
+fn smoke() {
+    let (mut sender, receiver) = mpsc::channel(1);
+
+    let t = thread::spawn(move || {
+        while let Ok(()) = block_on(sender.send(42)) {}
+    });
+
+    // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
+    block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
+
+    t.join().unwrap()
+}
+
+#[test]
+fn multiple_senders_disconnect() {
+    {
+        let (mut tx1, mut rx) = mpsc::channel(1);
+        let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+        // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+        // channel open for other senders
+        tx1.disconnect();
+        drop(tx2);
+        block_on(tx3.close()).unwrap();
+
+        assert!(tx1.is_closed());
+        assert!(tx3.is_closed());
+        assert!(!tx4.is_closed());
+
+        block_on(tx4.send(5)).unwrap();
+        assert_eq!(block_on(rx.next()), Some(5));
+
+        // dropping the final sender will close the channel
+        drop(tx4);
+        assert_eq!(block_on(rx.next()), None);
+    }
+
+    {
+        let (mut tx1, mut rx) = mpsc::unbounded();
+        let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+        // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+        // channel open for other senders
+        tx1.disconnect();
+        drop(tx2);
+        block_on(tx3.close()).unwrap();
+
+        assert!(tx1.is_closed());
+        assert!(tx3.is_closed());
+        assert!(!tx4.is_closed());
+
+        block_on(tx4.send(5)).unwrap();
+        assert_eq!(block_on(rx.next()), Some(5));
+
+        // dropping the final sender will close the channel
+        drop(tx4);
+        assert_eq!(block_on(rx.next()), None);
+    }
+}
+
+#[test]
+fn multiple_senders_close_channel() {
+    {
+        let (mut tx1, mut rx) = mpsc::channel(1);
+        let mut tx2 = tx1.clone();
+
+        // close_channel should shut down the whole channel
+        tx1.close_channel();
+
+        assert!(tx1.is_closed());
+        assert!(tx2.is_closed());
+
+        let err = block_on(tx2.send(5)).unwrap_err();
+        assert!(err.is_disconnected());
+
+        assert_eq!(block_on(rx.next()), None);
+    }
+
+    {
+        let (tx1, mut rx) = mpsc::unbounded();
+        let mut tx2 = tx1.clone();
+
+        // close_channel should shut down the whole channel
+        tx1.close_channel();
+
+        assert!(tx1.is_closed());
+        assert!(tx2.is_closed());
+
+        let err = block_on(tx2.send(5)).unwrap_err();
+        assert!(err.is_disconnected());
+
+        assert_eq!(block_on(rx.next()), None);
+    }
+}
+
+#[test]
+fn single_receiver_drop_closes_channel_and_drains() {
+    {
+        let ref_count = Arc::new(0);
+        let weak_ref = Arc::downgrade(&ref_count);
+
+        let (sender, receiver) = mpsc::unbounded();
+        sender.unbounded_send(ref_count).expect("failed to send");
+
+        // Verify that the sent message is still live.
+        assert!(weak_ref.upgrade().is_some());
+
+        drop(receiver);
+
+        // The sender should know the channel is closed.
+        assert!(sender.is_closed());
+
+        // Verify that the sent message has been dropped.
+        assert!(weak_ref.upgrade().is_none());
+    }
+
+    {
+        let ref_count = Arc::new(0);
+        let weak_ref = Arc::downgrade(&ref_count);
+
+        let (mut sender, receiver) = mpsc::channel(1);
+        sender.try_send(ref_count).expect("failed to send");
+
+        // Verify that the sent message is still live.
+        assert!(weak_ref.upgrade().is_some());
+
+        drop(receiver);
+
+        // The sender should know the channel is closed.
+        assert!(sender.is_closed());
+
+        // Verify that the sent message has been dropped.
+        assert!(weak_ref.upgrade().is_none());
+        assert!(sender.is_closed());
+    }
+}
diff --git a/tests/mpsc.rs b/tests/mpsc.rs
new file mode 100644
index 0000000..409fa6e
--- /dev/null
+++ b/tests/mpsc.rs
@@ -0,0 +1,620 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{FutureExt, poll_fn};
+use futures::stream::{Stream, StreamExt};
+use futures::sink::{Sink, SinkExt};
+use futures::task::{Context, Poll};
+use futures::pin_mut;
+use futures_test::task::{new_count_waker, noop_context};
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::thread;
+
+trait AssertSend: Send {}
+impl AssertSend for mpsc::Sender<i32> {}
+impl AssertSend for mpsc::Receiver<i32> {}
+
+#[test]
+fn send_recv() {
+    let (mut tx, rx) = mpsc::channel::<i32>(16);
+
+    block_on(tx.send(1)).unwrap();
+    drop(tx);
+    let v: Vec<_> = block_on(rx.collect());
+    assert_eq!(v, vec![1]);
+}
+
+#[test]
+fn send_recv_no_buffer() {
+    // Run on a task context
+    block_on(poll_fn(move |cx| {
+        let (tx, rx) = mpsc::channel::<i32>(0);
+        pin_mut!(tx, rx);
+
+        assert!(tx.as_mut().poll_flush(cx).is_ready());
+        assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+        // Send first message
+        assert!(tx.as_mut().start_send(1).is_ok());
+        assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+        // poll_ready said Pending, so no room in buffer, therefore new sends
+        // should get rejected with is_full.
+        assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
+        assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+        // Take the value
+        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
+        assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+        // Send second message
+        assert!(tx.as_mut().poll_ready(cx).is_ready());
+        assert!(tx.as_mut().start_send(2).is_ok());
+        assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+        // Take the value
+        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
+        assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+        Poll::Ready(())
+    }));
+}
+
+#[test]
+fn send_shared_recv() {
+    let (mut tx1, rx) = mpsc::channel::<i32>(16);
+    let mut rx = block_on_stream(rx);
+    let mut tx2 = tx1.clone();
+
+    block_on(tx1.send(1)).unwrap();
+    assert_eq!(rx.next(), Some(1));
+
+    block_on(tx2.send(2)).unwrap();
+    assert_eq!(rx.next(), Some(2));
+}
+
+#[test]
+fn send_recv_threads() {
+    let (mut tx, rx) = mpsc::channel::<i32>(16);
+
+    let t = thread::spawn(move|| {
+        block_on(tx.send(1)).unwrap();
+    });
+
+    let v: Vec<_> = block_on(rx.take(1).collect());
+    assert_eq!(v, vec![1]);
+
+    t.join().unwrap();
+}
+
+#[test]
+fn send_recv_threads_no_capacity() {
+    let (mut tx, rx) = mpsc::channel::<i32>(0);
+
+    let t = thread::spawn(move || {
+        block_on(tx.send(1)).unwrap();
+        block_on(tx.send(2)).unwrap();
+    });
+
+    let v: Vec<_> = block_on(rx.collect());
+    assert_eq!(v, vec![1, 2]);
+
+    t.join().unwrap();
+}
+
+#[test]
+fn recv_close_gets_none() {
+    let (mut tx, mut rx) = mpsc::channel::<i32>(10);
+
+    // Run on a task context
+    block_on(poll_fn(move |cx| {
+        rx.close();
+
+        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
+        match tx.poll_ready(cx) {
+            Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
+            Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
+        };
+
+        Poll::Ready(())
+    }));
+}
+
+#[test]
+fn tx_close_gets_none() {
+    let (_, mut rx) = mpsc::channel::<i32>(10);
+
+    // Run on a task context
+    block_on(poll_fn(move |cx| {
+        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
+        Poll::Ready(())
+    }));
+}
+
+// #[test]
+// fn spawn_sends_items() {
+//     let core = local_executor::Core::new();
+//     let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
+//     let rx = mpsc::spawn(stream, &core, 1);
+//     assert_eq!(core.run(rx.take(4).collect()).unwrap(),
+//                [0, 1, 2, 3]);
+// }
+
+// #[test]
+// fn spawn_kill_dead_stream() {
+//     use std::thread;
+//     use std::time::Duration;
+//     use futures::future::Either;
+//     use futures::sync::oneshot;
+//
+//     // a stream which never returns anything (maybe a remote end isn't
+//     // responding), but dropping it leads to observable side effects
+//     // (like closing connections, releasing limited resources, ...)
+//     #[derive(Debug)]
+//     struct Dead {
+//         // when dropped you should get Err(oneshot::Canceled) on the
+//         // receiving end
+//         done: oneshot::Sender<()>,
+//     }
+//     impl Stream for Dead {
+//         type Item = ();
+//         type Error = ();
+//
+//         fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+//             Ok(Poll::Pending)
+//         }
+//     }
+//
+//     // need to implement a timeout for the test, as it would hang
+//     // forever right now
+//     let (timeout_tx, timeout_rx) = oneshot::channel();
+//     thread::spawn(move || {
+//         thread::sleep(Duration::from_millis(1000));
+//         let _ = timeout_tx.send(());
+//     });
+//
+//     let core = local_executor::Core::new();
+//     let (done_tx, done_rx) = oneshot::channel();
+//     let stream = Dead{done: done_tx};
+//     let rx = mpsc::spawn(stream, &core, 1);
+//     let res = core.run(
+//         Ok::<_, ()>(())
+//         .into_future()
+//         .then(move |_| {
+//             // now drop the spawned stream: maybe some timeout exceeded,
+//             // or some connection on this end was closed by the remote
+//             // end.
+//             drop(rx);
+//             // and wait for the spawned stream to release its resources
+//             done_rx
+//         })
+//         .select2(timeout_rx)
+//     );
+//     match res {
+//         Err(Either::A((oneshot::Canceled, _))) => (),
+//         _ => {
+//             panic!("dead stream wasn't canceled");
+//         },
+//     }
+// }
+
+#[test]
+fn stress_shared_unbounded() {
+    const AMT: u32 = 10000;
+    const NTHREADS: u32 = 8;
+    let (tx, rx) = mpsc::unbounded::<i32>();
+
+    let t = thread::spawn(move|| {
+        let result: Vec<_> = block_on(rx.collect());
+        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
+        for item in result {
+            assert_eq!(item, 1);
+        }
+    });
+
+    for _ in 0..NTHREADS {
+        let tx = tx.clone();
+
+        thread::spawn(move|| {
+            for _ in 0..AMT {
+                tx.unbounded_send(1).unwrap();
+            }
+        });
+    }
+
+    drop(tx);
+
+    t.join().ok().unwrap();
+}
+
+#[test]
+fn stress_shared_bounded_hard() {
+    const AMT: u32 = 10000;
+    const NTHREADS: u32 = 8;
+    let (tx, rx) = mpsc::channel::<i32>(0);
+
+    let t = thread::spawn(move|| {
+        let result: Vec<_> = block_on(rx.collect());
+        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
+        for item in result {
+            assert_eq!(item, 1);
+        }
+    });
+
+    for _ in 0..NTHREADS {
+        let mut tx = tx.clone();
+
+        thread::spawn(move || {
+            for _ in 0..AMT {
+                block_on(tx.send(1)).unwrap();
+            }
+        });
+    }
+
+    drop(tx);
+
+    t.join().unwrap();
+}
+
+#[test]
+fn stress_receiver_multi_task_bounded_hard() {
+    const AMT: usize = 10_000;
+    const NTHREADS: u32 = 2;
+
+    let (mut tx, rx) = mpsc::channel::<usize>(0);
+    let rx = Arc::new(Mutex::new(Some(rx)));
+    let n = Arc::new(AtomicUsize::new(0));
+
+    let mut th = vec![];
+
+    for _ in 0..NTHREADS {
+        let rx = rx.clone();
+        let n = n.clone();
+
+        let t = thread::spawn(move || {
+            let mut i = 0;
+
+            loop {
+                i += 1;
+                let mut rx_opt = rx.lock().unwrap();
+                if let Some(rx) = &mut *rx_opt {
+                    if i % 5 == 0 {
+                        let item = block_on(rx.next());
+
+                        if item.is_none() {
+                            *rx_opt = None;
+                            break;
+                        }
+
+                        n.fetch_add(1, Ordering::Relaxed);
+                    } else {
+                        // Just poll
+                        let n = n.clone();
+                        match rx.poll_next_unpin(&mut noop_context()) {
+                            Poll::Ready(Some(_)) => {
+                                n.fetch_add(1, Ordering::Relaxed);
+                            }
+                            Poll::Ready(None) => {
+                                *rx_opt = None;
+                                break
+                            },
+                            Poll::Pending => {},
+                        }
+                    }
+                } else {
+                    break;
+                }
+            }
+        });
+
+        th.push(t);
+    }
+
+
+    for i in 0..AMT {
+        block_on(tx.send(i)).unwrap();
+    }
+    drop(tx);
+
+    for t in th {
+        t.join().unwrap();
+    }
+
+    assert_eq!(AMT, n.load(Ordering::Relaxed));
+}
+
+/// Stress test that receiver properly receives all the messages
+/// after sender dropped.
+#[test]
+fn stress_drop_sender() {
+    fn list() -> impl Stream<Item=i32> {
+        let (tx, rx) = mpsc::channel(1);
+        thread::spawn(move || {
+            block_on(send_one_two_three(tx));
+        });
+        rx
+    }
+
+    for _ in 0..10000 {
+        let v: Vec<_> = block_on(list().collect());
+        assert_eq!(v, vec![1, 2, 3]);
+    }
+}
+
+async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
+    for i in 1..=3 {
+        tx.send(i).await.unwrap();
+    }
+}
+
+/// Stress test that after receiver dropped,
+/// no messages are lost.
+fn stress_close_receiver_iter() {
+    let (tx, rx) = mpsc::unbounded();
+    let mut rx = block_on_stream(rx);
+    let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
+    let th = thread::spawn(move || {
+        for i in 1.. {
+            if tx.unbounded_send(i).is_err() {
+                unwritten_tx.send(i).expect("unwritten_tx");
+                return;
+            }
+        }
+    });
+
+    // Read one message to make sure thread effectively started
+    assert_eq!(Some(1), rx.next());
+
+    rx.close();
+
+    for i in 2.. {
+        match rx.next() {
+            Some(r) => assert!(i == r),
+            None => {
+                let unwritten = unwritten_rx.recv().expect("unwritten_rx");
+                assert_eq!(unwritten, i);
+                th.join().unwrap();
+                return;
+            }
+        }
+    }
+}
+
+#[test]
+fn stress_close_receiver() {
+    for _ in 0..10000 {
+        stress_close_receiver_iter();
+    }
+}
+
+async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
+    for i in (1..=count).rev() {
+        sender.send(i).await.unwrap();
+    }
+}
+
+/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
+#[test]
+fn stress_poll_ready() {
+    const AMT: u32 = 1000;
+    const NTHREADS: u32 = 8;
+
+    /// Run a stress test using the specified channel capacity.
+    fn stress(capacity: usize) {
+        let (tx, rx) = mpsc::channel(capacity);
+        let mut threads = Vec::new();
+        for _ in 0..NTHREADS {
+            let sender = tx.clone();
+            threads.push(thread::spawn(move || {
+                block_on(stress_poll_ready_sender(sender, AMT))
+            }));
+        }
+        drop(tx);
+
+        let result: Vec<_> = block_on(rx.collect());
+        assert_eq!(result.len() as u32, AMT * NTHREADS);
+
+        for thread in threads {
+            thread.join().unwrap();
+        }
+    }
+
+    stress(0);
+    stress(1);
+    stress(8);
+    stress(16);
+}
+
+#[test]
+fn try_send_1() {
+    const N: usize = 3000;
+    let (mut tx, rx) = mpsc::channel(0);
+
+    let t = thread::spawn(move || {
+        for i in 0..N {
+            loop {
+                if tx.try_send(i).is_ok() {
+                    break
+                }
+            }
+        }
+    });
+
+    let result: Vec<_> = block_on(rx.collect());
+    for (i, j) in result.into_iter().enumerate() {
+        assert_eq!(i, j);
+    }
+
+    t.join().unwrap();
+}
+
+#[test]
+fn try_send_2() {
+    let (mut tx, rx) = mpsc::channel(0);
+    let mut rx = block_on_stream(rx);
+
+    tx.try_send("hello").unwrap();
+
+    let (readytx, readyrx) = oneshot::channel::<()>();
+
+    let th = thread::spawn(move || {
+        block_on(poll_fn(|cx| {
+            assert!(tx.poll_ready(cx).is_pending());
+            Poll::Ready(())
+        }));
+
+        drop(readytx);
+        block_on(tx.send("goodbye")).unwrap();
+    });
+
+    let _ = block_on(readyrx);
+    assert_eq!(rx.next(), Some("hello"));
+    assert_eq!(rx.next(), Some("goodbye"));
+    assert_eq!(rx.next(), None);
+
+    th.join().unwrap();
+}
+
+#[test]
+fn try_send_fail() {
+    let (mut tx, rx) = mpsc::channel(0);
+    let mut rx = block_on_stream(rx);
+
+    tx.try_send("hello").unwrap();
+
+    // This should fail
+    assert!(tx.try_send("fail").is_err());
+
+    assert_eq!(rx.next(), Some("hello"));
+
+    tx.try_send("goodbye").unwrap();
+    drop(tx);
+
+    assert_eq!(rx.next(), Some("goodbye"));
+    assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn try_send_recv() {
+    let (mut tx, mut rx) = mpsc::channel(1);
+    tx.try_send("hello").unwrap();
+    tx.try_send("hello").unwrap();
+    tx.try_send("hello").unwrap_err(); // should be full
+    rx.try_next().unwrap();
+    rx.try_next().unwrap();
+    rx.try_next().unwrap_err(); // should be empty
+    tx.try_send("hello").unwrap();
+    rx.try_next().unwrap();
+    rx.try_next().unwrap_err(); // should be empty
+}
+
+#[test]
+fn same_receiver() {
+    let (mut txa1, _) = mpsc::channel::<i32>(1);
+    let txa2 = txa1.clone();
+
+    let (mut txb1, _) = mpsc::channel::<i32>(1);
+    let txb2 = txb1.clone();
+
+    assert!(txa1.same_receiver(&txa2));
+    assert!(txb1.same_receiver(&txb2));
+    assert!(!txa1.same_receiver(&txb1));
+
+    txa1.disconnect();
+    txb1.close_channel();
+
+    assert!(!txa1.same_receiver(&txa2));
+    assert!(txb1.same_receiver(&txb2));
+}
+
+#[test]
+fn hash_receiver() {
+    use std::hash::Hasher;
+    use std::collections::hash_map::DefaultHasher;
+
+    let mut hasher_a1 = DefaultHasher::new();
+    let mut hasher_a2 = DefaultHasher::new();
+    let mut hasher_b1 = DefaultHasher::new();
+    let mut hasher_b2 = DefaultHasher::new();
+    let (mut txa1, _) = mpsc::channel::<i32>(1);
+    let txa2 = txa1.clone();
+
+    let (mut txb1, _) = mpsc::channel::<i32>(1);
+    let txb2 = txb1.clone();
+
+    txa1.hash_receiver(&mut hasher_a1);
+    let hash_a1 = hasher_a1.finish();
+    txa2.hash_receiver(&mut hasher_a2);
+    let hash_a2 = hasher_a2.finish();
+    txb1.hash_receiver(&mut hasher_b1);
+    let hash_b1 = hasher_b1.finish();
+    txb2.hash_receiver(&mut hasher_b2);
+    let hash_b2 = hasher_b2.finish();
+
+    assert_eq!(hash_a1, hash_a2);
+    assert_eq!(hash_b1, hash_b2);
+    assert!(hash_a1 != hash_b1);
+
+    txa1.disconnect();
+    txb1.close_channel();
+
+    let mut hasher_a1 = DefaultHasher::new();
+    let mut hasher_a2 = DefaultHasher::new();
+    let mut hasher_b1 = DefaultHasher::new();
+    let mut hasher_b2 = DefaultHasher::new();
+
+    txa1.hash_receiver(&mut hasher_a1);
+    let hash_a1 = hasher_a1.finish();
+    txa2.hash_receiver(&mut hasher_a2);
+    let hash_a2 = hasher_a2.finish();
+    txb1.hash_receiver(&mut hasher_b1);
+    let hash_b1 = hasher_b1.finish();
+    txb2.hash_receiver(&mut hasher_b2);
+    let hash_b2 = hasher_b2.finish();
+
+    assert!(hash_a1 != hash_a2);
+    assert_eq!(hash_b1, hash_b2);
+}
+
+#[test]
+fn send_backpressure() {
+    let (waker, counter) = new_count_waker();
+    let mut cx = Context::from_waker(&waker);
+
+    let (mut tx, mut rx) = mpsc::channel(1);
+    block_on(tx.send(1)).unwrap();
+
+    let mut task = tx.send(2);
+    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
+    assert_eq!(counter, 0);
+
+    let item = block_on(rx.next()).unwrap();
+    assert_eq!(item, 1);
+    assert_eq!(counter, 1);
+    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
+
+    let item = block_on(rx.next()).unwrap();
+    assert_eq!(item, 2);
+}
+
+#[test]
+fn send_backpressure_multi_senders() {
+    let (waker, counter) = new_count_waker();
+    let mut cx = Context::from_waker(&waker);
+
+    let (mut tx1, mut rx) = mpsc::channel(1);
+    let mut tx2 = tx1.clone();
+    block_on(tx1.send(1)).unwrap();
+
+    let mut task = tx2.send(2);
+    assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
+    assert_eq!(counter, 0);
+
+    let item = block_on(rx.next()).unwrap();
+    assert_eq!(item, 1);
+    assert_eq!(counter, 1);
+    assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
+
+    let item = block_on(rx.next()).unwrap();
+    assert_eq!(item, 2);
+}
diff --git a/tests/oneshot.rs b/tests/oneshot.rs
new file mode 100644
index 0000000..5194ce4
--- /dev/null
+++ b/tests/oneshot.rs
@@ -0,0 +1,265 @@
+use futures::channel::oneshot::{self, Sender};
+use futures::executor::block_on;
+use futures::future::{Future, FutureExt, poll_fn};
+use futures::task::{Context, Poll};
+use futures_test::task::panic_waker_ref;
+use std::pin::Pin;
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn smoke_poll() {
+    let (mut tx, rx) = oneshot::channel::<u32>();
+    let mut rx = Some(rx);
+    let f = poll_fn(|cx| {
+        assert!(tx.poll_canceled(cx).is_pending());
+        assert!(tx.poll_canceled(cx).is_pending());
+        drop(rx.take());
+        assert!(tx.poll_canceled(cx).is_ready());
+        assert!(tx.poll_canceled(cx).is_ready());
+        Poll::Ready(())
+    });
+
+    block_on(f);
+}
+
+#[test]
+fn cancel_notifies() {
+    let (tx, rx) = oneshot::channel::<u32>();
+
+    let t = thread::spawn(|| {
+        block_on(WaitForCancel { tx });
+    });
+    drop(rx);
+    t.join().unwrap();
+}
+
+struct WaitForCancel {
+    tx: Sender<u32>,
+}
+
+impl Future for WaitForCancel {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.tx.poll_canceled(cx)
+    }
+}
+
+#[test]
+fn cancel_lots() {
+    let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
+    let t = thread::spawn(move || {
+        for (tx, tx2) in rx {
+            block_on(WaitForCancel { tx });
+            tx2.send(()).unwrap();
+        }
+    });
+
+    for _ in 0..20000 {
+        let (otx, orx) = oneshot::channel::<u32>();
+        let (tx2, rx2) = mpsc::channel();
+        tx.send((otx, tx2)).unwrap();
+        drop(orx);
+        rx2.recv().unwrap();
+    }
+    drop(tx);
+
+    t.join().unwrap();
+}
+
+#[test]
+fn cancel_after_sender_drop_doesnt_notify() {
+    let (mut tx, rx) = oneshot::channel::<u32>();
+    let mut cx = Context::from_waker(panic_waker_ref());
+    assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
+    drop(tx);
+    drop(rx);
+}
+
+#[test]
+fn close() {
+    let (mut tx, mut rx) = oneshot::channel::<u32>();
+    rx.close();
+    block_on(poll_fn(|cx| {
+        match rx.poll_unpin(cx) {
+            Poll::Ready(Err(_)) => {},
+            _ => panic!(),
+        };
+        assert!(tx.poll_canceled(cx).is_ready());
+        Poll::Ready(())
+    }));
+}
+
+#[test]
+fn close_wakes() {
+    let (tx, mut rx) = oneshot::channel::<u32>();
+    let (tx2, rx2) = mpsc::channel();
+    let t = thread::spawn(move || {
+        rx.close();
+        rx2.recv().unwrap();
+    });
+    block_on(WaitForCancel { tx });
+    tx2.send(()).unwrap();
+    t.join().unwrap();
+}
+
+#[test]
+fn is_canceled() {
+    let (tx, rx) = oneshot::channel::<u32>();
+    assert!(!tx.is_canceled());
+    drop(rx);
+    assert!(tx.is_canceled());
+}
+
+#[test]
+fn cancel_sends() {
+    let (tx, rx) = mpsc::channel::<Sender<_>>();
+    let t = thread::spawn(move || {
+        for otx in rx {
+            let _ = otx.send(42);
+        }
+    });
+
+    for _ in 0..20000 {
+        let (otx, mut orx) = oneshot::channel::<u32>();
+        tx.send(otx).unwrap();
+
+        orx.close();
+        let _ = block_on(orx);
+    }
+
+    drop(tx);
+    t.join().unwrap();
+}
+
+// #[test]
+// fn spawn_sends_items() {
+//     let core = local_executor::Core::new();
+//     let future = ok::<_, ()>(1);
+//     let rx = spawn(future, &core);
+//     assert_eq!(core.run(rx).unwrap(), 1);
+// }
+//
+// #[test]
+// fn spawn_kill_dead_stream() {
+//     use std::thread;
+//     use std::time::Duration;
+//     use futures::future::Either;
+//     use futures::sync::oneshot;
+//
+//     // a future which never returns anything (forever accepting incoming
+//     // connections), but dropping it leads to observable side effects
+//     // (like closing listening sockets, releasing limited resources,
+//     // ...)
+//     #[derive(Debug)]
+//     struct Dead {
+//         // when dropped you should get Err(oneshot::Canceled) on the
+//         // receiving end
+//         done: oneshot::Sender<()>,
+//     }
+//     impl Future for Dead {
+//         type Item = ();
+//         type Error = ();
+//
+//         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+//             Ok(Poll::Pending)
+//         }
+//     }
+//
+//     // need to implement a timeout for the test, as it would hang
+//     // forever right now
+//     let (timeout_tx, timeout_rx) = oneshot::channel();
+//     thread::spawn(move || {
+//         thread::sleep(Duration::from_millis(1000));
+//         let _ = timeout_tx.send(());
+//     });
+//
+//     let core = local_executor::Core::new();
+//     let (done_tx, done_rx) = oneshot::channel();
+//     let future = Dead{done: done_tx};
+//     let rx = spawn(future, &core);
+//     let res = core.run(
+//         Ok::<_, ()>(())
+//         .into_future()
+//         .then(move |_| {
+//             // now drop the spawned future: maybe some timeout exceeded,
+//             // or some connection on this end was closed by the remote
+//             // end.
+//             drop(rx);
+//             // and wait for the spawned future to release its resources
+//             done_rx
+//         })
+//         .select2(timeout_rx)
+//     );
+//     match res {
+//         Err(Either::A((oneshot::Canceled, _))) => (),
+//         Ok(Either::B(((), _))) => {
+//             panic!("dead future wasn't canceled (timeout)");
+//         },
+//         _ => {
+//             panic!("dead future wasn't canceled (unexpected result)");
+//         },
+//     }
+// }
+//
+// #[test]
+// fn spawn_dont_kill_forgot_dead_stream() {
+//     use std::thread;
+//     use std::time::Duration;
+//     use futures::future::Either;
+//     use futures::sync::oneshot;
+//
+//     // a future which never returns anything (forever accepting incoming
+//     // connections), but dropping it leads to observable side effects
+//     // (like closing listening sockets, releasing limited resources,
+//     // ...)
+//     #[derive(Debug)]
+//     struct Dead {
+//         // when dropped you should get Err(oneshot::Canceled) on the
+//         // receiving end
+//         done: oneshot::Sender<()>,
+//     }
+//     impl Future for Dead {
+//         type Item = ();
+//         type Error = ();
+//
+//         fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+//             Ok(Poll::Pending)
+//         }
+//     }
+//
+//     // need to implement a timeout for the test, as it would hang
+//     // forever right now
+//     let (timeout_tx, timeout_rx) = oneshot::channel();
+//     thread::spawn(move || {
+//         thread::sleep(Duration::from_millis(1000));
+//         let _ = timeout_tx.send(());
+//     });
+//
+//     let core = local_executor::Core::new();
+//     let (done_tx, done_rx) = oneshot::channel();
+//     let future = Dead{done: done_tx};
+//     let rx = spawn(future, &core);
+//     let res = core.run(
+//         Ok::<_, ()>(())
+//         .into_future()
+//         .then(move |_| {
+//             // forget the spawned future: should keep running, i.e. hit
+//             // the timeout below.
+//             rx.forget();
+//             // and wait for the spawned future to release its resources
+//             done_rx
+//         })
+//         .select2(timeout_rx)
+//     );
+//     match res {
+//         Err(Either::A((oneshot::Canceled, _))) => {
+//             panic!("forgotten dead future was canceled");
+//         },
+//         Ok(Either::B(((), _))) => (), // reached timeout
+//         _ => {
+//             panic!("forgotten dead future was canceled (unexpected result)");
+//         },
+//     }
+// }