| use crate::{ |
| io::{interest::Interest, PollEvented}, |
| process::{ |
| imp::{orphan::Wait, OrphanQueue}, |
| kill::Kill, |
| }, |
| util::error::RUNTIME_SHUTTING_DOWN_ERROR, |
| }; |
| |
| use libc::{syscall, SYS_pidfd_open, ENOSYS, PIDFD_NONBLOCK}; |
| use mio::{event::Source, unix::SourceFd}; |
| use std::{ |
| fs::File, |
| future::Future, |
| io, |
| marker::Unpin, |
| ops::Deref, |
| os::unix::io::{AsRawFd, FromRawFd, RawFd}, |
| pin::Pin, |
| process::ExitStatus, |
| sync::atomic::{AtomicBool, Ordering::Relaxed}, |
| task::{Context, Poll}, |
| }; |
| |
| #[derive(Debug)] |
| struct Pidfd { |
| fd: File, |
| } |
| |
| impl Pidfd { |
| fn open(pid: u32) -> Option<Pidfd> { |
| // Store false (0) to reduce executable size |
| static NO_PIDFD_SUPPORT: AtomicBool = AtomicBool::new(false); |
| |
| if NO_PIDFD_SUPPORT.load(Relaxed) { |
| return None; |
| } |
| |
| // Safety: The following function calls invovkes syscall pidfd_open, |
| // which takes two parameter: pidfd_open(fd: c_int, flag: c_int) |
| let fd = unsafe { syscall(SYS_pidfd_open, pid, PIDFD_NONBLOCK) }; |
| if fd == -1 { |
| let errno = io::Error::last_os_error().raw_os_error().unwrap(); |
| |
| if errno == ENOSYS { |
| NO_PIDFD_SUPPORT.store(true, Relaxed) |
| } |
| |
| None |
| } else { |
| // Safety: pidfd_open returns -1 on error or a valid fd with ownership. |
| Some(Pidfd { |
| fd: unsafe { File::from_raw_fd(fd as i32) }, |
| }) |
| } |
| } |
| } |
| |
| impl AsRawFd for Pidfd { |
| fn as_raw_fd(&self) -> RawFd { |
| self.fd.as_raw_fd() |
| } |
| } |
| |
| impl Source for Pidfd { |
| fn register( |
| &mut self, |
| registry: &mio::Registry, |
| token: mio::Token, |
| interest: mio::Interest, |
| ) -> io::Result<()> { |
| SourceFd(&self.as_raw_fd()).register(registry, token, interest) |
| } |
| |
| fn reregister( |
| &mut self, |
| registry: &mio::Registry, |
| token: mio::Token, |
| interest: mio::Interest, |
| ) -> io::Result<()> { |
| SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) |
| } |
| |
| fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { |
| SourceFd(&self.as_raw_fd()).deregister(registry) |
| } |
| } |
| |
| #[derive(Debug)] |
| struct PidfdReaperInner<W> |
| where |
| W: Unpin, |
| { |
| inner: W, |
| pidfd: PollEvented<Pidfd>, |
| } |
| |
| #[allow(deprecated)] |
| fn is_rt_shutdown_err(err: &io::Error) -> bool { |
| if let Some(inner) = err.get_ref() { |
| // Using `Error::description()` is more efficient than `format!("{inner}")`, |
| // so we use it here even if it is deprecated. |
| err.kind() == io::ErrorKind::Other |
| && inner.source().is_none() |
| && inner.description() == RUNTIME_SHUTTING_DOWN_ERROR |
| } else { |
| false |
| } |
| } |
| |
| impl<W> Future for PidfdReaperInner<W> |
| where |
| W: Wait + Unpin, |
| { |
| type Output = io::Result<ExitStatus>; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| let this = Pin::into_inner(self); |
| |
| match ready!(this.pidfd.poll_read_ready(cx)) { |
| Err(err) if is_rt_shutdown_err(&err) => { |
| this.pidfd.reregister(Interest::READABLE)?; |
| ready!(this.pidfd.poll_read_ready(cx))? |
| } |
| res => res?, |
| } |
| Poll::Ready(Ok(this |
| .inner |
| .try_wait()? |
| .expect("pidfd is ready to read, the process should have exited"))) |
| } |
| } |
| |
| #[derive(Debug)] |
| pub(crate) struct PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| inner: Option<PidfdReaperInner<W>>, |
| orphan_queue: Q, |
| } |
| |
| impl<W, Q> Deref for PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| type Target = W; |
| |
| fn deref(&self) -> &Self::Target { |
| &self.inner.as_ref().expect("inner has gone away").inner |
| } |
| } |
| |
| impl<W, Q> PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| pub(crate) fn new(inner: W, orphan_queue: Q) -> Result<Self, (Option<io::Error>, W)> { |
| if let Some(pidfd) = Pidfd::open(inner.id()) { |
| match PollEvented::new_with_interest(pidfd, Interest::READABLE) { |
| Ok(pidfd) => Ok(Self { |
| inner: Some(PidfdReaperInner { pidfd, inner }), |
| orphan_queue, |
| }), |
| Err(io_error) => Err((Some(io_error), inner)), |
| } |
| } else { |
| Err((None, inner)) |
| } |
| } |
| |
| pub(crate) fn inner_mut(&mut self) -> &mut W { |
| &mut self.inner.as_mut().expect("inner has gone away").inner |
| } |
| } |
| |
| impl<W, Q> Future for PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| type Output = io::Result<ExitStatus>; |
| |
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| Pin::new( |
| Pin::into_inner(self) |
| .inner |
| .as_mut() |
| .expect("inner has gone away"), |
| ) |
| .poll(cx) |
| } |
| } |
| |
| impl<W, Q> Kill for PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin + Kill, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| fn kill(&mut self) -> io::Result<()> { |
| self.inner_mut().kill() |
| } |
| } |
| |
| impl<W, Q> Drop for PidfdReaper<W, Q> |
| where |
| W: Wait + Unpin, |
| Q: OrphanQueue<W> + Unpin, |
| { |
| fn drop(&mut self) { |
| let mut orphan = self.inner.take().expect("inner has gone away").inner; |
| if let Ok(Some(_)) = orphan.try_wait() { |
| return; |
| } |
| |
| self.orphan_queue.push_orphan(orphan); |
| } |
| } |
| |
| #[cfg(all(test, not(loom), not(miri)))] |
| mod test { |
| use super::*; |
| use crate::{ |
| process::unix::orphan::test::MockQueue, |
| runtime::{Builder as RuntimeBuilder, Runtime}, |
| }; |
| use std::process::{Command, Output}; |
| |
| fn create_runtime() -> Runtime { |
| RuntimeBuilder::new_current_thread() |
| .enable_io() |
| .build() |
| .unwrap() |
| } |
| |
| fn run_test(fut: impl Future<Output = ()>) { |
| create_runtime().block_on(fut) |
| } |
| |
| fn is_pidfd_available() -> bool { |
| let Output { stdout, status, .. } = Command::new("uname").arg("-r").output().unwrap(); |
| assert!(status.success()); |
| let stdout = String::from_utf8_lossy(&stdout); |
| |
| let mut kernel_version_iter = stdout.split_once('-').unwrap().0.split('.'); |
| let major: u32 = kernel_version_iter.next().unwrap().parse().unwrap(); |
| let minor: u32 = kernel_version_iter.next().unwrap().parse().unwrap(); |
| |
| major >= 6 || (major == 5 && minor >= 10) |
| } |
| |
| #[test] |
| fn test_pidfd_reaper_poll() { |
| if !is_pidfd_available() { |
| eprintln!("pidfd is not available on this linux kernel, skip this test"); |
| return; |
| } |
| |
| let queue = MockQueue::new(); |
| |
| run_test(async { |
| let child = Command::new("true").spawn().unwrap(); |
| let pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); |
| |
| let exit_status = pidfd_reaper.await.unwrap(); |
| assert!(exit_status.success()); |
| }); |
| |
| assert!(queue.all_enqueued.borrow().is_empty()); |
| } |
| |
| #[test] |
| fn test_pidfd_reaper_kill() { |
| if !is_pidfd_available() { |
| eprintln!("pidfd is not available on this linux kernel, skip this test"); |
| return; |
| } |
| |
| let queue = MockQueue::new(); |
| |
| run_test(async { |
| let child = Command::new("sleep").arg("1800").spawn().unwrap(); |
| let mut pidfd_reaper = PidfdReaper::new(child, &queue).unwrap(); |
| |
| pidfd_reaper.kill().unwrap(); |
| |
| let exit_status = pidfd_reaper.await.unwrap(); |
| assert!(!exit_status.success()); |
| }); |
| |
| assert!(queue.all_enqueued.borrow().is_empty()); |
| } |
| |
| #[test] |
| fn test_pidfd_reaper_drop() { |
| if !is_pidfd_available() { |
| eprintln!("pidfd is not available on this linux kernel, skip this test"); |
| return; |
| } |
| |
| let queue = MockQueue::new(); |
| |
| let mut child = Command::new("sleep").arg("1800").spawn().unwrap(); |
| |
| run_test(async { |
| let _pidfd_reaper = PidfdReaper::new(&mut child, &queue).unwrap(); |
| }); |
| |
| assert_eq!(queue.all_enqueued.borrow().len(), 1); |
| |
| child.kill().unwrap(); |
| child.wait().unwrap(); |
| } |
| } |