// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! `URingExecutor`
//!
//! The executor runs all given futures to completion. Futures register wakers associated with
//! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
//! completes.
//!
//! `URingExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures. In general, the interface provided by `URingExecutor`
//! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource` and
//! the high-level future functions.
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues managing asynchronous IO buffers in Rust.
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//! not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to. The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and `Drop` can't be used for safety guarantees. To ensure the kernel never observes a
//! conflicting Rust reference, only memory that implements `BackingMemory` is accepted.
//! Implementors of `BackingMemory` aren't affected by the mutable borrow because they already
//! tolerate external modifications to the memory (like a `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
//! read operation were submitted and then the memory dropped. To solve this, the executor takes an
//! `Arc` to the backing memory. `Vec`s being read to are also wrapped in an `Arc` before being
//! passed to the executor. The executor holds the `Arc` and ensures all operations are complete
//! before dropping it, which guarantees the memory is valid for the duration.
//!
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (we can't rely on `Drop` running), there is no way to ensure the kernel's buffer remains
//! valid until the operation completes unless the executor holds an `Arc` to the memory on the heap.
//!
//! ## Using `Vec` for reads/writes.
//!
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an `Arc` to
//! ensure it lives long enough.
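//!
//! ## Example
//!
//! A minimal sketch (not run as a doctest; it reuses the crate-internal helpers exercised by the
//! unit tests at the bottom of this file) of reading into `Arc`-wrapped backing memory and driving
//! the operation to completion:
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};
//!
//! // Wrap a fully owned Vec so that only the kernel can touch it while the op is in flight.
//! let bm = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//!
//! let ex = URingExecutor::new()?;
//! let (rx, _tx) = sys_util::pipe(true)?;
//!
//! // Register the fd with the executor and submit an asynchronous read into `bm`.
//! let src = ex.register_source(&rx)?;
//! let op = src.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])?;
//!
//! // The executor keeps its own Arc to `bm` until the kernel finishes the read, so the buffer
//! // stays valid even if this handle is dropped before the operation completes.
//! let bytes_read = ex.run_until(op)??;
//! ```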
use std::convert::TryInto;
use std::ffi::CStr;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem::{self, MaybeUninit};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread::{self, ThreadId};
use async_task::Task;
use futures::task::noop_waker;
use io_uring::URingContext;
use once_cell::sync::Lazy;
use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use sys_util::{warn, WatchingEvents};
use thiserror::Error as ThisError;
use crate::queue::RunnableQueue;
use crate::waker::{new_waker, WakerToken, WeakWake};
use crate::{
mem::{BackingMemory, MemRegion},
BlockingPool,
};
#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
/// Creating a context to wait on FDs failed.
#[error("Error creating the fd waiting context: {0}")]
CreatingContext(io_uring::Error),
/// Failed to copy the FD for the polling context.
#[error("Failed to copy the FD for the polling context: {0}")]
DuplicatingFd(sys_util::Error),
/// The Executor is gone.
#[error("The URingExecutor is gone")]
ExecutorGone,
/// Invalid offset or length given for an iovec in backing memory.
#[error("Invalid offset/len for getting an iovec")]
InvalidOffset,
/// Invalid FD source specified.
#[error("Invalid source, FD not registered for use")]
InvalidSource,
/// Error doing the IO.
#[error("Error during IO: {0}")]
Io(io::Error),
/// Failed to remove the waker from the polling context.
#[error("Error removing from the URing context: {0}")]
RemovingWaker(io_uring::Error),
/// Failed to submit the operation to the polling context.
#[error("Error adding to the URing context: {0}")]
SubmittingOp(io_uring::Error),
/// URingContext failure.
#[error("URingContext failure: {0}")]
URingContextError(io_uring::Error),
/// Failed to submit or wait for io_uring events.
#[error("URing::enter: {0}")]
URingEnter(io_uring::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for io::Error {
fn from(e: Error) -> Self {
use Error::*;
match e {
DuplicatingFd(e) => e.into(),
ExecutorGone => io::Error::new(io::ErrorKind::Other, ExecutorGone),
InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
Io(e) => e,
CreatingContext(e) => e.into(),
RemovingWaker(e) => e.into(),
SubmittingOp(e) => e.into(),
URingContextError(e) => e.into(),
URingEnter(e) => e.into(),
}
}
}
static USE_URING: Lazy<bool> = Lazy::new(|| {
let mut utsname = MaybeUninit::zeroed();
// Safe because this will only modify `utsname` and we check the return value.
let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
if res < 0 {
return false;
}
// Safe because the kernel has initialized `utsname`.
let utsname = unsafe { utsname.assume_init() };
// Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
Ok(c) => c,
Err(_) => return false,
};
// Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
match (components.next(), components.next()) {
(Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
// The kernel version is new enough so check if we can actually make a uring context.
URingContext::new(8).is_ok()
}
_ => false,
}
});
/// Checks if the uring executor is available.
/// Caches the result so that the check is only run once.
/// Useful for falling back to the FD executor on pre-uring kernels.
pub(crate) fn use_uring() -> bool {
*USE_URING
}
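// A rough sketch of the intended call pattern (the poll-based fallback executor lives elsewhere
// in this crate; the exact fallback type is assumed here, not defined in this file):
//
//     if use_uring() {
//         let ex = URingExecutor::new()?;
//         // ... spawn futures and drive them with `ex.run_until(...)` ...
//     } else {
//         // Fall back to the poll/FD-based executor on pre-5.10 kernels.
//     }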
pub struct RegisteredSource {
tag: usize,
ex: Weak<RawExecutor>,
}
impl RegisteredSource {
pub fn start_read_to_mem(
&self,
file_offset: u64,
mem: Arc<dyn BackingMemory + Send + Sync>,
addrs: &[MemRegion],
) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
})
}
pub fn start_write_from_mem(
&self,
file_offset: u64,
mem: Arc<dyn BackingMemory + Send + Sync>,
addrs: &[MemRegion],
) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
})
}
pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_fallocate(self, offset, len, mode)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
})
}
pub fn start_fsync(&self) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_fsync(self)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
})
}
pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
let events = WatchingEvents::empty().set_read();
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_poll(self, &events)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
})
}
}
impl Drop for RegisteredSource {
fn drop(&mut self) {
if let Some(ex) = self.ex.upgrade() {
let _ = ex.deregister_source(self);
}
}
}
// Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
// waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
// blocked on the io_uring_enter syscall.
const WAITING: i32 = 0xb80d_21b5u32 as i32;
// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xdb31_83a3u32 as i32;
// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;
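//
// A rough sketch of the state transitions (inferred from `wake` and `run` below, not a formal
// spec):
//
//     PROCESSING --(run queue drained, about to io_uring_enter)--> WAITING
//     any state  --(`wake` called; a NOP is submitted if the old state was WAITING)--> WOKEN
//     WAITING or WOKEN --(top of the `run` loop)--> PROCESSING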
// Number of entries in the ring.
const NUM_ENTRIES: usize = 256;
// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
_file: Arc<File>,
_mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
waker: Option<Waker>,
canceled: bool,
}
// The current status of an operation that's been submitted to the uring.
enum OpStatus {
Nop,
Pending(OpData),
Completed(Option<::std::io::Result<u32>>),
}
struct Ring {
ops: Slab<OpStatus>,
registered_sources: Slab<Arc<File>>,
}
struct RawExecutor {
// The URingContext needs to be first so that it is dropped first, closing the uring fd, and
// releasing the resources borrowed by the kernel before we free them.
ctx: URingContext,
queue: RunnableQueue,
ring: Mutex<Ring>,
blocking_pool: BlockingPool,
thread_id: Mutex<Option<ThreadId>>,
state: AtomicI32,
}
impl RawExecutor {
fn new() -> Result<RawExecutor> {
Ok(RawExecutor {
ctx: URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?,
queue: RunnableQueue::new(),
ring: Mutex::new(Ring {
ops: Slab::with_capacity(NUM_ENTRIES),
registered_sources: Slab::with_capacity(NUM_ENTRIES),
}),
blocking_pool: Default::default(),
thread_id: Mutex::new(None),
state: AtomicI32::new(PROCESSING),
})
}
fn wake(&self) {
let oldstate = self.state.swap(WOKEN, Ordering::Release);
if oldstate == WAITING {
let mut ring = self.ring.lock();
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
warn!("Failed to add NOP for waking up executor: {}", e);
}
entry.insert(OpStatus::Nop);
mem::drop(ring);
match self.ctx.submit() {
Ok(()) => {}
// If the kernel's submit ring is full then we know we won't block when calling
// io_uring_enter, which is all we really care about.
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
}
}
}
fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn(f, schedule);
runnable.schedule();
task
}
fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
r.queue.push_back(runnable);
r.wake();
}
};
let (runnable, task) = async_task::spawn_local(f, schedule);
runnable.schedule();
task
}
fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.blocking_pool.spawn(f)
}
fn runs_tasks_on_current_thread(&self) -> bool {
let executor_thread = self.thread_id.lock();
executor_thread
.map(|id| id == thread::current().id())
.unwrap_or(false)
}
fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
let current_thread = thread::current().id();
let mut thread_id = self.thread_id.lock();
assert_eq!(
*thread_id.get_or_insert(current_thread),
current_thread,
"`URingExecutor::run` cannot be called from more than one thread"
);
mem::drop(thread_id);
pin_mut!(done);
loop {
self.state.store(PROCESSING, Ordering::Release);
for runnable in self.queue.iter() {
runnable.run();
}
if let Poll::Ready(val) = done.as_mut().poll(cx) {
return Ok(val);
}
let oldstate = self.state.compare_exchange(
PROCESSING,
WAITING,
Ordering::Acquire,
Ordering::Acquire,
);
if let Err(oldstate) = oldstate {
debug_assert_eq!(oldstate, WOKEN);
// One or more futures have become runnable.
continue;
}
let events = self.ctx.wait().map_err(Error::URingEnter)?;
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below
// from submitting an unnecessary NOP to wake up the executor.
self.state.store(PROCESSING, Ordering::Release);
let mut ring = self.ring.lock();
for (raw_token, result) in events {
// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
// something that we originally gave to the kernel and that was created from a
// `usize` so we should always be able to convert it back into a `usize`.
let token = raw_token
.try_into()
.expect("`u64` doesn't fit inside a `usize`");
let op = ring
.ops
.get_mut(token)
.expect("Received completion token for unexpected operation");
match mem::replace(op, OpStatus::Completed(Some(result))) {
// No one is waiting on a Nop.
OpStatus::Nop => mem::drop(ring.ops.remove(token)),
OpStatus::Pending(data) => {
if data.canceled {
// No one is waiting for this operation and the uring is done with
// it so it's safe to remove.
ring.ops.remove(token);
}
if let Some(waker) = data.waker {
waker.wake();
}
}
OpStatus::Completed(_) => panic!("uring operation completed more than once"),
}
}
}
}
fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
let mut ring = self.ring.lock();
let op = ring
.ops
.get_mut(token.0)
.expect("`get_result` called on unknown operation");
match op {
OpStatus::Nop => panic!("`get_result` called on nop"),
OpStatus::Pending(data) => {
if data.canceled {
panic!("`get_result` called on canceled operation");
}
data.waker = Some(cx.waker().clone());
None
}
OpStatus::Completed(res) => {
let out = res.take();
ring.ops.remove(token.0);
Some(out.expect("Missing result in completed operation"))
}
}
}
// Remove the waker for the given token if it hasn't fired yet.
fn cancel_operation(&self, token: WakerToken) {
let mut ring = self.ring.lock();
if let Some(op) = ring.ops.get_mut(token.0) {
match op {
OpStatus::Nop => panic!("`cancel_operation` called on nop"),
OpStatus::Pending(data) => {
if data.canceled {
panic!("uring operation canceled more than once");
}
// Clear the waker as it is no longer needed.
data.waker = None;
data.canceled = true;
// Keep the rest of the op data as the uring might still be accessing either
// the source or the backing memory, so it needs to live until the kernel
// completes the operation. TODO: cancel the operation in the uring.
}
OpStatus::Completed(_) => {
ring.ops.remove(token.0);
}
}
}
}
fn register_source(&self, f: Arc<File>) -> usize {
self.ring.lock().registered_sources.insert(f)
}
fn deregister_source(&self, source: &RegisteredSource) {
// There isn't any need to pull pending ops out; they all have Arcs to the file and mem they
// need, so let them complete. Deregistering a source with pending ops is not a common path, so
// there is no need to optimize that case yet.
self.ring.lock().registered_sources.remove(source.tag);
}
fn submit_poll(
&self,
source: &RegisteredSource,
events: &sys_util::WatchingEvents,
) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
.registered_sources
.get(source.tag)
.map(Arc::clone)
.ok_or(Error::InvalidSource)?;
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
self.ctx
.add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
}));
Ok(WakerToken(next_op_token))
}
fn submit_fallocate(
&self,
source: &RegisteredSource,
offset: u64,
len: u64,
mode: u32,
) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
.registered_sources
.get(source.tag)
.map(Arc::clone)
.ok_or(Error::InvalidSource)?;
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
self.ctx
.add_fallocate(
src.as_raw_fd(),
offset,
len,
mode,
usize_to_u64(next_op_token),
)
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
}));
Ok(WakerToken(next_op_token))
}
fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
.registered_sources
.get(source.tag)
.map(Arc::clone)
.ok_or(Error::InvalidSource)?;
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
self.ctx
.add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
}));
Ok(WakerToken(next_op_token))
}
fn submit_read_to_vectored(
&self,
source: &RegisteredSource,
mem: Arc<dyn BackingMemory + Send + Sync>,
offset: u64,
addrs: &[MemRegion],
) -> Result<WakerToken> {
if addrs
.iter()
.any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
{
return Err(Error::InvalidOffset);
}
let mut ring = self.ring.lock();
let src = ring
.registered_sources
.get(source.tag)
.map(Arc::clone)
.ok_or(Error::InvalidSource)?;
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
// The addresses have already been validated above, so unwrapping them will succeed.
let iovecs = addrs
.iter()
.map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());
unsafe {
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_read` unit test.
self.ctx
.add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
}
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: Some(mem),
waker: None,
canceled: false,
}));
Ok(WakerToken(next_op_token))
}
fn submit_write_from_vectored(
&self,
source: &RegisteredSource,
mem: Arc<dyn BackingMemory + Send + Sync>,
offset: u64,
addrs: &[MemRegion],
) -> Result<WakerToken> {
if addrs
.iter()
.any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
{
return Err(Error::InvalidOffset);
}
let mut ring = self.ring.lock();
let src = ring
.registered_sources
.get(source.tag)
.map(Arc::clone)
.ok_or(Error::InvalidSource)?;
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
// The addresses have already been validated above, so unwrapping them will succeed.
let iovecs = addrs
.iter()
.map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());
unsafe {
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_write` unit test.
self.ctx
.add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
.map_err(Error::SubmittingOp)?;
}
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: Some(mem),
waker: None,
canceled: false,
}));
Ok(WakerToken(next_op_token))
}
}
impl WeakWake for RawExecutor {
fn wake_by_ref(weak_self: &Weak<Self>) {
if let Some(arc_self) = weak_self.upgrade() {
RawExecutor::wake(&arc_self);
}
}
}
impl Drop for RawExecutor {
fn drop(&mut self) {
// Wake up any futures still waiting on uring operations.
let ring = self.ring.get_mut();
for (_, op) in ring.ops.iter_mut() {
match op {
OpStatus::Nop => {}
OpStatus::Pending(data) => {
// If the operation wasn't already canceled then wake up the future waiting on
// it. When polled that future will get an ExecutorGone error anyway so there's
// no point in waiting until the operation completes to wake it up.
if !data.canceled {
if let Some(waker) = data.waker.take() {
waker.wake();
}
}
data.canceled = true;
}
OpStatus::Completed(_) => {}
}
}
// Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
// thread than the one that called `run` or `run_until`. Since we know there are no other
// references, just clear the thread id so that we don't panic.
*self.thread_id.lock() = None;
// Now run the executor loop once more to poll any futures we just woke up.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let res = self.run(&mut cx, async {});
if let Err(e) = res {
warn!("Failed to drive uring to completion: {}", e);
}
}
}
/// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
/// `Executor` for more details about the methods.
#[derive(Clone)]
pub struct URingExecutor {
raw: Arc<RawExecutor>,
}
impl URingExecutor {
pub fn new() -> Result<URingExecutor> {
let raw = RawExecutor::new().map(Arc::new)?;
Ok(URingExecutor { raw })
}
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.raw.spawn(f)
}
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
self.raw.spawn_local(f)
}
pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.raw.spawn_blocking(f)
}
pub fn run(&self) -> Result<()> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker);
self.raw.run(&mut cx, crate::empty::<()>())
}
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut ctx = Context::from_waker(&waker);
self.raw.run(&mut ctx, f)
}
/// Register a file and memory pair for buffered asynchronous operation.
pub(crate) fn register_source<F: AsRawFd>(&self, fd: &F) -> Result<RegisteredSource> {
let duped_fd = unsafe {
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
// will only be added to the poll loop.
File::from_raw_fd(dup_fd(fd.as_raw_fd())?)
};
Ok(RegisteredSource {
tag: self.raw.register_source(Arc::new(duped_fd)),
ex: Arc::downgrade(&self.raw),
})
}
}
// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// still registered with the executor or in use by the uring.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
if ret < 0 {
Err(Error::DuplicatingFd(sys_util::Error::last()))
} else {
Ok(ret)
}
}
// Converts a `usize` into a `u64` and panics if the conversion fails.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
val.try_into().expect("`usize` doesn't fit inside a `u64`")
}
pub struct PendingOperation {
waker_token: Option<WakerToken>,
ex: Weak<RawExecutor>,
submitted: bool,
}
impl Future for PendingOperation {
type Output = Result<u32>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let token = self
.waker_token
.as_ref()
.expect("PendingOperation polled after returning Poll::Ready");
if let Some(ex) = self.ex.upgrade() {
if let Some(result) = ex.get_result(token, cx) {
self.waker_token = None;
Poll::Ready(result.map_err(Error::Io))
} else {
// If we haven't submitted the operation yet and the executor is running on a different
// thread, submit it now. Otherwise the executor will submit it automatically the next
// time it calls UringContext::wait.
if !self.submitted && !ex.runs_tasks_on_current_thread() {
match ex.ctx.submit() {
Ok(()) => self.submitted = true,
// If the kernel ring is full then wait until some ops are removed from the
// completion queue. This op should get submitted the next time the executor
// calls UringContext::wait.
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
}
}
Poll::Pending
}
} else {
Poll::Ready(Err(Error::ExecutorGone))
}
}
}
impl Drop for PendingOperation {
fn drop(&mut self) {
if let Some(waker_token) = self.waker_token.take() {
if let Some(ex) = self.ex.upgrade() {
ex.cancel_operation(waker_token);
}
}
}
}
#[cfg(test)]
mod tests {
use std::future::Future;
use std::io::{Read, Write};
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::executor::block_on;
use super::*;
use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};
// A future that returns ready when the uring queue is empty.
struct UringQueueEmpty<'a> {
ex: &'a URingExecutor,
}
impl<'a> Future for UringQueueEmpty<'a> {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
if self.ex.raw.ring.lock().ops.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[test]
fn dont_drop_backing_mem_read() {
if !use_uring() {
return;
}
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
// op is pending.
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use pipes to create a future that will block forever.
let (rx, mut tx) = sys_util::pipe(true).unwrap();
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
// Register the receive side of the pipe with the executor.
let registered_source = ex.register_source(&rx).expect("register source failed");
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
// of the op.
let pending_op = registered_source
.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start read to mem");
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
// reference while the op is active.
assert_eq!(Arc::strong_count(&bm), 2);
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
// it.
drop(pending_op);
assert_eq!(Arc::strong_count(&bm), 2);
// Finishing the operation should put the Arc count back to 1.
// Write to the pipe to complete the blocked read and then wait for the uring result in the
// executor.
tx.write_all(&[0u8; 8]).expect("write failed");
ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for read pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
}
#[test]
fn dont_drop_backing_mem_write() {
if !use_uring() {
return;
}
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
// op is pending.
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use pipes to create a future that will block forever.
let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
// Register the write side of the pipe with the executor.
let registered_source = ex.register_source(&tx).expect("register source failed");
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
// of the op.
let pending_op = registered_source
.start_write_from_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start write to mem");
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
// reference while the op is active.
assert_eq!(Arc::strong_count(&bm), 2);
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
// it.
drop(pending_op);
assert_eq!(Arc::strong_count(&bm), 2);
// Finishing the operation should put the Arc count back to 1.
// Read from the pipe to unblock the pending write and then wait for the uring result in the
// executor.
let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
rx.read_exact(&mut buf).expect("read to empty failed");
ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for write pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
}
#[test]
fn canceled_before_completion() {
if !use_uring() {
return;
}
async fn cancel_io(op: PendingOperation) {
mem::drop(op);
}
async fn check_result(op: PendingOperation, expected: u32) {
let actual = op.await.expect("operation failed to complete");
assert_eq!(expected, actual);
}
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
let rx_source = ex.register_source(&rx).expect("register source failed");
let tx_source = ex.register_source(&tx).expect("register source failed");
let read_task = rx_source
.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start read to mem");
ex.spawn_local(cancel_io(read_task)).detach();
// Write to the pipe so that the kernel operation will complete.
let buf =
Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
let write_task = tx_source
.start_write_from_mem(0, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start write from mem");
ex.run_until(check_result(write_task, 8))
.expect("Failed to run executor");
}
#[test]
fn drop_before_completion() {
if !use_uring() {
return;
}
const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
async fn check_op(op: PendingOperation) {
let err = op.await.expect_err("Op completed successfully");
match err {
Error::ExecutorGone => {}
e => panic!("Unexpected error from op: {}", e),
}
}
let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
let tx_source = ex.register_source(&tx).expect("Failed to register source");
let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
let op = tx_source
.start_write_from_mem(
0,
bm,
&[MemRegion {
offset: 0,
len: mem::size_of::<u64>(),
}],
)
.expect("Failed to start write from mem");
ex.spawn_local(check_op(op)).detach();
// Now drop the executor. It shouldn't run the write operation.
mem::drop(ex);
// Make sure the executor did not complete the uring operation.
let new_val = [0x2e; 8];
tx.write_all(&new_val).unwrap();
let mut buf = 0u64.to_ne_bytes();
rx.read_exact(&mut buf[..])
.expect("Failed to read from pipe");
assert_eq!(buf, new_val);
}
#[test]
fn drop_on_different_thread() {
if !use_uring() {
return;
}
let ex = URingExecutor::new().unwrap();
let ex2 = ex.clone();
let t = thread::spawn(move || ex2.run_until(async {}));
t.join().unwrap().unwrap();
// Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
// completion.
let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let tx = ex.register_source(&tx).expect("Failed to register source");
let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
let op = tx
.start_write_from_mem(
0,
bm,
&[MemRegion {
offset: 0,
len: mem::size_of::<u64>(),
}],
)
.expect("Failed to start write from mem");
mem::drop(ex);
match block_on(op).expect_err("Pending operation completed after executor was dropped") {
Error::ExecutorGone => {}
e => panic!("Unexpected error after dropping executor: {}", e),
}
}
}