#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Signal driver

use crate::io::driver::{Driver as IoDriver, Interest};
use crate::io::PollEvented;
use crate::park::Park;
use crate::signal::registry::globals;

use mio::net::UnixStream;
use std::io::{self, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;

/// Responsible for registering wakeups when an OS signal is received, and
/// subsequently dispatching notifications to any signal listeners as appropriate.
///
/// Note: this driver relies on having an enabled IO driver in order to listen to
/// pipe write wakeups.
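///
/// Wakeups use the classic self-pipe pattern: the process-wide signal handler
/// writes into the send half of a socket pair, and this driver watches the
/// receive half for readability through the IO driver.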
#[derive(Debug)]
pub(crate) struct Driver {
    /// Thread parker. The `Driver` park implementation delegates to this.
    park: IoDriver,

    /// A pipe for receiving wake events from the signal handler
    receiver: PollEvented<UnixStream>,

    /// Shared state
    inner: Arc<Inner>,
}

#[derive(Clone, Debug, Default)]
pub(crate) struct Handle {
    inner: Weak<Inner>,
}

#[derive(Debug)]
pub(super) struct Inner(());

// ===== impl Driver =====

impl Driver {
    /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
    pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
        use std::mem::ManuallyDrop;
        use std::os::unix::io::{AsRawFd, FromRawFd};

        // NB: We give each driver a "fresh" receiver file descriptor to avoid
        // the issues described in alexcrichton/tokio-process#42.
        //
        // In the past we would reuse the actual receiver file descriptor and
        // swallow any errors around double registration of the same descriptor.
        // I'm not sure if the second (failed) registration simply doesn't end
        // up receiving wakeup notifications, or if there could be some race
        // condition when consuming readiness events, but having distinct
        // descriptors for distinct PollEvented instances appears to mitigate
        // this.
        //
        // Unfortunately we cannot just use a single global PollEvented instance
        // either, since we can't compare Handles or assume they will always
        // point to the exact same reactor.
        //
        // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
        // with registering dups with the same reactor. In this case, duping is
        // safe as each dup is registered with separate reactors **and** we
        // only need at least one dup to receive the notification.

        // Manually drop as we don't actually own this instance of UnixStream.
        let receiver_fd = globals().receiver.as_raw_fd();

        // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
        let original =
            ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
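        // `try_clone()` hands back a dup of the receiver fd, which is what
        // gives each driver its own distinct descriptor to register with its
        // own reactor, as the NB comment above requires.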
        let receiver = UnixStream::from_std(original.try_clone()?);

        let receiver = PollEvented::new_with_interest_and_handle(
            receiver,
            Interest::READABLE | Interest::WRITABLE,
            park.handle(),
        )?;

        Ok(Self {
            park,
            receiver,
            inner: Arc::new(Inner(())),
        })
    }

    /// Returns a handle to this event loop which can be sent across threads
    /// and can be used as a proxy to the event loop itself.
    pub(crate) fn handle(&self) -> Handle {
        Handle {
            inner: Arc::downgrade(&self.inner),
        }
    }

    fn process(&self) {
        // Check if the pipe is ready to read and therefore has "woken" us up
        //
        // To do so, we will `poll_read_ready` with a noop waker, since we don't
        // need to actually be notified when read ready...
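        // Safety: the noop waker vtable below never dereferences the data
        // pointer, so building a waker around a null pointer is sound.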
        let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
        let mut cx = Context::from_waker(&waker);

        let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
            Poll::Ready(Ok(ev)) => ev,
            Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
            Poll::Pending => return, // No wake has arrived, bail
        };

        // Drain the pipe completely so we can receive a new readiness event
        // if another signal has come in.
        let mut buf = [0; 128];
        loop {
            match (&*self.receiver).read(&mut buf) {
                Ok(0) => panic!("EOF on self-pipe"),
                Ok(_) => continue, // Keep reading
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => panic!("Bad read on self-pipe: {}", e),
            }
        }
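
        // Clearing the cached readiness only after the pipe is fully drained
        // means a byte written by the handler after this point surfaces as a
        // fresh readiness event on the next park.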
        self.receiver.registration().clear_readiness(ev);

        // Broadcast any signals which were received
        globals().broadcast();
    }
}
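
// A waker whose vtable slots (`clone`, `wake`, `wake_by_ref`, `drop`) all do
// nothing; `clone` simply hands back another noop `RawWaker`.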
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

unsafe fn noop(_data: *const ()) {}

// ===== impl Park for Driver =====
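
// `Driver` stacks on top of the IO driver: parking delegates to the inner
// `IoDriver`, and each time the thread wakes we run `process()` to drain the
// self-pipe and broadcast any signals that arrived.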
impl Park for Driver {
    type Unpark = <IoDriver as Park>::Unpark;
    type Error = io::Error;

    fn unpark(&self) -> Self::Unpark {
        self.park.unpark()
    }

    fn park(&mut self) -> Result<(), Self::Error> {
        self.park.park()?;
        self.process();
        Ok(())
    }

    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        self.park.park_timeout(duration)?;
        self.process();
        Ok(())
    }

    fn shutdown(&mut self) {
        self.park.shutdown()
    }
}

// ===== impl Handle =====

impl Handle {
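    /// Returns `Ok(())` while the driver that produced this handle is alive.
    ///
    /// The driver owns the only `Arc<Inner>`, so once it is dropped the weak
    /// reference can no longer observe a positive strong count.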
    pub(super) fn check_inner(&self) -> io::Result<()> {
        if self.inner.strong_count() > 0 {
            Ok(())
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
        }
    }
}

cfg_rt! {
    impl Handle {
        /// Returns a handle to the current driver
        ///
        /// # Panics
        ///
        /// This function panics if there is no current signal driver set.
        pub(super) fn current() -> Self {
            crate::runtime::context::signal_handle().expect(
                "there is no signal driver running, must be called from the context of Tokio runtime",
            )
        }
    }
}

cfg_not_rt! {
    impl Handle {
        /// Returns a handle to the current driver
        ///
        /// # Panics
        ///
        /// This function panics if there is no current signal driver set.
        pub(super) fn current() -> Self {
            panic!(
                "there is no signal driver running, must be called from the context of Tokio runtime or with \
                 `rt` enabled.",
            )
        }
    }
}