| #![cfg_attr(not(feature = "rt"), allow(dead_code))] |
| |
| //! Signal driver |
| |
| use crate::io::driver::{Driver as IoDriver, Interest}; |
| use crate::io::PollEvented; |
| use crate::park::Park; |
| use crate::signal::registry::globals; |
| |
| use mio::net::UnixStream; |
| use std::io::{self, Read}; |
| use std::ptr; |
| use std::sync::{Arc, Weak}; |
| use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; |
| use std::time::Duration; |
| |
| /// Responsible for registering wakeups when an OS signal is received, and |
| /// subsequently dispatching notifications to any signal listeners as appropriate. |
| /// |
| /// Note: this driver relies on having an enabled IO driver in order to listen to |
| /// pipe write wakeups. |
| #[derive(Debug)] |
| pub(crate) struct Driver { |
| /// Thread parker. The `Driver` park implementation delegates to this. |
| park: IoDriver, |
| |
| /// A pipe for receiving wake events from the signal handler |
| receiver: PollEvented<UnixStream>, |
| |
| /// Shared state |
| inner: Arc<Inner>, |
| } |
| |
| #[derive(Clone, Debug, Default)] |
| pub(crate) struct Handle { |
| inner: Weak<Inner>, |
| } |
| |
| #[derive(Debug)] |
| pub(super) struct Inner(()); |
| |
| // ===== impl Driver ===== |
| |
| impl Driver { |
| /// Creates a new signal `Driver` instance that delegates wakeups to `park`. |
| pub(crate) fn new(park: IoDriver) -> io::Result<Self> { |
| use std::mem::ManuallyDrop; |
| use std::os::unix::io::{AsRawFd, FromRawFd}; |
| |
| // NB: We give each driver a "fresh" receiver file descriptor to avoid |
| // the issues described in alexcrichton/tokio-process#42. |
| // |
| // In the past we would reuse the actual receiver file descriptor and |
| // swallow any errors around double registration of the same descriptor. |
| // I'm not sure if the second (failed) registration simply doesn't end |
| // up receiving wake up notifications, or there could be some race |
| // condition when consuming readiness events, but having distinct |
| // descriptors for distinct PollEvented instances appears to mitigate |
| // this. |
| // |
| // Unfortunately we cannot just use a single global PollEvented instance |
| // either, since we can't compare Handles or assume they will always |
| // point to the exact same reactor. |
| // |
| // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior |
| // with registering dups with the same reactor. In this case, duping is |
| // safe as each dup is registered with separate reactors **and** we |
| // only expect at least one dup to receive the notification. |
| |
| // Manually drop as we don't actually own this instance of UnixStream. |
| let receiver_fd = globals().receiver.as_raw_fd(); |
| |
| // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. |
| let original = |
| ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); |
| let receiver = UnixStream::from_std(original.try_clone()?); |
| let receiver = PollEvented::new_with_interest_and_handle( |
| receiver, |
| Interest::READABLE | Interest::WRITABLE, |
| park.handle(), |
| )?; |
| |
| Ok(Self { |
| park, |
| receiver, |
| inner: Arc::new(Inner(())), |
| }) |
| } |
| |
| /// Returns a handle to this event loop which can be sent across threads |
| /// and can be used as a proxy to the event loop itself. |
| pub(crate) fn handle(&self) -> Handle { |
| Handle { |
| inner: Arc::downgrade(&self.inner), |
| } |
| } |
| |
| fn process(&self) { |
| // Check if the pipe is ready to read and therefore has "woken" us up |
| // |
| // To do so, we will `poll_read_ready` with a noop waker, since we don't |
| // need to actually be notified when read ready... |
| let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) }; |
| let mut cx = Context::from_waker(&waker); |
| |
| let ev = match self.receiver.registration().poll_read_ready(&mut cx) { |
| Poll::Ready(Ok(ev)) => ev, |
| Poll::Ready(Err(e)) => panic!("reactor gone: {}", e), |
| Poll::Pending => return, // No wake has arrived, bail |
| }; |
| |
| // Drain the pipe completely so we can receive a new readiness event |
| // if another signal has come in. |
| let mut buf = [0; 128]; |
| loop { |
| match (&*self.receiver).read(&mut buf) { |
| Ok(0) => panic!("EOF on self-pipe"), |
| Ok(_) => continue, // Keep reading |
| Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, |
| Err(e) => panic!("Bad read on self-pipe: {}", e), |
| } |
| } |
| |
| self.receiver.registration().clear_readiness(ev); |
| |
| // Broadcast any signals which were received |
| globals().broadcast(); |
| } |
| } |
| |
| const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop); |
| |
| unsafe fn noop_clone(_data: *const ()) -> RawWaker { |
| RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE) |
| } |
| |
| unsafe fn noop(_data: *const ()) {} |
| |
| // ===== impl Park for Driver ===== |
| |
| impl Park for Driver { |
| type Unpark = <IoDriver as Park>::Unpark; |
| type Error = io::Error; |
| |
| fn unpark(&self) -> Self::Unpark { |
| self.park.unpark() |
| } |
| |
| fn park(&mut self) -> Result<(), Self::Error> { |
| self.park.park()?; |
| self.process(); |
| Ok(()) |
| } |
| |
| fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { |
| self.park.park_timeout(duration)?; |
| self.process(); |
| Ok(()) |
| } |
| |
| fn shutdown(&mut self) { |
| self.park.shutdown() |
| } |
| } |
| |
| // ===== impl Handle ===== |
| |
| impl Handle { |
| pub(super) fn check_inner(&self) -> io::Result<()> { |
| if self.inner.strong_count() > 0 { |
| Ok(()) |
| } else { |
| Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) |
| } |
| } |
| } |
| |
| cfg_rt! { |
| impl Handle { |
| /// Returns a handle to the current driver |
| /// |
| /// # Panics |
| /// |
| /// This function panics if there is no current signal driver set. |
| pub(super) fn current() -> Self { |
| crate::runtime::context::signal_handle().expect( |
| "there is no signal driver running, must be called from the context of Tokio runtime", |
| ) |
| } |
| } |
| } |
| |
| cfg_not_rt! { |
| impl Handle { |
| /// Returns a handle to the current driver |
| /// |
| /// # Panics |
| /// |
| /// This function panics if there is no current signal driver set. |
| pub(super) fn current() -> Self { |
| panic!( |
| "there is no signal driver running, must be called from the context of Tokio runtime or with\ |
| `rt` enabled.", |
| ) |
| } |
| } |
| } |