// blob: 122357768a6f45e93cb6cac753641eceb9c6faa7 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::convert::TryInto;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use async_trait::async_trait;
use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};
use crate::uring_executor::{Error, RegisteredSource, Result, URingExecutor};
use crate::AsyncError;
use crate::AsyncResult;
/// `UringSource` wraps FD backed IO sources for use with io_uring. It is a thin wrapper around
/// registering an IO source with the uring; the asynchronous operations are exposed through the
/// `ReadAsync`, `WriteAsync` and `IoSourceExt` implementations in this file.
///
/// The wrapped object stays reachable through `Deref`/`DerefMut` and can be recovered with
/// `into_source`.
pub struct UringSource<F: AsRawFd> {
// Handle returned by `URingExecutor::register_source`; every operation is started through it.
// Fields drop in declaration order, so this handle is dropped before `source`.
// NOTE(review): presumably the registration has to be released before the FD is closed —
// confirm before reordering these fields.
registered_source: RegisteredSource,
// The IO object this wrapper was created from.
source: F,
}
impl<F: AsRawFd> UringSource<F> {
    /// Registers `io_source` with the executor `ex` and wraps both in a new `UringSource`.
    ///
    /// If registration fails the error is returned and `io_source` is dropped.
    pub fn new(io_source: F, ex: &URingExecutor) -> Result<UringSource<F>> {
        let registered_source = ex.register_source(&io_source)?;
        let wrapped = Self {
            registered_source,
            source: io_source,
        };
        Ok(wrapped)
    }

    /// Consumes `self`, handing back the object it was created from.
    pub fn into_source(self) -> F {
        let UringSource { source, .. } = self;
        source
    }
}
#[async_trait(?Send)]
impl<F: AsRawFd> crate::ReadAsync for UringSource<F> {
/// Reads from the iosource at `file_offset` and fill the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: Option<u64>,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let buf = Arc::new(VecIoWrapper::from(vec));
let op = self.registered_source.start_read_to_mem(
file_offset.unwrap_or(0),
buf.clone(),
&[MemRegion {
offset: 0,
len: buf.len(),
}],
)?;
let len = op.await?;
let bytes = if let Ok(v) = Arc::try_unwrap(buf) {
v.into()
} else {
panic!("too many refs on buf");
};
Ok((len as usize, bytes))
}
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> AsyncResult<()> {
let op = self.registered_source.poll_fd_readable()?;
op.await?;
Ok(())
}
/// Reads a single u64 (e.g. from an eventfd).
async fn read_u64(&self) -> AsyncResult<u64> {
// This doesn't just forward to read_to_vec to avoid an unnecessary extra allocation from
// async-trait.
let buf = Arc::new(VecIoWrapper::from(0u64.to_ne_bytes().to_vec()));
let op = self.registered_source.start_read_to_mem(
0,
buf.clone(),
&[MemRegion {
offset: 0,
len: buf.len(),
}],
)?;
let len = op.await?;
if len != buf.len() as u32 {
Err(AsyncError::Uring(Error::Io(io::Error::new(
io::ErrorKind::Other,
format!("expected to read {} bytes, but read {}", buf.len(), len),
))))
} else {
let bytes: Vec<u8> = if let Ok(v) = Arc::try_unwrap(buf) {
v.into()
} else {
panic!("too many refs on buf");
};
// Will never panic because bytes is of the appropriate size.
Ok(u64::from_ne_bytes(bytes[..].try_into().unwrap()))
}
}
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: Option<u64>,
mem: Arc<dyn BackingMemory + Send + Sync>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let op =
self.registered_source
.start_read_to_mem(file_offset.unwrap_or(0), mem, mem_offsets)?;
let len = op.await?;
Ok(len as usize)
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> crate::WriteAsync for UringSource<F> {
    /// Writes the contents of `vec` to the file starting at `file_offset`; offset 0 is used when
    /// `file_offset` is `None`.
    ///
    /// Resolves to the length reported by the completed write together with the same vector.
    ///
    /// # Panics
    ///
    /// Panics if another reference to the internal buffer is still alive once the write is done.
    async fn write_from_vec<'a>(
        &'a self,
        file_offset: Option<u64>,
        vec: Vec<u8>,
    ) -> AsyncResult<(usize, Vec<u8>)> {
        // The operation takes shared ownership of the buffer, so wrap it in an `Arc`.
        let shared = Arc::new(VecIoWrapper::from(vec));
        let whole_buffer = [MemRegion {
            offset: 0,
            len: shared.len(),
        }];
        let pending = self.registered_source.start_write_from_mem(
            file_offset.unwrap_or(0),
            shared.clone(),
            &whole_buffer,
        )?;
        let count = pending.await?;
        // Take the vector back out of the `Arc` so it can be returned to the caller.
        let returned: Vec<u8> = match Arc::try_unwrap(shared) {
            Ok(wrapper) => wrapper.into(),
            Err(_) => panic!("too many refs on buf"),
        };
        Ok((count as usize, returned))
    }

    /// Writes the regions of `mem` listed in `mem_offsets` to the file starting at `file_offset`
    /// (0 when `None`). Resolves to the length reported by the completed write.
    async fn write_from_mem<'a>(
        &'a self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        mem_offsets: &'a [MemRegion],
    ) -> AsyncResult<usize> {
        let start = file_offset.unwrap_or(0);
        let pending = self
            .registered_source
            .start_write_from_mem(start, mem, mem_offsets)?;
        let count = pending.await?;
        Ok(count as usize)
    }

    /// See `fallocate(2)`. Note this op is synchronous when using the Polled backend.
    ///
    /// The value produced by the completed operation is discarded; only errors are reported.
    async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
        self.registered_source
            .start_fallocate(file_offset, len, mode)?
            .await?;
        Ok(())
    }

    /// Sync all completed write operations to the backing storage.
    ///
    /// The value produced by the completed operation is discarded; only errors are reported.
    async fn fsync(&self) -> AsyncResult<()> {
        self.registered_source.start_fsync()?.await?;
        Ok(())
    }
}
#[async_trait(?Send)]
impl<F: AsRawFd> crate::IoSourceExt<F> for UringSource<F> {
    /// Consumes the boxed `UringSource` and yields the underlying IO source.
    fn into_source(self: Box<Self>) -> F {
        self.source
    }

    /// Provides a shared ref to the underlying IO source.
    fn as_source(&self) -> &F {
        &self.source
    }

    /// Provides a mutable ref to the underlying IO source.
    fn as_source_mut(&mut self) -> &mut F {
        &mut self.source
    }
}
/// Lets a `UringSource<F>` be used wherever a `&F` is expected.
impl<F: AsRawFd> Deref for UringSource<F> {
    type Target = F;

    /// Borrows the wrapped IO source.
    fn deref(&self) -> &F {
        let inner = &self.source;
        inner
    }
}
/// Lets a `UringSource<F>` be used wherever a `&mut F` is expected.
impl<F: AsRawFd> DerefMut for UringSource<F> {
    /// Mutably borrows the wrapped IO source.
    fn deref_mut(&mut self) -> &mut F {
        let inner = &mut self.source;
        inner
    }
}
#[cfg(test)]
mod tests {
use std::fs::{File, OpenOptions};
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use crate::io_ext::{ReadAsync, WriteAsync};
use crate::uring_executor::use_uring;
use crate::UringSource;
use super::*;
// Writes 8192 bytes of 0x55 to a temporary file and checks that `read_to_mem` replaces a
// buffer of 0x44s with them.
#[test]
fn read_to_mem() {
    if !use_uring() {
        return;
    }

    use crate::mem::VecIoWrapper;
    use std::io::Write;
    use tempfile::tempfile;

    let ex = URingExecutor::new().unwrap();
    // Use a temporary file as the IO source; it implements AsRawFd.
    let mut source = tempfile().unwrap();
    let data = vec![0x55; 8192];
    // `write_all` instead of `write`: a short write would leave the file smaller than the
    // 8192 bytes the read below expects.
    source.write_all(&data).unwrap();
    let io_obj = UringSource::new(source, &ex).unwrap();

    // Start with memory filled with 0x44s.
    let buf: Arc<VecIoWrapper> = Arc::new(VecIoWrapper::from(vec![0x44; 8192]));

    let fut = io_obj.read_to_mem(
        None,
        Arc::<VecIoWrapper>::clone(&buf),
        &[MemRegion {
            offset: 0,
            len: 8192,
        }],
    );
    assert_eq!(8192, ex.run_until(fut).unwrap().unwrap());

    // Recover the vector to inspect what the read stored in it.
    let vec: Vec<u8> = match Arc::try_unwrap(buf) {
        Ok(v) => v.into(),
        Err(_) => panic!("Too many vec refs"),
    };
    assert!(vec.iter().all(|&b| b == 0x55));
}
// Reads 32 bytes from /dev/zero with `read_to_vec` and checks that the same allocation comes
// back, now zero-filled.
#[test]
fn readvec() {
    if !use_uring() {
        return;
    }

    async fn read_zeroes(ex: &URingExecutor) {
        let zero = File::open("/dev/zero").unwrap();
        let source = UringSource::new(zero, ex).unwrap();
        let input = vec![0x55u8; 32];
        // Remember the allocation so we can verify the vector is handed back, not copied.
        let input_ptr = input.as_ptr();
        let (count, output) = source.read_to_vec(None, input).await.unwrap();
        assert_eq!(count, 32);
        assert_eq!(input_ptr, output.as_ptr());
        assert!(output.iter().all(|&b| b == 0));
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(read_zeroes(&ex)).unwrap();
}
// Runs two `read_to_vec` operations on the same source concurrently and checks both buffers
// end up zero-filled.
#[test]
fn readmulti() {
    if !use_uring() {
        return;
    }

    async fn read_twice(ex: &URingExecutor) {
        let zero = File::open("/dev/zero").unwrap();
        let source = UringSource::new(zero, ex).unwrap();
        let first_buf = vec![0x55u8; 32];
        let second_buf = vec![0x55u8; 32];
        let first_read = source.read_to_vec(None, first_buf);
        let second_read = source.read_to_vec(Some(32), second_buf);
        let (first, second) = futures::future::join(first_read, second_read).await;

        let (_, first_data) = first.unwrap();
        assert!(first_data.iter().all(|&b| b == 0));
        let (_, second_data) = second.unwrap();
        assert!(second_data.iter().all(|&b| b == 0));
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(read_twice(&ex)).unwrap();
}
// Test helper: reads eight bytes from `source` via `read_to_vec` and decodes them as a
// native-endian u64. Panics if the read fails or does not report eight bytes.
async fn read_u64<T: AsRawFd>(source: &UringSource<T>) -> u64 {
    const U64_SIZE: usize = std::mem::size_of::<u64>();

    // Start from a buffer that decodes to u64::MAX.
    let initial = vec![0xffu8; U64_SIZE];
    let (count, filled) = source.read_to_vec(None, initial).await.unwrap();
    assert_eq!(count, U64_SIZE);

    let mut raw = [0u8; U64_SIZE];
    raw.copy_from_slice(&filled);
    u64::from_ne_bytes(raw)
}
// Reading a u64 from /dev/zero must produce 0.
#[test]
fn u64_from_file() {
    if !use_uring() {
        return;
    }

    let zero = File::open("/dev/zero").unwrap();
    let ex = URingExecutor::new().unwrap();
    let source = UringSource::new(zero, &ex).unwrap();

    let value = ex.run_until(read_u64(&source)).unwrap();
    assert_eq!(0u64, value);
}
// Ping-pongs three values between two tasks over a pair of eventfds: the writer posts a value
// on `ev` and waits on the signal eventfd; the reader checks the value and signals back.
#[test]
fn event() {
    if !use_uring() {
        return;
    }

    use sys_util::EventFd;

    // Values sent through the event, in order.
    const VALUES: [u64; 3] = [55, 66, 77];

    async fn write_event(ev: EventFd, wait: EventFd, ex: &URingExecutor) {
        let wait = UringSource::new(wait, ex).unwrap();
        for value in VALUES.iter() {
            ev.write(*value).unwrap();
            // Block until the reader acknowledges before sending the next value.
            read_u64(&wait).await;
        }
    }

    async fn read_events(ev: EventFd, signal: EventFd, ex: &URingExecutor) {
        let source = UringSource::new(ev, ex).unwrap();
        for expected in VALUES.iter() {
            assert_eq!(read_u64(&source).await, *expected);
            signal.write(1).unwrap();
        }
    }

    let event = EventFd::new().unwrap();
    let signal_wait = EventFd::new().unwrap();
    let ex = URingExecutor::new().unwrap();
    let writer_event = event.try_clone().unwrap();
    let writer_wait = signal_wait.try_clone().unwrap();
    let write_task = write_event(writer_event, writer_wait, &ex);
    let read_task = read_events(event, signal_wait, &ex);
    let both = futures::future::join(read_task, write_task);
    ex.run_until(both).unwrap();
}
// Starts a read on an empty pipe, lets an already-ready future win the `select`, then drops
// the still-pending read.
#[test]
fn pend_on_pipe() {
    if !use_uring() {
        return;
    }

    use std::io::Write;

    use futures::future::Either;

    async fn do_test(ex: &URingExecutor) {
        let (read_source, mut w) = sys_util::pipe(true).unwrap();
        let source = UringSource::new(read_source, ex).unwrap();
        let done = Box::pin(async { 5usize });
        let pending = Box::pin(read_u64(&source));
        match futures::future::select(pending, done).await {
            Either::Right((5, pending)) => {
                // Write to the pipe so that the kernel will release the memory associated with
                // the uring read operation. `write_all` is used so the byte count is not
                // silently ignored.
                w.write_all(&[0]).expect("failed to write to pipe");
                ::std::mem::drop(pending);
            }
            _ => panic!("unexpected select result"),
        };
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(do_test(&ex)).unwrap();
}
// Reads 32 bytes from /dev/zero into each half of a 64-byte buffer in turn and checks that
// only the targeted half is overwritten.
#[test]
fn readmem() {
    if !use_uring() {
        return;
    }

    async fn read_halves(ex: &URingExecutor) {
        let zero = File::open("/dev/zero").unwrap();
        let source = UringSource::new(zero, ex).unwrap();

        // First half of the buffer.
        let shared = Arc::new(VecIoWrapper::from(vec![0x55u8; 64]));
        let first_half = [MemRegion { offset: 0, len: 32 }];
        let count = source
            .read_to_mem(None, Arc::<VecIoWrapper>::clone(&shared), &first_half)
            .await
            .unwrap();
        assert_eq!(32, count);
        let contents: Vec<u8> = match Arc::try_unwrap(shared) {
            Ok(wrapper) => wrapper.into(),
            Err(_) => panic!("Too many vec refs"),
        };
        assert!(contents[..32].iter().all(|&b| b == 0));
        assert!(contents[32..].iter().all(|&b| b == 0x55));

        // test second half of memory too.
        let shared = Arc::new(VecIoWrapper::from(vec![0x55u8; 64]));
        let second_half = [MemRegion {
            offset: 32,
            len: 32,
        }];
        let count = source
            .read_to_mem(None, Arc::<VecIoWrapper>::clone(&shared), &second_half)
            .await
            .unwrap();
        assert_eq!(32, count);
        let contents: Vec<u8> = match Arc::try_unwrap(shared) {
            Ok(wrapper) => wrapper.into(),
            Err(_) => panic!("Too many vec refs"),
        };
        assert!(contents[..32].iter().all(|&b| b == 0x55));
        assert!(contents[32..].iter().all(|&b| b == 0));
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(read_halves(&ex)).unwrap();
}
// A region that extends one byte past the end of a 64-byte buffer must be rejected.
#[test]
fn range_error() {
    if !use_uring() {
        return;
    }

    async fn read_out_of_range(ex: &URingExecutor) {
        let zero = File::open("/dev/zero").unwrap();
        let source = UringSource::new(zero, ex).unwrap();
        let shared = Arc::new(VecIoWrapper::from(vec![0x55u8; 64]));
        // offset 32 + len 33 ends at byte 65, beyond the 64-byte buffer.
        let too_long = [MemRegion {
            offset: 32,
            len: 33,
        }];
        let outcome = source
            .read_to_mem(None, Arc::<VecIoWrapper>::clone(&shared), &too_long)
            .await;
        assert!(outcome.is_err());
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(read_out_of_range(&ex)).unwrap();
}
// Allocates 4096 bytes in a fresh file through `fallocate` and checks the resulting file size.
// The test is skipped when the kernel reports `InvalidInput`; any other error fails the test.
#[test]
fn fallocate() {
    if !use_uring() {
        return;
    }

    async fn go(ex: &URingExecutor) {
        let dir = tempfile::TempDir::new().unwrap();
        let mut file_path = PathBuf::from(dir.path());
        file_path.push("test");

        let f = OpenOptions::new()
            .create(true)
            .write(true)
            .open(&file_path)
            .unwrap();
        let source = UringSource::new(f, ex).unwrap();
        match source.fallocate(0, 4096, 0).await {
            Ok(()) => {}
            Err(crate::io_ext::Error::Uring(crate::uring_executor::Error::Io(ref io_err)))
                if io_err.kind() == std::io::ErrorKind::InvalidInput =>
            {
                // Skip the test on kernels before fallocate support.
                return;
            }
            // Every other failure, including IO errors of a different kind, is reported here
            // instead of falling through to the size check below.
            Err(e) => panic!("Unexpected uring error on fallocate: {}", e),
        }

        let meta_data = std::fs::metadata(&file_path).unwrap();
        assert_eq!(meta_data.len(), 4096);
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(go(&ex)).unwrap();
}
// `fsync` on a temporary file must complete without error.
#[test]
fn fsync() {
    if !use_uring() {
        return;
    }

    async fn sync_tempfile(ex: &URingExecutor) {
        let file = tempfile::tempfile().unwrap();
        let source = UringSource::new(file, ex).unwrap();
        let outcome = source.fsync().await;
        outcome.unwrap();
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(sync_tempfile(&ex)).unwrap();
}
// `wait_readable` on /dev/zero must complete without error.
#[test]
fn wait_read() {
    if !use_uring() {
        return;
    }

    async fn wait_for_zero(ex: &URingExecutor) {
        let zero = File::open("/dev/zero").unwrap();
        let source = UringSource::new(zero, ex).unwrap();
        let outcome = source.wait_readable().await;
        outcome.unwrap();
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(wait_for_zero(&ex)).unwrap();
}
// Writes the first 32 bytes of a 64-byte buffer with `write_from_mem` and checks the reported
// length.
#[test]
fn writemem() {
    if !use_uring() {
        return;
    }

    async fn go(ex: &URingExecutor) {
        // Use an anonymous temporary file rather than a fixed path under /tmp, so that tests
        // running in parallel do not share a file and nothing is left behind.
        let f = tempfile::tempfile().unwrap();
        let source = UringSource::new(f, ex).unwrap();
        let v = vec![0x55u8; 64];
        let vw = Arc::new(crate::mem::VecIoWrapper::from(v));
        let ret = source
            .write_from_mem(None, vw, &[MemRegion { offset: 0, len: 32 }])
            .await
            .unwrap();
        assert_eq!(32, ret);
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(go(&ex)).unwrap();
}
// Writes a 32-byte vector with `write_from_vec` and checks both the reported length and that
// the same allocation is handed back.
#[test]
fn writevec() {
    if !use_uring() {
        return;
    }

    async fn go(ex: &URingExecutor) {
        // Use an anonymous temporary file rather than a fixed path under /tmp, so that tests
        // running in parallel do not share a file and nothing is left behind.
        let f = tempfile::tempfile().unwrap();
        let source = UringSource::new(f, ex).unwrap();
        let v = vec![0x55u8; 32];
        // Remember the allocation so we can verify the vector is returned, not copied.
        let v_ptr = v.as_ptr();
        let (ret, ret_v) = source.write_from_vec(None, v).await.unwrap();
        assert_eq!(32, ret);
        assert_eq!(v_ptr, ret_v.as_ptr());
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(go(&ex)).unwrap();
}
// Runs two `write_from_vec` operations on the same source concurrently, at offsets 0 and 32,
// and checks that each reports 32 bytes.
#[test]
fn writemulti() {
    if !use_uring() {
        return;
    }

    async fn go(ex: &URingExecutor) {
        // Use an anonymous temporary file rather than a fixed path under /tmp, so that tests
        // running in parallel do not share a file and nothing is left behind.
        let f = tempfile::tempfile().unwrap();
        let source = UringSource::new(f, ex).unwrap();
        let v = vec![0x55u8; 32];
        let v2 = vec![0x55u8; 32];
        let (r, r2) = futures::future::join(
            source.write_from_vec(None, v),
            source.write_from_vec(Some(32), v2),
        )
        .await;
        assert_eq!(32, r.unwrap().0);
        assert_eq!(32, r2.unwrap().0);
    }

    let ex = URingExecutor::new().unwrap();
    ex.run_until(go(&ex)).unwrap();
}
}