blob: 5743d3dff4b88ddfa4346cb6d1767815b3488565 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! # `IoSourceExt`
//!
//! User functions to asynchronously access files.
//! Using `IoSource` directly is inconvenient and requires dealing with state
//! machines for the backing uring, future libraries, etc. `IoSourceExt` instead
//! provides users with a future that can be `await`ed from async context.
//!
//! Each member of `IoSourceExt` returns a future for the supported operation. One or more
//! operation can be pending at a time.
//!
//! Operations can only access memory in a `Vec` or an implementor of `BackingMemory`. See the
//! `URingExecutor` documentation for an explaination of why.
use crate::poll_source::PollSource;
use crate::UringSource;
use async_trait::async_trait;
use std::fs::File;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use thiserror::Error as ThisError;
use crate::uring_mem::{BackingMemory, MemRegion};
use sys_util::net::UnixSeqpacket;
#[derive(ThisError, Debug)]
pub enum Error {
/// An error with a polled(FD) source.
#[error("An error with a poll source: {0}")]
Poll(crate::poll_source::Error),
/// An error with a uring source.
#[error("An error with a uring source: {0}")]
Uring(crate::uring_executor::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
/// Ergonomic methods for async reads.
#[async_trait(?Send)]
pub trait ReadAsync {
/// Reads from the iosource at `file_offset` and fill the given `vec`.
async fn read_to_vec<'a>(&'a self, file_offset: u64, vec: Vec<u8>) -> Result<(usize, Vec<u8>)>;
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
) -> Result<usize>;
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> Result<()>;
/// Reads a single u64 from the current offset.
async fn read_u64(&self) -> Result<u64>;
}
/// Ergonomic methods for async writes.
#[async_trait(?Send)]
pub trait WriteAsync {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: u64,
vec: Vec<u8>,
) -> Result<(usize, Vec<u8>)>;
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
) -> Result<usize>;
/// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> Result<()>;
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> Result<()>;
}
/// Subtrait for general async IO.
#[async_trait(?Send)]
pub trait IoSourceExt<F>: ReadAsync + WriteAsync {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F;
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F;
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F;
}
/// Marker trait signifying that the implementor is suitable for use with
/// cros_async. Examples of this include File, and sys_util::net::UnixSeqpacket.
///
/// (Note: it'd be really nice to implement a TryFrom for any implementors, and
/// remove our factory functions. Unfortunately
/// https://github.com/rust-lang/rust/issues/50133 makes that too painful.)
pub trait IntoAsync: AsRawFd {}
impl IntoAsync for File {}
impl IntoAsync for UnixSeqpacket {}
impl IntoAsync for &UnixSeqpacket {}
/// Creates a concrete `IoSourceExt` that uses uring if available or falls back to the fd_executor if not.
/// Note that on older kernels (pre 5.6) FDs such as event or timer FDs are unreliable when
/// having readvwritev performed through io_uring. To deal with EventFd or TimerFd, use
/// `IoSourceExt::read_u64`.
pub fn async_from<'a, F: IntoAsync + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
if crate::uring_executor::use_uring() {
async_uring_from(f)
} else {
async_poll_from(f)
}
}
/// Creates a concrete `IoSourceExt` using Uring.
pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
f: F,
) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
UringSource::new(f)
.map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)
.map_err(Error::Uring)
}
/// Creates a concrete `IoSourceExt` using the fd_executor.
pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(f: F) -> Result<Box<dyn IoSourceExt<F> + 'a>> {
PollSource::new(f)
.map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)
.map_err(Error::Poll)
}
#[cfg(test)]
mod tests {
use std::fs::{File, OpenOptions};
use futures::pin_mut;
use super::*;
use crate::uring_mem::{MemRegion, VecIoWrapper};
use crate::Executor;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
#[test]
fn readvec() {
async fn go<F: AsRawFd + Unpin>(async_source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.read_to_vec(0, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
assert!(ret_v.iter().all(|&b| b == 0));
}
let f = File::open("/dev/zero").unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn writevec() {
async fn go<F: AsRawFd + Unpin>(async_source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.write_from_vec(0, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn readmem() {
async fn go<F: AsRawFd + Unpin>(async_source: Box<dyn IoSourceExt<F>>) {
let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
let ret = async_source
.read_to_mem(
0,
Rc::<VecIoWrapper>::clone(&mem),
&[
MemRegion { offset: 0, len: 32 },
MemRegion {
offset: 200,
len: 56,
},
],
)
.await
.unwrap();
assert_eq!(ret, 32 + 56);
let vec: Vec<u8> = match Rc::try_unwrap(mem) {
Ok(v) => v.into(),
Err(_) => panic!("Too many vec refs"),
};
assert!(vec.iter().take(32).all(|&b| b == 0));
assert!(vec.iter().skip(32).take(168).all(|&b| b == 0x55));
assert!(vec.iter().skip(200).take(56).all(|&b| b == 0));
assert!(vec.iter().skip(256).all(|&b| b == 0x55));
}
let f = File::open("/dev/zero").unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
let f = File::open("/dev/zero").unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn writemem() {
async fn go<F: AsRawFd + Unpin>(async_source: Box<dyn IoSourceExt<F>>) {
let mem = Rc::new(VecIoWrapper::from(vec![0x55u8; 8192]));
let ret = async_source
.write_from_mem(
0,
Rc::<VecIoWrapper>::clone(&mem),
&[MemRegion { offset: 0, len: 32 }],
)
.await
.unwrap();
assert_eq!(ret, 32);
}
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let uring_source = async_uring_from(f).unwrap();
let fut = go(uring_source);
pin_mut!(fut);
crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let poll_source = async_poll_from(f).unwrap();
let fut = go(poll_source);
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn read_u64s() {
async fn go(async_source: File) -> u64 {
let source = async_from(async_source).unwrap();
source.read_u64().await.unwrap()
}
let f = File::open("/dev/zero").unwrap();
let fut = go(f);
pin_mut!(fut);
let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
assert_eq!(val, 0);
}
#[test]
fn read_eventfds() {
use sys_util::EventFd;
async fn go<F: AsRawFd + Unpin>(source: Box<dyn IoSourceExt<F>>) -> u64 {
source.read_u64().await.unwrap()
}
let eventfd = EventFd::new().unwrap();
eventfd.write(0x55).unwrap();
let fut = go(async_uring_from(eventfd).unwrap());
pin_mut!(fut);
let val = crate::uring_executor::URingExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
assert_eq!(val, 0x55);
let eventfd = EventFd::new().unwrap();
eventfd.write(0xaa).unwrap();
let fut = go(async_poll_from(eventfd).unwrap());
pin_mut!(fut);
let val = crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
assert_eq!(val, 0xaa);
}
#[test]
fn fsync() {
async fn go<F: AsRawFd + Unpin>(source: Box<dyn IoSourceExt<F>>) {
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = source.write_from_vec(0, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
source.fsync().await.unwrap();
}
let f = tempfile::tempfile().unwrap();
let source = async_from(f).unwrap();
let fut = go(source);
pin_mut!(fut);
crate::run_one(fut).unwrap();
}
}