Merge "Upgrade rust/crates/mio to 0.7.5" am: 8414138e74

Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/mio/+/1485276

Change-Id: I85719f7847b2b6d3807635b4877f45fa052c90ed
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
index 24a92d4..3096345 100644
--- a/.cargo_vcs_info.json
+++ b/.cargo_vcs_info.json
@@ -1,5 +1,5 @@
 {
   "git": {
-    "sha1": "d40494bf6a9d5437207671282aa1be6b93a964cd"
+    "sha1": "27fbd5f04bb5f52a4d1c358cf0c04c6074a3d46b"
   }
 }
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5827f0c..1ca5125 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,18 @@
+# 0.7.5
+
+## Added
+
+* `TcpSocket::get_localaddr()` retrieves local address
+  (https://github.com/tokio-rs/mio/commit/b41a022b2242eef1969c70c8ba93e04c528dba47)
+* `TcpSocket::set_reuseport()` & `TcpSocket::get_reuseport()` configures and reads SO_REUSEPORT
+  (https://github.com/tokio-rs/mio/commit/183bbe409ab69cbf9db41d0263b41ec86202d9a0)
+* `unix:pipe()` a wrapper around pipe(2) sys call
+  (https://github.com/tokio-rs/mio/commit/2b7c0967a7362303946deb3d4ca2ae507af6c72d)
+* Add a check that a single Waker is active per Poll instance (only in debug mode)
+  (https://github.com/tokio-rs/mio/commit/f4874f28b32efcf4841691884c65a89734d96a56)
+* Added `Interest:remove()`
+  (https://github.com/tokio-rs/mio/commit/b8639c3d9ac07bb7e2e27685680c8a6510fa1357)
+
 # 0.7.4
 
 ## Fixes
diff --git a/Cargo.lock b/Cargo.lock
index df276dc..2a0e767 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -23,9 +23,9 @@
 
 [[package]]
 name = "libc"
-version = "0.2.73"
+version = "0.2.80"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bd7d4bd64732af4bf3a67f367c27df8520ad7e230c5817b8ff485864d80242b9"
+checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614"
 
 [[package]]
 name = "log"
@@ -38,7 +38,7 @@
 
 [[package]]
 name = "mio"
-version = "0.7.4"
+version = "0.7.5"
 dependencies = [
  "env_logger",
  "libc",
@@ -46,7 +46,6 @@
  "miow",
  "ntapi",
  "rand",
- "socket2",
  "winapi",
 ]
 
@@ -62,9 +61,9 @@
 
 [[package]]
 name = "ntapi"
-version = "0.3.4"
+version = "0.3.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a31937dea023539c72ddae0e3571deadc1414b300483fa7aaec176168cfa9d2"
+checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44"
 dependencies = [
  "winapi",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 8045b6c..a490943 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,12 +13,12 @@
 [package]
 edition = "2018"
 name = "mio"
-version = "0.7.4"
+version = "0.7.5"
 authors = ["Carl Lerche <me@carllerche.com>"]
 include = ["Cargo.toml", "LICENSE", "README.md", "CHANGELOG.md", "src/**/*.rs", "examples/**/*.rs"]
 description = "Lightweight non-blocking IO"
 homepage = "https://github.com/tokio-rs/mio"
-documentation = "https://docs.rs/mio/0.7.4"
+documentation = "https://docs.rs/mio/0.7.5"
 readme = "README.md"
 keywords = ["io", "async", "non-blocking"]
 categories = ["asynchronous"]
@@ -47,14 +47,12 @@
 [dev-dependencies.rand]
 version = "0.4"
 
-[dev-dependencies.socket2]
-version = "0.3.15"
-
 [features]
 default = []
 extra-docs = []
 os-poll = []
 os-util = []
+pipe = ["os-poll"]
 tcp = []
 udp = []
 uds = []
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
index ce99d1c..e01d453 100644
--- a/Cargo.toml.orig
+++ b/Cargo.toml.orig
@@ -6,11 +6,11 @@
 # - Update CHANGELOG.md.
 # - Update doc URL.
 # - Create git tag
-version       = "0.7.4"
+version       = "0.7.5"
 license       = "MIT"
 authors       = ["Carl Lerche <me@carllerche.com>"]
 description   = "Lightweight non-blocking IO"
-documentation = "https://docs.rs/mio/0.7.4"
+documentation = "https://docs.rs/mio/0.7.5"
 homepage      = "https://github.com/tokio-rs/mio"
 repository    = "https://github.com/tokio-rs/mio"
 readme        = "README.md"
@@ -29,6 +29,7 @@
 default = []
 os-poll = []
 os-util = []
+pipe = ["os-poll"]
 tcp = []
 udp = []
 uds = []
@@ -48,7 +49,6 @@
 [dev-dependencies]
 env_logger = { version = "0.6.2", default-features = false }
 rand = "0.4"
-socket2 = "0.3.15"
 
 [package.metadata.docs.rs]
 all-features = true
diff --git a/METADATA b/METADATA
index 34c31cd..a63bcc3 100644
--- a/METADATA
+++ b/METADATA
@@ -7,13 +7,13 @@
   }
   url {
     type: ARCHIVE
-    value: "https://static.crates.io/crates/mio/mio-0.7.4.crate"
+    value: "https://static.crates.io/crates/mio/mio-0.7.5.crate"
   }
-  version: "0.7.4"
+  version: "0.7.5"
   license_type: NOTICE
   last_upgrade_date {
     year: 2020
-    month: 10
-    day: 26
+    month: 11
+    day: 2
   }
 }
diff --git a/examples/tcp_server.rs b/examples/tcp_server.rs
index dc871f3..42426ee 100644
--- a/examples/tcp_server.rs
+++ b/examples/tcp_server.rs
@@ -128,18 +128,23 @@
 
     if event.is_readable() {
         let mut connection_closed = false;
-        let mut received_data = Vec::with_capacity(4096);
+        let mut received_data = vec![0; 4096];
+        let mut bytes_read = 0;
         // We can (maybe) read from the connection.
         loop {
-            let mut buf = [0; 256];
-            match connection.read(&mut buf) {
+            match connection.read(&mut received_data[bytes_read..]) {
                 Ok(0) => {
                     // Reading 0 bytes means the other side has closed the
                     // connection or is done writing, then so are we.
                     connection_closed = true;
                     break;
                 }
-                Ok(n) => received_data.extend_from_slice(&buf[..n]),
+                Ok(n) => {
+                    bytes_read += n;
+                    if bytes_read == received_data.len() {
+                        received_data.resize(received_data.len() + 1024, 0);
+                    }
+                }
                 // Would block "errors" are the OS's way of saying that the
                 // connection is not actually ready to perform this I/O operation.
                 Err(ref err) if would_block(err) => break,
@@ -149,10 +154,13 @@
             }
         }
 
-        if let Ok(str_buf) = from_utf8(&received_data) {
-            println!("Received data: {}", str_buf.trim_end());
-        } else {
-            println!("Received (none UTF-8) data: {:?}", &received_data);
+        if bytes_read != 0 {
+            let received_data = &received_data[..bytes_read];
+            if let Ok(str_buf) = from_utf8(received_data) {
+                println!("Received data: {}", str_buf.trim_end());
+            } else {
+                println!("Received (none UTF-8) data: {:?}", received_data);
+            }
         }
 
         if connection_closed {
diff --git a/src/interest.rs b/src/interest.rs
index 6bc5929..ee5158a 100644
--- a/src/interest.rs
+++ b/src/interest.rs
@@ -8,7 +8,7 @@
 /// registered with [readable] interests and the socket becomes writable, no
 /// event will be returned from a call to [`poll`].
 ///
-/// [registering]: struct.Registry.html#method.reregister
+/// [registering]: struct.Registry.html#method.register
 /// [`event::Source`]: ./event/trait.Source.html
 /// [`Poll`]: struct.Poll.html
 /// [readable]: struct.Interest.html#associatedconstant.READABLE
@@ -70,6 +70,30 @@
         Interest(unsafe { NonZeroU8::new_unchecked(self.0.get() | other.0.get()) })
     }
 
+    /// Removes `other` `Interest` from `self`.
+    ///
+    /// Returns `None` if the set would be empty after removing `other`.
+    ///
+    /// ```
+    /// use mio::Interest;
+    ///
+    /// const RW_INTERESTS: Interest = Interest::READABLE.add(Interest::WRITABLE);
+    ///
+    /// // As long a one interest remain this will return `Some`.
+    /// let w_interest = RW_INTERESTS.remove(Interest::READABLE).unwrap();
+    /// assert!(!w_interest.is_readable());
+    /// assert!(w_interest.is_writable());
+    ///
+    /// // Removing all interests from the set will return `None`.
+    /// assert_eq!(w_interest.remove(Interest::WRITABLE), None);
+    ///
+    /// // Its also possible to remove multiple interests at once.
+    /// assert_eq!(RW_INTERESTS.remove(RW_INTERESTS), None);
+    /// ```
+    pub fn remove(self, other: Interest) -> Option<Interest> {
+        NonZeroU8::new(self.0.get() & !other.0.get()).map(Interest)
+    }
+
     /// Returns true if the value includes readable readiness.
     pub const fn is_readable(self) -> bool {
         (self.0.get() & READABLE) != 0
diff --git a/src/lib.rs b/src/lib.rs
index 6e98c87..332ee24 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,4 @@
-#![doc(html_root_url = "https://docs.rs/mio/0.7.4")]
+#![doc(html_root_url = "https://docs.rs/mio/0.7.5")]
 #![deny(
     missing_docs,
     missing_debug_implementations,
@@ -69,9 +69,11 @@
 
 pub mod event;
 
-cfg_net! {
+cfg_io_source! {
     mod io_source;
+}
 
+cfg_net! {
     pub mod net;
 }
 
@@ -82,11 +84,27 @@
 pub use token::Token;
 pub use waker::Waker;
 
-#[cfg(all(unix, feature = "os-util"))]
-#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))]
+#[cfg(all(unix, any(feature = "os-util", feature = "pipe")))]
+#[cfg_attr(
+    docsrs,
+    doc(cfg(all(unix, any(feature = "os-util", feature = "pipe"))))
+)]
 pub mod unix {
     //! Unix only extensions.
+
+    #[cfg(feature = "os-util")]
+    #[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))]
     pub use crate::sys::SourceFd;
+
+    cfg_pipe! {
+        pub mod pipe {
+            //! Unix pipe.
+            //!
+            //! See the [`new`] function for documentation.
+
+            pub use crate::sys::pipe::{new, Receiver, Sender};
+        }
+    }
 }
 
 #[cfg(all(windows, feature = "os-util"))]
@@ -121,6 +139,12 @@
     //! `os-util` enables additional OS specific facilities. Currently this
     //! means the `unix` module (with `SourceFd`) becomes available.
     //!
+    #![cfg_attr(feature = "pipe", doc = "## `pipe` (enabled)")]
+    #![cfg_attr(not(feature = "pipe"), doc = "## `pipe` (disabled)")]
+    //!
+    //! The `pipe` feature adds `unix::pipe`, and related types, a non-blocking
+    //! wrapper around the `pipe(2)` system call.
+    //!
     //! ## Network types
     //!
     //! Mio provide three features to enable network types:
diff --git a/src/macros/mod.rs b/src/macros/mod.rs
index 7db2579..2275ed9 100644
--- a/src/macros/mod.rs
+++ b/src/macros/mod.rs
@@ -36,6 +36,18 @@
     }
 }
 
+/// One of the features enabled that needs `IoSource`. That is `tcp`, or `udp`,
+/// or on Unix `uds` or `pipe`.
+macro_rules! cfg_io_source {
+    ($($item:item)*) => {
+        $(
+            #[cfg(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds",  feature = "pipe"))))]
+            #[cfg_attr(docsrs, doc(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds",  feature = "pipe")))))]
+            $item
+        )*
+    }
+}
+
 /// One of the `tcp`, `udp` features enabled.
 #[cfg(windows)]
 macro_rules! cfg_net {
@@ -82,13 +94,25 @@
     }
 }
 
+/// Feature `pipe` enabled.
+#[cfg(unix)]
+macro_rules! cfg_pipe {
+    ($($item:item)*) => {
+        $(
+            #[cfg(feature = "pipe")]
+            #[cfg_attr(docsrs, doc(cfg(feature = "pipe")))]
+            $item
+        )*
+    }
+}
+
 /// Feature `os-util` enabled, or one of the features that need `os-util`.
 #[cfg(unix)]
 macro_rules! cfg_any_os_util {
     ($($item:item)*) => {
         $(
-            #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))]
-            #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))))]
+            #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))]
+            #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))))]
             $item
         )*
     }
@@ -99,8 +123,8 @@
 macro_rules! cfg_any_os_util {
     ($($item:item)*) => {
         $(
-            #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))]
-            #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))))]
+            #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))]
+            #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))))]
             $item
         )*
     }
diff --git a/src/net/tcp/socket.rs b/src/net/tcp/socket.rs
index a91f665..f3e27c3 100644
--- a/src/net/tcp/socket.rs
+++ b/src/net/tcp/socket.rs
@@ -82,10 +82,36 @@
         sys::tcp::set_reuseaddr(self.sys, reuseaddr)
     }
 
+    /// Get the value of `SO_REUSEADDR` set on this socket.
+    pub fn get_reuseaddr(&self) -> io::Result<bool> {
+        sys::tcp::get_reuseaddr(self.sys)
+    }
+
+    /// Sets the value of `SO_REUSEPORT` on this socket.
+    /// Only supported available in unix
+    #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+    pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
+        sys::tcp::set_reuseport(self.sys, reuseport)
+    }
+
+    /// Get the value of `SO_REUSEPORT` set on this socket.
+    /// Only supported available in unix
+    #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+    pub fn get_reuseport(&self) -> io::Result<bool> {
+        sys::tcp::get_reuseport(self.sys)
+    }
+
     /// Sets the value of `SO_LINGER` on this socket.
     pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
         sys::tcp::set_linger(self.sys, dur)
     }
+
+    /// Returns the local address of this socket
+    ///
+    /// Will return `Err` result in windows if called before calling `bind`
+    pub fn get_localaddr(&self) -> io::Result<SocketAddr> {
+        sys::tcp::get_localaddr(self.sys)
+    }
 }
 
 impl Drop for TcpSocket {
diff --git a/src/poll.rs b/src/poll.rs
index 6446281..7ff2038 100644
--- a/src/poll.rs
+++ b/src/poll.rs
@@ -613,6 +613,15 @@
             .try_clone()
             .map(|selector| Registry { selector })
     }
+
+    /// Internal check to ensure only a single `Waker` is active per [`Poll`]
+    /// instance.
+    #[cfg(debug_assertions)]
+    pub(crate) fn register_waker(&self) {
+        if self.selector.register_waker() {
+            panic!("Only a single `Waker` can be active per `Poll` instance");
+        }
+    }
 }
 
 impl fmt::Debug for Registry {
diff --git a/src/sys/mod.rs b/src/sys/mod.rs
index 8852333..08bd271 100644
--- a/src/sys/mod.rs
+++ b/src/sys/mod.rs
@@ -72,7 +72,11 @@
         pub(crate) use self::unix::uds;
     }
 
-    cfg_net! {
+    cfg_pipe! {
+        pub(crate) use self::unix::pipe;
+    }
+
+    cfg_io_source! {
         pub(crate) use self::unix::IoSourceState;
     }
 }
diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs
index 8303797..a63760a 100644
--- a/src/sys/shell/mod.rs
+++ b/src/sys/shell/mod.rs
@@ -23,7 +23,7 @@
     pub(crate) mod uds;
 }
 
-cfg_net! {
+cfg_io_source! {
     use std::io;
     #[cfg(windows)]
     use std::os::windows::io::RawSocket;
diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs
index 0e0c031..69be370 100644
--- a/src/sys/shell/selector.rs
+++ b/src/sys/shell/selector.rs
@@ -18,6 +18,11 @@
     pub fn select(&self, _: &mut Events, _: Option<Duration>) -> io::Result<()> {
         os_required!();
     }
+
+    #[cfg(debug_assertions)]
+    pub fn register_waker(&self) -> bool {
+        os_required!();
+    }
 }
 
 #[cfg(unix)]
@@ -39,7 +44,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     #[cfg(debug_assertions)]
     impl Selector {
         pub fn id(&self) -> usize {
diff --git a/src/sys/shell/tcp.rs b/src/sys/shell/tcp.rs
index de1520b..3073d42 100644
--- a/src/sys/shell/tcp.rs
+++ b/src/sys/shell/tcp.rs
@@ -32,6 +32,20 @@
     os_required!();
 }
 
+pub(crate) fn get_reuseaddr(_: TcpSocket) -> io::Result<bool> {
+    os_required!();
+}
+
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn set_reuseport(_: TcpSocket, _: bool) -> io::Result<()> {
+    os_required!();
+}
+
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn get_reuseport(_: TcpSocket) -> io::Result<bool> {
+    os_required!();
+}
+
 pub(crate) fn set_linger(_: TcpSocket, _: Option<Duration>) -> io::Result<()> {
     os_required!();
 }
@@ -39,3 +53,7 @@
 pub fn accept(_: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> {
     os_required!();
 }
+
+pub(crate) fn get_localaddr(_: TcpSocket) -> io::Result<SocketAddr> {
+    os_required!();
+}
diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs
index 96d7f4d..f045fb5 100644
--- a/src/sys/unix/mod.rs
+++ b/src/sys/unix/mod.rs
@@ -38,7 +38,7 @@
         pub use self::uds::SocketAddr;
     }
 
-    cfg_net! {
+    cfg_io_source! {
         use std::io;
 
         // Both `kqueue` and `epoll` don't need to hold any user space state.
@@ -59,6 +59,10 @@
             }
         }
     }
+
+    cfg_pipe! {
+        pub(crate) mod pipe;
+    }
 }
 
 cfg_not_os_poll! {
diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs
new file mode 100644
index 0000000..d838ebc
--- /dev/null
+++ b/src/sys/unix/pipe.rs
@@ -0,0 +1,383 @@
+//! Unix pipe.
+//!
+//! See the [`new`] function for documentation.
+
+use std::fs::File;
+use std::io::{self, IoSlice, IoSliceMut, Read, Write};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::process::{ChildStderr, ChildStdin, ChildStdout};
+
+use crate::io_source::IoSource;
+use crate::{event, Interest, Registry, Token};
+
+/// Create a new non-blocking Unix pipe.
+///
+/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
+/// inter-process or thread communication channel.
+///
+/// This channel may be created before forking the process and then one end used
+/// in each process, e.g. the parent process has the sending end to send command
+/// to the child process.
+///
+/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
+///
+/// # Events
+///
+/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
+/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
+/// written to the `Sender` the `Receiver` will receive an [readable event].
+///
+/// In addition to those events, events will also be generated if the other side
+/// is dropped. To check if the `Sender` is dropped you'll need to check
+/// [`is_read_closed`] on events for the `Receiver`, if it returns true the
+/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
+/// returns true the `Receiver` was dropped. Also see the second example below.
+///
+/// [`WRITABLE`]: Interest::WRITABLE
+/// [writable events]: event::Event::is_writable
+/// [`READABLE`]: Interest::READABLE
+/// [readable event]: event::Event::is_readable
+/// [`is_read_closed`]: event::Event::is_read_closed
+/// [`is_write_closed`]: event::Event::is_write_closed
+///
+/// # Deregistering
+///
+/// Both `Sender` and `Receiver` will deregister themselves when dropped,
+/// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
+///
+/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
+///
+/// # Examples
+///
+/// Simple example that writes data into the sending end and read it from the
+/// receiving end.
+///
+/// ```
+/// use std::io::{self, Read, Write};
+///
+/// use mio::{Poll, Events, Interest, Token};
+/// use mio::unix::pipe;
+///
+/// // Unique tokens for the two ends of the channel.
+/// const PIPE_RECV: Token = Token(0);
+/// const PIPE_SEND: Token = Token(1);
+///
+/// # fn main() -> io::Result<()> {
+/// // Create our `Poll` instance and the `Events` container.
+/// let mut poll = Poll::new()?;
+/// let mut events = Events::with_capacity(8);
+///
+/// // Create a new pipe.
+/// let (mut sender, mut receiver) = pipe::new()?;
+///
+/// // Register both ends of the channel.
+/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
+/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
+///
+/// const MSG: &[u8; 11] = b"Hello world";
+///
+/// loop {
+///     poll.poll(&mut events, None)?;
+///
+///     for event in events.iter() {
+///         match event.token() {
+///             PIPE_SEND => sender.write(MSG)
+///                 .and_then(|n| if n != MSG.len() {
+///                         // We'll consider a short write an error in this
+///                         // example. NOTE: we can't use `write_all` with
+///                         // non-blocking I/O.
+///                         Err(io::ErrorKind::WriteZero.into())
+///                     } else {
+///                         Ok(())
+///                     })?,
+///             PIPE_RECV => {
+///                 let mut buf = [0; 11];
+///                 let n = receiver.read(&mut buf)?;
+///                 println!("received: {:?}", &buf[0..n]);
+///                 assert_eq!(n, MSG.len());
+///                 assert_eq!(&buf, &*MSG);
+///                 return Ok(());
+///             },
+///             _ => unreachable!(),
+///         }
+///     }
+/// }
+/// # }
+/// ```
+///
+/// Example that receives an event once the `Sender` is dropped.
+///
+/// ```
+/// # use std::io;
+/// #
+/// # use mio::{Poll, Events, Interest, Token};
+/// # use mio::unix::pipe;
+/// #
+/// # const PIPE_RECV: Token = Token(0);
+/// # const PIPE_SEND: Token = Token(1);
+/// #
+/// # fn main() -> io::Result<()> {
+/// // Same setup as in the example above.
+/// let mut poll = Poll::new()?;
+/// let mut events = Events::with_capacity(8);
+///
+/// let (mut sender, mut receiver) = pipe::new()?;
+///
+/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
+/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
+///
+/// // Drop the sender.
+/// drop(sender);
+///
+/// poll.poll(&mut events, None)?;
+///
+/// for event in events.iter() {
+///     match event.token() {
+///         PIPE_RECV if event.is_read_closed() => {
+///             // Detected that the sender was dropped.
+///             println!("Sender dropped!");
+///             return Ok(());
+///         },
+///         _ => unreachable!(),
+///     }
+/// }
+/// # unreachable!();
+/// # }
+/// ```
+pub fn new() -> io::Result<(Sender, Receiver)> {
+    let mut fds: [RawFd; 2] = [-1, -1];
+
+    #[cfg(any(
+        target_os = "android",
+        target_os = "dragonfly",
+        target_os = "freebsd",
+        target_os = "linux",
+        target_os = "netbsd",
+        target_os = "openbsd",
+    ))]
+    unsafe {
+        if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
+            return Err(io::Error::last_os_error());
+        }
+    }
+
+    #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))]
+    unsafe {
+        // For platforms that don't have `pipe2(2)` we need to manually set the
+        // correct flags on the file descriptor.
+        if libc::pipe(fds.as_mut_ptr()) != 0 {
+            return Err(io::Error::last_os_error());
+        }
+
+        for fd in &fds {
+            if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
+                || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
+            {
+                let err = io::Error::last_os_error();
+                // Don't leak file descriptors. Can't handle error though.
+                let _ = libc::close(fds[0]);
+                let _ = libc::close(fds[1]);
+                return Err(err);
+            }
+        }
+    }
+
+    #[cfg(not(any(
+        target_os = "android",
+        target_os = "dragonfly",
+        target_os = "freebsd",
+        target_os = "linux",
+        target_os = "netbsd",
+        target_os = "openbsd",
+        target_os = "ios",
+        target_os = "macos",
+        target_os = "solaris",
+    )))]
+    compile_error!("unsupported target for `mio::unix::pipe`");
+
+    // Safety: we just initialised the `fds` above.
+    let r = unsafe { Receiver::from_raw_fd(fds[0]) };
+    let w = unsafe { Sender::from_raw_fd(fds[1]) };
+    Ok((w, r))
+}
+
+/// Sending end of an Unix pipe.
+///
+/// See [`new`] for documentation, including examples.
+#[derive(Debug)]
+pub struct Sender {
+    inner: IoSource<File>,
+}
+
+impl Sender {
+    /// Set the `Sender` into or out of non-blocking mode.
+    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
+        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
+    }
+}
+
+impl event::Source for Sender {
+    fn register(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interests: Interest,
+    ) -> io::Result<()> {
+        self.inner.register(registry, token, interests)
+    }
+
+    fn reregister(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interests: Interest,
+    ) -> io::Result<()> {
+        self.inner.reregister(registry, token, interests)
+    }
+
+    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+        self.inner.deregister(registry)
+    }
+}
+
+impl Write for Sender {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.inner.do_io(|sender| (&*sender).write(buf))
+    }
+
+    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
+        self.inner.do_io(|sender| (&*sender).write_vectored(bufs))
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.inner.do_io(|sender| (&*sender).flush())
+    }
+}
+
+/// # Notes
+///
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStdin> for Sender {
+    fn from(stdin: ChildStdin) -> Sender {
+        // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
+        unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
+    }
+}
+
+impl FromRawFd for Sender {
+    unsafe fn from_raw_fd(fd: RawFd) -> Sender {
+        Sender {
+            inner: IoSource::new(File::from_raw_fd(fd)),
+        }
+    }
+}
+
+impl AsRawFd for Sender {
+    fn as_raw_fd(&self) -> RawFd {
+        self.inner.as_raw_fd()
+    }
+}
+
+impl IntoRawFd for Sender {
+    fn into_raw_fd(self) -> RawFd {
+        self.inner.into_inner().into_raw_fd()
+    }
+}
+
+/// Receiving end of an Unix pipe.
+///
+/// See [`new`] for documentation, including examples.
+#[derive(Debug)]
+pub struct Receiver {
+    inner: IoSource<File>,
+}
+
+impl Receiver {
+    /// Set the `Receiver` into or out of non-blocking mode.
+    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
+        set_nonblocking(self.inner.as_raw_fd(), nonblocking)
+    }
+}
+
+impl event::Source for Receiver {
+    fn register(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interests: Interest,
+    ) -> io::Result<()> {
+        self.inner.register(registry, token, interests)
+    }
+
+    fn reregister(
+        &mut self,
+        registry: &Registry,
+        token: Token,
+        interests: Interest,
+    ) -> io::Result<()> {
+        self.inner.reregister(registry, token, interests)
+    }
+
+    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
+        self.inner.deregister(registry)
+    }
+}
+
+impl Read for Receiver {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.inner.do_io(|sender| (&*sender).read(buf))
+    }
+
+    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
+        self.inner.do_io(|sender| (&*sender).read_vectored(bufs))
+    }
+}
+
+/// # Notes
+///
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStdout> for Receiver {
+    fn from(stdout: ChildStdout) -> Receiver {
+        // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
+        unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
+    }
+}
+
+/// # Notes
+///
+/// The underlying pipe is **not** set to non-blocking.
+impl From<ChildStderr> for Receiver {
+    fn from(stderr: ChildStderr) -> Receiver {
+        // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
+        unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
+    }
+}
+
+impl FromRawFd for Receiver {
+    unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
+        Receiver {
+            inner: IoSource::new(File::from_raw_fd(fd)),
+        }
+    }
+}
+
+impl AsRawFd for Receiver {
+    fn as_raw_fd(&self) -> RawFd {
+        self.inner.as_raw_fd()
+    }
+}
+
+impl IntoRawFd for Receiver {
+    fn into_raw_fd(self) -> RawFd {
+        self.inner.into_inner().into_raw_fd()
+    }
+}
+
+fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
+    let value = nonblocking as libc::c_int;
+    if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
+        Err(io::Error::last_os_error())
+    } else {
+        Ok(())
+    }
+}
diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs
index 13f1617..76ee7f9 100644
--- a/src/sys/unix/selector/epoll.rs
+++ b/src/sys/unix/selector/epoll.rs
@@ -4,7 +4,7 @@
 use log::error;
 use std::os::unix::io::{AsRawFd, RawFd};
 #[cfg(debug_assertions)]
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::time::Duration;
 use std::{cmp, i32, io, ptr};
 
@@ -17,6 +17,8 @@
     #[cfg(debug_assertions)]
     id: usize,
     ep: RawFd,
+    #[cfg(debug_assertions)]
+    has_waker: AtomicBool,
 }
 
 impl Selector {
@@ -33,6 +35,8 @@
             #[cfg(debug_assertions)]
             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
             ep,
+            #[cfg(debug_assertions)]
+            has_waker: AtomicBool::new(false),
         })
     }
 
@@ -42,6 +46,8 @@
             #[cfg(debug_assertions)]
             id: self.id,
             ep,
+            #[cfg(debug_assertions)]
+            has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
         })
     }
 
@@ -93,9 +99,14 @@
     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
     }
+
+    #[cfg(debug_assertions)]
+    pub fn register_waker(&self) -> bool {
+        self.has_waker.swap(true, Ordering::AcqRel)
+    }
 }
 
-cfg_net! {
+cfg_io_source! {
     impl Selector {
         #[cfg(debug_assertions)]
         pub fn id(&self) -> usize {
diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs
index 2ebac9a..454f47d 100644
--- a/src/sys/unix/selector/kqueue.rs
+++ b/src/sys/unix/selector/kqueue.rs
@@ -4,7 +4,7 @@
 use std::ops::{Deref, DerefMut};
 use std::os::unix::io::{AsRawFd, RawFd};
 #[cfg(debug_assertions)]
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::time::Duration;
 use std::{cmp, io, ptr, slice};
 
@@ -69,6 +69,8 @@
     #[cfg(debug_assertions)]
     id: usize,
     kq: RawFd,
+    #[cfg(debug_assertions)]
+    has_waker: AtomicBool,
 }
 
 impl Selector {
@@ -79,6 +81,8 @@
                 #[cfg(debug_assertions)]
                 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
                 kq,
+                #[cfg(debug_assertions)]
+                has_waker: AtomicBool::new(false),
             })
     }
 
@@ -88,6 +92,8 @@
             #[cfg(debug_assertions)]
             id: self.id,
             kq,
+            #[cfg(debug_assertions)]
+            has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
         })
     }
 
@@ -208,6 +214,11 @@
         kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data])
     }
 
+    #[cfg(debug_assertions)]
+    pub fn register_waker(&self) -> bool {
+        self.has_waker.swap(true, Ordering::AcqRel)
+    }
+
     // Used by `Waker`.
     #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
     pub fn setup_waker(&self, token: Token) -> io::Result<()> {
@@ -292,7 +303,7 @@
     Ok(())
 }
 
-cfg_net! {
+cfg_io_source! {
     #[cfg(debug_assertions)]
     impl Selector {
         pub fn id(&self) -> usize {
diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs
index 81b371b..65b7400 100644
--- a/src/sys/unix/tcp.rs
+++ b/src/sys/unix/tcp.rs
@@ -1,4 +1,5 @@
 use std::io;
+use std::mem;
 use std::mem::{size_of, MaybeUninit};
 use std::net::{self, SocketAddr};
 use std::time::Duration;
@@ -58,6 +59,63 @@
     )).map(|_| ())
 }
 
+pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
+    let mut optval: libc::c_int = 0;
+    let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
+
+    syscall!(getsockopt(
+        socket,
+        libc::SOL_SOCKET,
+        libc::SO_REUSEADDR,
+        &mut optval as *mut _ as *mut _,
+        &mut optlen,
+    ))?;
+
+    Ok(optval != 0)
+}
+
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn set_reuseport(socket: TcpSocket, reuseport: bool) -> io::Result<()> {
+    let val: libc::c_int = if reuseport { 1 } else { 0 };
+
+    syscall!(setsockopt(
+        socket,
+        libc::SOL_SOCKET,
+        libc::SO_REUSEPORT,
+        &val as *const libc::c_int as *const libc::c_void,
+        size_of::<libc::c_int>() as libc::socklen_t,
+    )).map(|_| ())
+}
+
+#[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
+pub(crate) fn get_reuseport(socket: TcpSocket) -> io::Result<bool> {
+    let mut optval: libc::c_int = 0;
+    let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t;
+
+    syscall!(getsockopt(
+        socket,
+        libc::SOL_SOCKET,
+        libc::SO_REUSEPORT,
+        &mut optval as *mut _ as *mut _,
+        &mut optlen,
+    ))?;
+
+    Ok(optval != 0)
+}
+
+pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
+    let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
+    let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+
+    syscall!(getsockname(
+        socket,
+        &mut addr as *mut _ as *mut _,
+        &mut length
+    ))?;
+
+    unsafe { to_socket_addr(&addr) }
+}
+
 pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
     let val: libc::linger = libc::linger {
         l_onoff: if dur.is_some() { 1 } else { 0 },
diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs
index 82c8e9e..b2e3b11 100644
--- a/src/sys/windows/afd.rs
+++ b/src/sys/windows/afd.rs
@@ -111,7 +111,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     use miow::iocp::CompletionPort;
     use ntapi::ntioapi::FILE_OPEN;
     use ntapi::ntioapi::NtCreateFile;
diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs
index db6729c..9da5e7a 100644
--- a/src/sys/windows/io_status_block.rs
+++ b/src/sys/windows/io_status_block.rs
@@ -4,7 +4,7 @@
 
 pub struct IoStatusBlock(IO_STATUS_BLOCK);
 
-cfg_net! {
+cfg_io_source! {
     use ntapi::ntioapi::IO_STATUS_BLOCK_u;
 
     impl IoStatusBlock {
diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs
index 7bba6dd..25590c2 100644
--- a/src/sys/windows/mod.rs
+++ b/src/sys/windows/mod.rs
@@ -42,6 +42,10 @@
 pub(crate) use waker::Waker;
 
 cfg_net! {
+    mod net;
+}
+
+cfg_io_source! {
     use std::io;
     use std::os::windows::io::RawSocket;
     use std::pin::Pin;
@@ -49,8 +53,6 @@
 
     use crate::{poll, Interest, Registry, Token};
 
-    mod net;
-
     struct InternalState {
         selector: Arc<SelectorInner>,
         token: Token,
diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs
index 4a38300..df2c3f0 100644
--- a/src/sys/windows/selector.rs
+++ b/src/sys/windows/selector.rs
@@ -12,6 +12,7 @@
 
 use miow::iocp::{CompletionPort, CompletionStatus};
 use std::collections::VecDeque;
+use std::io;
 use std::marker::PhantomPinned;
 use std::os::windows::io::RawSocket;
 use std::pin::Pin;
@@ -20,7 +21,6 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
-use std::io;
 use winapi::shared::ntdef::NT_SUCCESS;
 use winapi::shared::ntdef::{HANDLE, PVOID};
 use winapi::shared::ntstatus::STATUS_CANCELLED;
@@ -47,7 +47,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;
 
     impl AfdGroup {
@@ -256,7 +256,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     impl SockState {
         fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
             Ok(SockState {
@@ -327,8 +327,9 @@
 pub struct Selector {
     #[cfg(debug_assertions)]
     id: usize,
-
     pub(super) inner: Arc<SelectorInner>,
+    #[cfg(debug_assertions)]
+    has_waker: AtomicBool,
 }
 
 impl Selector {
@@ -340,6 +341,8 @@
                 #[cfg(debug_assertions)]
                 id,
                 inner: Arc::new(inner),
+                #[cfg(debug_assertions)]
+                has_waker: AtomicBool::new(false),
             }
         })
     }
@@ -349,6 +352,8 @@
             #[cfg(debug_assertions)]
             id: self.id,
             inner: Arc::clone(&self.inner),
+            #[cfg(debug_assertions)]
+            has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
         })
     }
 
@@ -360,6 +365,11 @@
         self.inner.select(events, timeout)
     }
 
+    #[cfg(debug_assertions)]
+    pub fn register_waker(&self) -> bool {
+        self.has_waker.swap(true, Ordering::AcqRel)
+    }
+
     pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
         self.inner.cp.clone()
     }
@@ -370,7 +380,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     use super::InternalState;
     use crate::Token;
 
@@ -499,7 +509,7 @@
             } else if iocp_event.token() % 2 == 1 {
                 // Handle is a named pipe. This could be extended to be any non-AFD event.
                 let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;
-    
+
                 let len = events.len();
                 callback(iocp_event.entry(), Some(events));
                 n += events.len() - len;
@@ -525,7 +535,7 @@
     }
 }
 
-cfg_net! {
+cfg_io_source! {
     use std::mem::size_of;
     use std::ptr::null_mut;
     use winapi::um::mswsock;
@@ -701,7 +711,7 @@
                             let callback = unsafe {
                                 (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
                             };
-                
+
                             callback(iocp_event.entry(), None);
                         } else {
                             // drain sock state to release memory of Arc reference
diff --git a/src/sys/windows/tcp.rs b/src/sys/windows/tcp.rs
index 46ac1ac..b78d864 100644
--- a/src/sys/windows/tcp.rs
+++ b/src/sys/windows/tcp.rs
@@ -1,14 +1,17 @@
 use std::io;
 use std::mem::size_of;
-use std::net::{self, SocketAddr};
+use std::net::{self, SocketAddr, SocketAddrV4, SocketAddrV6};
 use std::time::Duration;
 use std::os::windows::io::FromRawSocket;
 use std::os::windows::raw::SOCKET as StdSocket; // winapi uses usize, stdlib uses u32/u64.
 
 use winapi::ctypes::{c_char, c_int, c_ushort};
+use winapi::shared::ws2def::{SOCKADDR_STORAGE, AF_INET, SOCKADDR_IN};
+use winapi::shared::ws2ipdef::SOCKADDR_IN6_LH;
+
 use winapi::shared::minwindef::{BOOL, TRUE, FALSE};
 use winapi::um::winsock2::{
-    self, closesocket, linger, setsockopt, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
+    self, closesocket, linger, setsockopt, getsockopt, getsockname, PF_INET, PF_INET6, SOCKET, SOCKET_ERROR,
     SOCK_STREAM, SOL_SOCKET, SO_LINGER, SO_REUSEADDR,
 };
 
@@ -87,6 +90,47 @@
     }
 }
 
+pub(crate) fn get_reuseaddr(socket: TcpSocket) -> io::Result<bool> {
+    let mut optval: c_char = 0;
+    let mut optlen = size_of::<BOOL>() as c_int;
+
+    match unsafe { getsockopt(
+        socket,
+        SOL_SOCKET,
+        SO_REUSEADDR,
+        &mut optval as *mut _ as *mut _,
+        &mut optlen,
+    ) } {
+        SOCKET_ERROR => Err(io::Error::last_os_error()),
+        _ => Ok(optval != 0),
+    }
+}
+
+pub(crate) fn get_localaddr(socket: TcpSocket) -> io::Result<SocketAddr> {
+    let mut addr: SOCKADDR_STORAGE = unsafe { std::mem::zeroed() };
+    let mut length = std::mem::size_of_val(&addr) as c_int;
+
+    match unsafe { getsockname(
+        socket,
+        &mut addr as *mut _ as *mut _,
+        &mut length
+    ) } {
+        SOCKET_ERROR => Err(io::Error::last_os_error()),
+        _ => {
+            let storage: *const SOCKADDR_STORAGE = (&addr) as *const _;
+            if addr.ss_family as c_int == AF_INET {
+                let sock_addr : SocketAddrV4 = unsafe { *(storage as *const SOCKADDR_IN as *const _) };
+                Ok(sock_addr.into())
+            } else {
+                let sock_addr : SocketAddrV6 = unsafe { *(storage as *const SOCKADDR_IN6_LH as *const _) };
+                Ok(sock_addr.into())
+            }
+        },
+    }
+
+
+}
+
 pub(crate) fn set_linger(socket: TcpSocket, dur: Option<Duration>) -> io::Result<()> {
     let val: linger = linger {
         l_onoff: if dur.is_some() { 1 } else { 0 },
diff --git a/src/waker.rs b/src/waker.rs
index 44766ce..b8e4496 100644
--- a/src/waker.rs
+++ b/src/waker.rs
@@ -16,7 +16,7 @@
 /// `Waker` events are only guaranteed to be delivered while the `Waker` value
 /// is alive.
 ///
-/// Only a single `Waker` should active per [`Poll`], if multiple threads need
+/// Only a single `Waker` can be active per [`Poll`], if multiple threads need
 /// access to the `Waker` it can be shared via for example an `Arc`. What
 /// happens if multiple `Waker`s are registered with the same `Poll` is
 /// undefined.
@@ -81,6 +81,8 @@
 impl Waker {
     /// Create a new `Waker`.
     pub fn new(registry: &Registry, token: Token) -> io::Result<Waker> {
+        #[cfg(debug_assertions)]
+        registry.register_waker();
         sys::Waker::new(poll::selector(&registry), token).map(|inner| Waker { inner })
     }