blob: 39bce9581c23cb11677a3c4041a75955c6e371b6 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! `URingExecutor`
//! The executor runs all given futures to completion. Futures register wakers associated with
//! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
//! completes.
//! `URingExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures. In general, the interface provided by `URingExecutor`
//! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource`
//! and the high-level future functions.
//! ## Read/Write buffer management.
//! There are two key issues managing asynchronous IO buffers in rust.
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//! not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//! ### The kernel's mutable borrow of the buffer
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to. The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and Drop can't be used for safety guarantees. To ensure this never happens, only memory that
//! implements `BackingMemory` is accepted. For implementors of `BackingMemory` the mut borrow
//! isn't an issue because those are already Ok with external modifications to the memory (Like a
//! `VolatileSlice`).
//! ### Buffer lifetime
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
//! read operation was submitted, then the memory is dropped. To solve this, the executor takes an
//! Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being passed to
//! the executor. The executor holds the Arc and ensures all operations are complete before dropping
//! it, which guarantees the memory is valid for the duration.
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (can't rely on drop running), there is no way to ensure the kernel's buffer remains
//! valid until the operation completes unless the executor holds an Arc to the memory on the heap.
//! ## Using `Vec` for reads/writes.
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
//! ensure it lives long enough.
use std::convert::TryInto;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread::{self, ThreadId};
use async_task::Task;
use futures::task::noop_waker;
use io_uring::URingContext;
use pin_utils::pin_mut;
use slab::Slab;
use sync::Mutex;
use sys_util::{warn, WatchingEvents};
use thiserror::Error as ThisError;
use crate::mem::{BackingMemory, MemRegion};
use crate::queue::RunnableQueue;
use crate::waker::{new_waker, WakerToken, WeakWake};
/// Errors produced by the uring executor and by operations submitted through it.
#[derive(Debug, ThisError)]
pub enum Error {
/// Failed to copy the FD for the polling context.
#[error("Failed to copy the FD for the polling context: {0}")]
/// The Executor is gone.
#[error("The URingExecutor is gone")]
/// Invalid offset or length given for an iovec in backing memory.
#[error("Invalid offset/len for getting an iovec")]
/// Invalid FD source specified.
#[error("Invalid source, FD not registered for use")]
/// Error doing the IO.
#[error("Error during IO: {0}")]
/// Creating a context to wait on FDs failed.
#[error("Error creating the fd waiting context: {0}")]
/// Failed to remove the waker from the polling context.
#[error("Error removing from the URing context: {0}")]
/// Failed to submit the operation to the polling context.
#[error("Error adding to the URing context: {0}")]
/// URingContext failure.
#[error("URingContext failure: {0}")]
/// Failed to submit or wait for io_uring events.
#[error("URing::enter: {0}")]
/// Result type used by the functions in this file, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
// Checks if the uring executor is available.
// Caches the result so that the check is only run once.
// Useful for falling back to the FD executor on pre-uring kernels.
//
// Returns `true` when the cached state is `URING` and `false` when it is `FD`.
pub(crate) fn use_uring() -> bool {
// Possible values of the cached probe result held in `USE_URING`.
const UNKNOWN: u32 = 0;
const URING: u32 = 1;
const FD: u32 = 2;
// Cache of the probe result; it starts out as `UNKNOWN`.
static USE_URING: AtomicU32 = AtomicU32::new(UNKNOWN);
match USE_URING.load(Ordering::Relaxed) {
// Create a dummy uring context to check that the kernel understands the syscalls.
if URingContext::new(8).is_ok() {, Ordering::Relaxed);
} else {, Ordering::Relaxed);
URING => true,
FD => false,
// Any other value would mean that the cache holds something this function never stores.
_ => unreachable!("invalid use uring state"),
/// Handle to a file that has been registered with a `URingExecutor`. Uring operations on the
/// file are started through the methods of this type.
pub struct RegisteredSource {
// Value returned by `RawExecutor::register_source` when the file was registered.
tag: usize,
// Weak reference to the executor, so that holding a source does not keep the executor alive.
ex: Weak<RawExecutor>,
impl RegisteredSource {
/// Starts a vectored read from this source, beginning at `file_offset`, into the regions
/// `addrs` of `mem`.
///
/// Returns `Error::ExecutorGone` if the executor no longer exists, or the error reported by
/// `submit_read_to_vectored`. On success the returned `PendingOperation` carries the token of
/// the queued operation.
pub fn start_read_to_mem(
file_offset: u64,
mem: Arc<dyn BackingMemory + Send + Sync>,
addrs: &[MemRegion],
) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
// `PendingOperation::poll` flips this once it has called `submit` itself.
submitted: false,
/// Starts a vectored write to this source, beginning at `file_offset`, from the regions
/// `addrs` of `mem`.
///
/// Returns `Error::ExecutorGone` if the executor no longer exists, or the error reported by
/// `submit_write_from_vectored`.
pub fn start_write_from_mem(
file_offset: u64,
mem: Arc<dyn BackingMemory + Send + Sync>,
addrs: &[MemRegion],
) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
/// Starts an fallocate operation on this source with the given `offset`, `len` and `mode`.
///
/// Returns `Error::ExecutorGone` if the executor no longer exists, or the error reported by
/// `submit_fallocate`.
pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_fallocate(self, offset, len, mode)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
/// Starts an fsync operation on this source.
///
/// Returns `Error::ExecutorGone` if the executor no longer exists, or the error reported by
/// `submit_fsync`.
pub fn start_fsync(&self) -> Result<PendingOperation> {
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_fsync(self)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
/// Starts a poll operation that watches this source for read events.
///
/// Returns `Error::ExecutorGone` if the executor no longer exists, or the error reported by
/// `submit_poll`.
pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
// Only the read event is requested.
let events = WatchingEvents::empty().set_read();
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
let token = ex.submit_poll(self, &events)?;
Ok(PendingOperation {
waker_token: Some(token),
ex: self.ex.clone(),
submitted: false,
// Dropping a source deregisters it from the executor, provided the executor still exists.
impl Drop for RegisteredSource {
fn drop(&mut self) {
// Nothing to do when the executor is already gone.
if let Some(ex) = self.ex.upgrade() {
let _ = ex.deregister_source(self);
// Values stored in `RawExecutor::state`. Each one is written as a `u32` literal and cast to
// `i32` because the state lives in an `AtomicI32`.
//
// Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
// waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
// blocked on the io_uring_enter syscall.
const WAITING: i32 = 0xb80d_21b5u32 as i32;
// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xdb31_83a3u32 as i32;
// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;
// Number of entries in the ring. Also used as the initial capacity of the slabs in `Ring`.
const NUM_ENTRIES: usize = 256;
// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
// Keeps the registered file alive for as long as this op data exists.
_file: Arc<File>,
// Keeps the backing memory alive for as long as this op data exists; `None` for operations
// that do not touch memory (poll, fallocate, fsync).
_mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
// Waker of the task waiting on the operation. Set by `get_result`, cleared on cancellation.
waker: Option<Waker>,
// Set once nobody is interested in the result any more (see `cancel_operation`).
canceled: bool,
// The current status of an operation that's been submitted to the uring.
// The rest of this file uses the variants `Nop`, `Pending(OpData)` and `Completed(..)`, the
// last of which wraps an `Option` holding the result of the operation.
enum OpStatus {
// Bookkeeping protected by the `RawExecutor::ring` mutex.
struct Ring {
// Status of every tracked operation. The slab key doubles as the token that is passed to
// the uring calls and that comes back with the completion.
ops: Slab<OpStatus>,
// Files that have been registered with the executor.
registered_sources: Slab<Arc<File>>,
// Executor state shared through `Arc`/`Weak` by `URingExecutor`, `RegisteredSource` and
// `PendingOperation`.
struct RawExecutor {
// The URingContext needs to be first so that it is dropped first, closing the uring fd, and
// releasing the resources borrowed by the kernel before we free them.
ctx: URingContext,
// Runnables that `run` iterates over.
queue: RunnableQueue,
// Operation and registered-source bookkeeping.
ring: Mutex<Ring>,
// Thread associated with `run`, if any; consulted by `runs_tasks_on_current_thread`.
thread_id: Mutex<Option<ThreadId>>,
// One of `WAITING`, `PROCESSING` or `WOKEN`.
state: AtomicI32,
impl RawExecutor {
// Builds the executor state: a uring context with `NUM_ENTRIES` entries, an empty runnable
// queue, slabs pre-sized to `NUM_ENTRIES`, no recorded thread and the `PROCESSING` state.
// Fails with `Error::CreatingContext` when the uring context cannot be created.
fn new() -> Result<RawExecutor> {
Ok(RawExecutor {
ctx: URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?,
queue: RunnableQueue::new(),
ring: Mutex::new(Ring {
ops: Slab::with_capacity(NUM_ENTRIES),
registered_sources: Slab::with_capacity(NUM_ENTRIES),
thread_id: Mutex::new(None),
state: AtomicI32::new(PROCESSING),
// Marks the executor as `WOKEN`. If the previous state was `WAITING`, the executor thread may
// be blocked waiting on the uring, so a NOP is added and submitted to make that wait return.
// Failures are only logged.
fn wake(&self) {
let oldstate = self.state.swap(WOKEN, Ordering::Release);
if oldstate == WAITING {
let mut ring = self.ring.lock();
// Reserve a slab slot so that the NOP gets a token of its own.
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
warn!("Failed to add NOP for waking up executor: {}", e);
match self.ctx.submit() {
Ok(()) => {}
// If the kernel's submit ring is full then we know we won't block when calling
// io_uring_enter, which is all we really care about.
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
// Spawns the `Send` future `f` as a task through `async_task::spawn`. The schedule callback
// captures only a weak reference to the executor and acts only while that reference can still
// be upgraded.
fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
F: Future + Send + 'static,
F::Output: Send + 'static,
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
let (runnable, task) = async_task::spawn(f, schedule);
// Same as `spawn`, but goes through `async_task::spawn_local` and therefore does not require
// the future or its output to be `Send`.
fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
F: Future + 'static,
F::Output: 'static,
let raw = Arc::downgrade(self);
let schedule = move |runnable| {
if let Some(r) = raw.upgrade() {
let (runnable, task) = async_task::spawn_local(f, schedule);
// Checks whether the thread id recorded in `thread_id`, if there is one, is the id of the
// calling thread.
fn runs_tasks_on_current_thread(&self) -> bool {
let executor_thread = self.thread_id.lock();
.map(|id| id == thread::current().id())
// Main loop of the executor. Each iteration goes over the queued runnables, polls `done` and
// returns `Ok(val)` as soon as `done` is ready. Otherwise it waits on the uring and records
// every completion in `ring.ops`.
//
// Returns `Error::URingEnter` when waiting on the uring fails. Panics when a completion
// arrives for a token that is not tracked or that has already completed.
fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
let current_thread = thread::current().id();
let mut thread_id = self.thread_id.lock();
"`URingExecutor::run` cannot be called from more than one thread"
loop {, Ordering::Release);
for runnable in self.queue.iter() {;
if let Poll::Ready(val) = done.as_mut().poll(cx) {
return Ok(val);
let oldstate = self.state.compare_exchange(
if let Err(oldstate) = oldstate {
debug_assert_eq!(oldstate, WOKEN);
// One or more futures have become runnable.
let events = self.ctx.wait().map_err(Error::URingEnter)?;
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
// writing to the eventfd., Ordering::Release);
let mut ring = self.ring.lock();
for (raw_token, result) in events {
// While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
// something that we originally gave to the kernel and that was created from a
// `usize` so we should always be able to convert it back into a `usize`.
let token = raw_token
.expect("`u64` doesn't fit inside a `usize`");
let op = ring
.expect("Received completion token for unexpected operation");
// Store the result right away and look at what the slot held before.
match mem::replace(op, OpStatus::Completed(Some(result))) {
// No one is waiting on a Nop.
OpStatus::Nop => mem::drop(ring.ops.remove(token)),
OpStatus::Pending(data) => {
if data.canceled {
// No one is waiting for this operation and the uring is done with
// it so it's safe to remove.
if let Some(waker) = data.waker {
OpStatus::Completed(_) => panic!("uring operation completed more than once"),
// Looks up the operation identified by `token`. For a pending operation the waker of `cx` is
// stored in the op data. For a completed operation the result is taken out of the slot and
// returned in `Some`.
//
// Panics if the token is unknown, refers to a NOP, refers to a canceled operation, or if the
// result of a completed operation has already been taken.
fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
let mut ring = self.ring.lock();
let op = ring
.expect("`get_result` called on unknown operation");
match op {
OpStatus::Nop => panic!("`get_result` called on nop"),
OpStatus::Pending(data) => {
if data.canceled {
panic!("`get_result` called on canceled operation");
// Remember who to notify; this replaces any waker stored by an earlier poll.
data.waker = Some(cx.waker().clone());
OpStatus::Completed(res) => {
let out = res.take();
Some(out.expect("Missing result in completed operation"))
// Remove the waker for the given token if it hasn't fired yet.
// A pending operation is marked as canceled but its op data stays in the slab. A token that is
// no longer in the slab is ignored.
//
// Panics if the token refers to a NOP or to an operation that was already canceled.
fn cancel_operation(&self, token: WakerToken) {
let mut ring = self.ring.lock();
if let Some(op) = ring.ops.get_mut(token.0) {
match op {
OpStatus::Nop => panic!("`cancel_operation` called on nop"),
OpStatus::Pending(data) => {
if data.canceled {
panic!("uring operation canceled more than once");
// Clear the waker as it is no longer needed.
data.waker = None;
data.canceled = true;
// Keep the rest of the op data as the uring might still be accessing either
// the source or the backing memory so it needs to live until the kernel
// completes the operation. TODO: cancel the operation in the uring.
OpStatus::Completed(_) => {
// Registers the file `f` with the executor. The returned `usize` is what
// `URingExecutor::register_source` stores in `RegisteredSource::tag`.
fn register_source(&self, f: Arc<File>) -> usize {
// Removes the registration of `source` from the executor. Called when a `RegisteredSource`
// is dropped.
fn deregister_source(&self, source: &RegisteredSource) {
// There isn't any need to pull pending ops out; they all have Arcs to the file and mem they
// need. Let them complete. Deregistering with pending ops is not a common path, so there is
// no need to optimize that case yet.
// Adds a poll for `events` on the file registered as `source` to the uring context and
// records a pending op for it. The slab key reserved for the op is also the token passed to
// `add_poll_fd`.
fn submit_poll(
source: &RegisteredSource,
events: &sys_util::WatchingEvents,
) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
.add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
// The op holds on to the file; a poll involves no backing memory.
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
// Records a pending op for an fallocate request (`offset`, `len`, `mode`) on the file
// registered as `source`. The op is stored in the slab slot reserved through `vacant_entry`.
fn submit_fallocate(
source: &RegisteredSource,
offset: u64,
len: u64,
mode: u32,
) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
// The op holds on to the file; fallocate involves no backing memory.
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
// Adds an fsync of the file registered as `source` to the uring context and records a pending
// op for it. The slab key reserved for the op is also the token passed to `add_fsync`.
fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
let mut ring = self.ring.lock();
let src = ring
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
.add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
// The op holds on to the file; fsync involves no backing memory.
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: None,
waker: None,
canceled: false,
// Adds a vectored read from the file registered as `source`, starting at `offset`, into the
// regions `addrs` of `mem`, and records a pending op that keeps both the file and `mem` alive.
//
// Returns `Error::InvalidOffset` if `mem` cannot produce a volatile slice for one of `addrs`.
fn submit_read_to_vectored(
source: &RegisteredSource,
mem: Arc<dyn BackingMemory + Send + Sync>,
offset: u64,
addrs: &[MemRegion],
) -> Result<WakerToken> {
// Reject the request up front if any region is not valid for `mem`.
if addrs
.any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
return Err(Error::InvalidOffset);
let mut ring = self.ring.lock();
let src = ring
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
// The addresses were validated at the top of this function, so unwrapping them will
// succeed.
let iovecs = addrs
.map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());
unsafe {
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_read` unit test.
.add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
// Moving `mem` into the op data is what keeps the buffer alive until the op is removed.
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: Some(mem),
waker: None,
canceled: false,
// Adds a vectored write to the file registered as `source`, starting at `offset`, from the
// regions `addrs` of `mem`, and records a pending op that keeps both the file and `mem` alive.
//
// Returns `Error::InvalidOffset` if `mem` cannot produce a volatile slice for one of `addrs`.
fn submit_write_from_vectored(
source: &RegisteredSource,
mem: Arc<dyn BackingMemory + Send + Sync>,
offset: u64,
addrs: &[MemRegion],
) -> Result<WakerToken> {
// Reject the request up front if any region is not valid for `mem`.
if addrs
.any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
return Err(Error::InvalidOffset);
let mut ring = self.ring.lock();
let src = ring
// We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
let entry = ring.ops.vacant_entry();
let next_op_token = entry.key();
// The addresses were validated at the top of this function, so unwrapping them will
// succeed.
let iovecs = addrs
.map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());
unsafe {
// Safe because all the addresses are within the Memory that an Arc is kept for the
// duration to ensure the memory is valid while the kernel accesses it.
// Tested by `dont_drop_backing_mem_write` unit test.
.add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
// Moving `mem` into the op data is what keeps the buffer alive until the op is removed.
entry.insert(OpStatus::Pending(OpData {
_file: src,
_mem: Some(mem),
waker: None,
canceled: false,
// Lets a waker built with `new_waker` reach the executor through a weak reference. Nothing
// happens once the executor has been dropped.
impl WeakWake for RawExecutor {
fn wake_by_ref(weak_self: &Weak<Self>) {
if let Some(arc_self) = weak_self.upgrade() {
// On drop, every pending operation that has not been canceled is marked as canceled and its
// stored waker, if any, is taken out. The executor loop is then run one more time; a failure
// of that final run is only logged.
impl Drop for RawExecutor {
fn drop(&mut self) {
// Wake up any futures still waiting on uring operations.
let ring = self.ring.get_mut();
for (_, op) in ring.ops.iter_mut() {
match op {
OpStatus::Nop => {}
OpStatus::Pending(data) => {
// If the operation wasn't already canceled then wake up the future waiting on
// it. When polled that future will get an ExecutorGone error anyway so there's
// no point in waiting until the operation completes to wake it up.
if !data.canceled {
if let Some(waker) = data.waker.take() {
data.canceled = true;
OpStatus::Completed(_) => {}
// Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
// thread than the one that called `run` or `run_until`. Since we know there are no other
// references, just clear the thread id so that we don't panic.
*self.thread_id.lock() = None;
// Now run the executor loop once more to poll any futures we just woke up.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let res = cx, async {});
if let Err(e) = res {
warn!("Failed to drive uring to completion: {}", e);
/// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
/// `Executor` for more details about the methods.
pub struct URingExecutor {
// Shared executor state. Sources and pending operations hold weak references to it.
raw: Arc<RawExecutor>,
impl URingExecutor {
/// Creates a new executor. Fails with the error reported by `RawExecutor::new`.
pub fn new() -> Result<URingExecutor> {
let raw = RawExecutor::new().map(Arc::new)?;
Ok(URingExecutor { raw })
/// Spawns a `Send` future on this executor and returns the `Task` handle for it.
pub fn spawn<F>(&self, f: F) -> Task<F::Output>
F: Future + Send + 'static,
F::Output: Send + 'static,
/// Spawns a future that does not need to be `Send` and returns the `Task` handle for it.
pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
F: Future + 'static,
F::Output: 'static,
/// Runs the executor with `crate::empty::<()>()` as the future to wait for.
pub fn run(&self) -> Result<()> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut cx = Context::from_waker(&waker); cx, crate::empty::<()>())
/// Runs the executor until `f` completes and returns the output of `f`.
pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
let waker = new_waker(Arc::downgrade(&self.raw));
let mut ctx = Context::from_waker(&waker); ctx, f)
/// Registers `fd` with the executor and returns a `RegisteredSource` for it. The executor
/// is given a duplicate of the descriptor rather than `fd` itself.
pub(crate) fn register_source<F: AsRawFd>(&self, fd: &F) -> Result<RegisteredSource> {
let duped_fd = unsafe {
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
// will only be added to the poll loop.
Ok(RegisteredSource {
tag: self.raw.register_source(Arc::new(duped_fd)),
ex: Arc::downgrade(&self.raw),
// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
// The duplicate is made with `fcntl(F_DUPFD_CLOEXEC)`; a negative return value means failure.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
if ret < 0 {
} else {
// Converts a `usize` into a `u64` and panics if the conversion fails.
// Used to turn slab keys into the `u64` tokens that are passed to the uring calls.
fn usize_to_u64(val: usize) -> u64 {
val.try_into().expect("`usize` doesn't fit inside a `u64`")
/// Future for an operation that has been handed to the executor. It resolves to the `u32`
/// result of the operation or to an `Error`.
pub struct PendingOperation {
// Token of the operation. Cleared once the result has been fetched; whatever is still here
// when the future is dropped is handled by the `Drop` impl.
waker_token: Option<WakerToken>,
// Weak reference to the executor that tracks the operation.
ex: Weak<RawExecutor>,
// Whether `poll` has already called `submit` on the uring context for this operation.
submitted: bool,
// Polling asks the executor for the result of the operation. While the result is not there
// yet, the operation may be submitted from here (see the comment in the body). Panics when
// polled again after the token has been cleared.
impl Future for PendingOperation {
type Output = Result<u32>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let token = self
.expect("PendingOperation polled after returning Poll::Ready");
if let Some(ex) = self.ex.upgrade() {
if let Some(result) = ex.get_result(token, cx) {
// The result has been taken, so there is nothing left to cancel on drop.
self.waker_token = None;
} else {
// If we haven't submitted the operation yet, and the executor runs on a different
// thread then submit it now. Otherwise the executor will submit it automatically
// the next time it calls UringContext::wait.
if !self.submitted && !ex.runs_tasks_on_current_thread() {
match ex.ctx.submit() {
Ok(()) => self.submitted = true,
// If the kernel ring is full then wait until some ops are removed from the
// completion queue. This op should get submitted the next time the executor
// calls UringContext::wait.
Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
} else {
// Cleanup for an operation whose token is still present when the future is dropped. Nothing
// happens if the token was already cleared or if the executor is gone.
impl Drop for PendingOperation {
fn drop(&mut self) {
if let Some(waker_token) = self.waker_token.take() {
if let Some(ex) = self.ex.upgrade() {
mod tests {
use std::future::Future;
use std::io::{Read, Write};
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::executor::block_on;
use super::*;
use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};
// A future that returns ready when the uring queue is empty.
// The check is made against the `ring.ops` slab of the executor, so "empty" means that the
// executor tracks no operation at all.
struct UringQueueEmpty<'a> {
ex: &'a URingExecutor,
impl<'a> Future for UringQueueEmpty<'a> {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
if self.ex.raw.ring.lock().ops.is_empty() {
} else {
// Checks that the executor keeps its reference to the backing memory of a read for as long as
// the operation is tracked, and releases it once the operation has completed.
fn dont_drop_backing_mem_read() {
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
// op is pending.
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use pipes to create a future that will block forever.
let (rx, mut tx) = sys_util::pipe(true).unwrap();
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
// Register the receive side of the pipe with the executor.
let registered_source = ex.register_source(&rx).expect("register source failed");
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
// of the op.
let pending_op = registered_source
.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start read to mem");
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
// reference while the op is active.
assert_eq!(Arc::strong_count(&bm), 2);
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
// it.
assert_eq!(Arc::strong_count(&bm), 2);
// Finishing the operation should put the Arc count back to 1.
// Write to the pipe so that the pending read can complete, then wait for the uring result in
// the executor.
tx.write_all(&[0u8; 8]).expect("write failed");
ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for read pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
// Checks that the executor keeps its reference to the backing memory of a write for as long as
// the operation is tracked, and releases it once the operation has completed.
fn dont_drop_backing_mem_write() {
// Create a backing memory wrapped in an Arc and check that the drop isn't called while the
// op is pending.
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
// Use a pipe from `new_pipe_full` so that the write cannot complete right away.
let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");
// Set up the TLS for the uring_executor by creating one.
let ex = URingExecutor::new().unwrap();
// Register the send side of the pipe with the executor.
let registered_source = ex.register_source(&tx).expect("register source failed");
// Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
// of the op.
let pending_op = registered_source
.start_write_from_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start write to mem");
// Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
// reference while the op is active.
assert_eq!(Arc::strong_count(&bm), 2);
// Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
// it.
assert_eq!(Arc::strong_count(&bm), 2);
// Finishing the operation should put the Arc count back to 1.
// Read from the pipe to make room so that the pending write can complete, then wait for the
// uring result in the executor.
let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
rx.read_exact(&mut buf).expect("read to empty failed");
ex.run_until(UringQueueEmpty { ex: &ex })
.expect("Failed to wait for write pipe ready");
assert_eq!(Arc::strong_count(&bm), 1);
// Starts a read on one end of a pipe and a write on the other end, then runs the executor
// until the write has completed with the expected byte count of 8.
fn canceled_before_completion() {
// Helper that takes ownership of the operation it is given.
async fn cancel_io(op: PendingOperation) {
// Helper that awaits `op` and compares its result with `expected`.
async fn check_result(op: PendingOperation, expected: u32) {
let actual = op.await.expect("operation failed to complete");
assert_eq!(expected, actual);
let bm =
Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
let rx_source = ex.register_source(&rx).expect("register source failed");
let tx_source = ex.register_source(&tx).expect("register source failed");
let read_task = rx_source
.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start read to mem");
// Write to the pipe so that the kernel operation will complete.
let buf =
Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
let write_task = tx_source
.start_write_from_mem(0, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
.expect("failed to start write from mem");
ex.run_until(check_result(write_task, 8))
.expect("Failed to run executor");
// Queues a write of `VALUE` to a pipe and then checks that the bytes read back from the pipe
// are `new_val` rather than `VALUE`, i.e. that the queued write did not reach the pipe.
fn drop_before_completion() {
const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
// Helper that expects `op` to fail with `Error::ExecutorGone`.
async fn check_op(op: PendingOperation) {
let err = op.await.expect_err("Op completed successfully");
match err {
Error::ExecutorGone => {}
e => panic!("Unexpected error from op: {}", e),
let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed");
let ex = URingExecutor::new().unwrap();
let tx_source = ex.register_source(&tx).expect("Failed to register source");
let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
let op = tx_source
&[MemRegion {
offset: 0,
len: mem::size_of::<u64>(),
.expect("Failed to start write from mem");
// Now drop the executor. It shouldn't run the write operation.
// Make sure the executor did not complete the uring operation.
let new_val = [0x2e; 8];
let mut buf = 0u64.to_ne_bytes();
rx.read_exact(&mut buf[..])
.expect("Failed to read from pipe");
assert_eq!(buf, new_val);
// Runs a clone of the executor on a second thread, leaves an uncompleted write queued and
// checks that the operation then fails with `Error::ExecutorGone`.
fn drop_on_different_thread() {
let ex = URingExecutor::new().unwrap();
let ex2 = ex.clone();
// The clone is moved to another thread, which runs it with a future that is ready at once.
let t = thread::spawn(move || ex2.run_until(async {}));
// Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
// completion.
let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed");
let tx = ex.register_source(&tx).expect("Failed to register source");
let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
let op = tx
&[MemRegion {
offset: 0,
len: mem::size_of::<u64>(),
.expect("Failed to start write from mem");
match block_on(op).expect_err("Pending operation completed after executor was dropped") {
Error::ExecutorGone => {}
e => panic!("Unexpected error after dropping executor: {}", e),