blob: f100368669ba88179ffb69dff32654b1ecadc6b0 [file] [log] [blame]
// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! A wrapped IO source that uses FdExecutor to drive asynchronous completion. Used from
//! `IoSourceExt::new` when uring isn't available in the kernel.
use async_trait::async_trait;
use std::borrow::Borrow;
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use libc::O_NONBLOCK;
use crate::fd_executor::{self, add_read_waker, add_write_waker, PendingWaker};
use crate::uring_mem::{BackingMemory, BorrowedIoVec, MemRegion};
use crate::AsyncError;
use crate::AsyncResult;
use crate::{IoSourceExt, ReadAsync, WriteAsync};
use sys_util::{self, add_fd_flags};
use thiserror::Error as ThisError;
#[derive(ThisError, Debug)]
pub enum Error {
/// An error occurred attempting to register a waker with the executor.
#[error("An error occurred attempting to register a waker with the executor: {0}.")]
AddingWaker(fd_executor::Error),
/// An error occurred when executing fallocate synchronously.
#[error("An error occurred when executing fallocate synchronously: {0}")]
Fallocate(sys_util::Error),
/// An error occurred when executing fsync synchronously.
#[error("An error occurred when executing fsync synchronously: {0}")]
Fsync(sys_util::Error),
/// An error occurred when reading the FD.
///
#[error("An error occurred when reading the FD: {0}.")]
Read(sys_util::Error),
/// Can't seek file.
#[error("An error occurred when seeking the FD: {0}.")]
Seeking(sys_util::Error),
/// An error occurred when setting the FD non-blocking.
#[error("An error occurred setting the FD non-blocking: {0}.")]
SettingNonBlocking(sys_util::Error),
/// An error occurred when writing the FD.
#[error("An error occurred when writing the FD: {0}.")]
Write(sys_util::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
/// Async wrapper for an IO source that uses the FD executor to drive async operations.
/// Used by `IoSourceExt::new` when uring isn't available.
pub struct PollSource<F: AsRawFd> {
source: F,
}
impl<F: AsRawFd> PollSource<F> {
/// Create a new `PollSource` from the given IO source.
pub fn new(f: F) -> Result<Self> {
let fd = f.as_raw_fd();
add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
Ok(Self { source: f })
}
/// Read a u64 from the current offset. Avoid seeking Fds that don't support `pread`.
pub fn read_u64(&self) -> PollReadU64<'_, F> {
PollReadU64 {
reader: self,
pending_waker: None,
}
}
/// Return the inner source.
pub fn into_source(self) -> F {
self.source
}
}
impl<F: AsRawFd> Deref for PollSource<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
&self.source
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> ReadAsync for PollSource<F> {
/// Reads from the iosource at `file_offset` and fill the given `vec`.
async fn read_to_vec<'a>(
&'a self,
file_offset: u64,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let fut = PollReadVec {
reader: self,
file_offset,
vec: Some(vec),
pending_waker: None,
};
fut.await
.map(|(n, vec)| (n as usize, vec))
.map_err(AsyncError::Poll)
}
/// Reads to the given `mem` at the given offsets from the file starting at `file_offset`.
async fn read_to_mem<'a>(
&'a self,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let fut = PollReadMem {
reader: self,
file_offset,
mem,
mem_offsets,
pending_waker: None,
};
fut.await.map(|n| n as usize).map_err(AsyncError::Poll)
}
/// Wait for the FD of `self` to be readable.
async fn wait_readable(&self) -> AsyncResult<()> {
let fut = PollWaitReadable {
pollee: self,
pending_waker: None,
};
fut.await.map_err(AsyncError::Poll)
}
async fn read_u64(&self) -> AsyncResult<u64> {
PollSource::read_u64(self).await.map_err(AsyncError::Poll)
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> WriteAsync for PollSource<F> {
/// Writes from the given `vec` to the file starting at `file_offset`.
async fn write_from_vec<'a>(
&'a self,
file_offset: u64,
vec: Vec<u8>,
) -> AsyncResult<(usize, Vec<u8>)> {
let fut = PollWriteVec {
writer: self,
file_offset,
vec: Some(vec),
pending_waker: None,
};
fut.await
.map(|(n, vec)| (n as usize, vec))
.map_err(AsyncError::Poll)
}
/// Writes from the given `mem` from the given offsets to the file starting at `file_offset`.
async fn write_from_mem<'a>(
&'a self,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
) -> AsyncResult<usize> {
let fut = PollWriteMem {
writer: self,
file_offset,
mem,
mem_offsets,
pending_waker: None,
};
fut.await.map(|n| n as usize).map_err(AsyncError::Poll)
}
/// See `fallocate(2)` for details.
async fn fallocate(&self, file_offset: u64, len: u64, mode: u32) -> AsyncResult<()> {
let ret = unsafe {
libc::fallocate64(
self.source.as_raw_fd(),
mode as libc::c_int,
file_offset as libc::off64_t,
len as libc::off64_t,
)
};
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fallocate(sys_util::Error::last())))
}
}
/// Sync all completed write operations to the backing storage.
async fn fsync(&self) -> AsyncResult<()> {
let ret = unsafe { libc::fsync(self.source.as_raw_fd()) };
if ret == 0 {
Ok(())
} else {
Err(AsyncError::Poll(Error::Fsync(sys_util::Error::last())))
}
}
}
#[async_trait(?Send)]
impl<F: AsRawFd> IoSourceExt<F> for PollSource<F> {
/// Yields the underlying IO source.
fn into_source(self: Box<Self>) -> F {
self.source
}
/// Provides a mutable ref to the underlying IO source.
fn as_source_mut(&mut self) -> &mut F {
&mut self.source
}
/// Provides a ref to the underlying IO source.
fn as_source(&self) -> &F {
&self.source
}
}
impl<F: AsRawFd> DerefMut for PollSource<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.source
}
}
pub struct PollReadVec<'a, F: AsRawFd> {
reader: &'a PollSource<F>,
file_offset: u64,
vec: Option<Vec<u8>>,
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollReadVec<'a, F> {
type Output = Result<(usize, Vec<u8>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_read(fd: RawFd, file_offset: u64, buf: &mut [u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write past the length given and the length is
// guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe {
libc::pread(
fd,
buf.as_mut_ptr() as *mut _,
buf.len(),
file_offset as libc::c_long,
)
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let file_offset = self.file_offset;
// unwrap is allowed because Futures can panic if polled again after returning Ready, and
// Ready is the only way for vec to be None.
let mut vec = self.vec.take().unwrap();
let source = &self.reader.source;
match do_read(source.as_raw_fd(), file_offset, &mut vec) {
Ok(v) => Poll::Ready(Ok((v as usize, vec))),
Err(err) => {
if err.errno() == libc::EWOULDBLOCK {
if self.pending_waker.is_some() {
// Already waiting.
self.vec = Some(vec);
Poll::Pending
} else {
match add_read_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
self.vec = Some(vec);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
} else {
Poll::Ready(Err(Error::Read(err)))
}
}
}
}
}
pub struct PollReadU64<'a, F: AsRawFd> {
reader: &'a PollSource<F>,
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollReadU64<'a, F> {
type Output = Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_read(fd: RawFd, buf: &mut [u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write past the length given and the length is
// guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let source = &self.reader.source;
let mut bytes = 0u64.to_ne_bytes();
match do_read(source.as_raw_fd(), &mut bytes) {
Ok(_) => {
let val = u64::from_ne_bytes(bytes);
Poll::Ready(Ok(val))
}
Err(err) => {
if err.errno() == libc::EWOULDBLOCK {
if self.pending_waker.is_some() {
Poll::Pending
} else {
match add_read_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
} else {
Poll::Ready(Err(Error::Read(err)))
}
}
}
}
}
// Hack to tide over until io_uring is available everywhere. `PollWaitReadable` emulates requesting
// that uring notify when an FD is readable. This can also be removed if msg_on_socket is
// re-implemented in a way that doesn't depend on triggering when an FD is ready.
pub struct PollWaitReadable<'a, F: AsRawFd> {
pollee: &'a PollSource<F>,
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollWaitReadable<'a, F> {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_poll(fd: RawFd) -> sys_util::Result<usize> {
let poll_arg = libc::pollfd {
fd,
events: sys_util::WatchingEvents::empty().set_read().get_raw() as i16,
revents: 0,
};
// Safe because poll doesn't touch memory.
let ret = unsafe { libc::poll(&poll_arg as *const _ as *mut _, 1, 0) };
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let source = &self.pollee.source;
match do_poll(source.as_raw_fd()) {
Ok(1) => Poll::Ready(Ok(())),
Ok(_) => {
if self.pending_waker.is_some() {
Poll::Pending
} else {
match add_read_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
}
Err(err) => Poll::Ready(Err(Error::Read(err))),
}
}
}
pub struct PollWriteVec<'a, F: AsRawFd> {
writer: &'a PollSource<F>,
file_offset: u64,
vec: Option<Vec<u8>>,
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollWriteVec<'a, F> {
type Output = Result<(usize, Vec<u8>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_write(fd: RawFd, file_offset: u64, buf: &[u8]) -> sys_util::Result<usize> {
// Safe because we trust the kernel not to write to the buffer, only read from it and
// the length is guaranteed to be valid from the pointer by the mut slice.
let ret = unsafe {
libc::pwrite(
fd,
buf.as_ptr() as *const _,
buf.len(),
file_offset as libc::c_long,
)
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let file_offset = self.file_offset;
// unwrap s allowed because Futures can panic if polled again after returning Ready, and
// Ready is the only way for vec to be None.
let vec = self.vec.take().unwrap();
let source = &self.writer.source;
match do_write(source.as_raw_fd(), file_offset, &vec) {
Ok(v) => Poll::Ready(Ok((v as usize, vec))),
Err(err) => {
if err.errno() == libc::EWOULDBLOCK {
if self.pending_waker.is_some() {
// Already waiting.
self.vec = Some(vec);
Poll::Pending
} else {
match add_write_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
self.vec = Some(vec);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
} else {
Poll::Ready(Err(Error::Write(err)))
}
}
}
}
}
pub struct PollReadMem<'a, F: AsRawFd> {
reader: &'a PollSource<F>,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollReadMem<'a, F> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_readv(
fd: RawFd,
file_offset: u64,
mem: &dyn BackingMemory,
mem_offsets: &[MemRegion],
) -> sys_util::Result<usize> {
let mut iovecs = mem_offsets
.iter()
.filter_map(|&mem_vec| mem.get_iovec(mem_vec).ok())
.collect::<Vec<BorrowedIoVec>>();
// Safe because we trust the kernel not to write path the length given and the length is
// guaranteed to be valid from the pointer by io_slice_mut.
let ret = unsafe {
libc::preadv64(
fd,
iovecs.as_mut_ptr() as *mut _,
iovecs.len() as i32,
file_offset as libc::off64_t,
)
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let file_offset = self.file_offset;
let source = &self.reader.source;
match do_readv(
source.as_raw_fd(),
file_offset,
self.mem.borrow(),
self.mem_offsets,
) {
Ok(n) => Poll::Ready(Ok(n)),
Err(err) => {
if err.errno() == libc::EWOULDBLOCK {
if self.pending_waker.is_some() {
Poll::Pending
} else {
match add_read_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
} else {
Poll::Ready(Err(Error::Read(err)))
}
}
}
}
}
pub struct PollWriteMem<'a, F: AsRawFd> {
writer: &'a PollSource<F>,
file_offset: u64,
mem: Rc<dyn BackingMemory>,
mem_offsets: &'a [MemRegion],
pending_waker: Option<PendingWaker>,
}
impl<'a, F: AsRawFd> Future for PollWriteMem<'a, F> {
type Output = Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn do_writev(
fd: RawFd,
file_offset: u64,
mem: &dyn BackingMemory,
mem_offsets: &[MemRegion],
) -> sys_util::Result<usize> {
let iovecs = mem_offsets
.iter()
.map(|&mem_vec| mem.get_iovec(mem_vec))
.filter_map(|r| r.ok())
.collect::<Vec<BorrowedIoVec>>();
// Safe because we trust the kernel not to write path the length given and the length is
// guaranteed to be valid from the pointer by io_slice_mut.
let ret = unsafe {
libc::pwritev64(
fd,
iovecs.as_ptr() as *mut _,
iovecs.len() as i32,
file_offset as libc::off64_t,
)
};
match ret {
n if n >= 0 => Ok(n as usize),
_ => sys_util::errno_result(),
}
}
let file_offset = self.file_offset;
let source = &self.writer.source;
match do_writev(
source.as_raw_fd(),
file_offset,
self.mem.borrow(),
self.mem_offsets,
) {
Ok(n) => Poll::Ready(Ok(n)),
Err(err) => {
if err.errno() == libc::EWOULDBLOCK {
if self.pending_waker.is_some() {
Poll::Pending
} else {
match add_write_waker(source.as_raw_fd(), cx.waker().clone()) {
Ok(pending_waker) => {
self.pending_waker = Some(pending_waker);
Poll::Pending
}
Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
}
}
} else {
Poll::Ready(Err(Error::Write(err)))
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::fs::{File, OpenOptions};
use std::path::PathBuf;
use futures::pin_mut;
use crate::executor::Executor;
use super::*;
#[test]
fn readvec() {
async fn go() {
let f = File::open("/dev/zero").unwrap();
let async_source = PollSource::new(f).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.read_to_vec(0, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
assert!(ret_v.iter().all(|&b| b == 0));
}
let fut = go();
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn writevec() {
async fn go() {
let f = OpenOptions::new().write(true).open("/dev/null").unwrap();
let async_source = PollSource::new(f).unwrap();
let v = vec![0x55u8; 32];
let v_ptr = v.as_ptr();
let ret = async_source.write_from_vec(0, v).await.unwrap();
assert_eq!(ret.0, 32);
let ret_v = ret.1;
assert_eq!(v_ptr, ret_v.as_ptr());
}
let fut = go();
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
#[test]
fn fallocate() {
async fn go() {
let dir = tempfile::TempDir::new().unwrap();
let mut file_path = PathBuf::from(dir.path());
file_path.push("test");
let f = OpenOptions::new()
.create(true)
.write(true)
.open(&file_path)
.unwrap();
let source = PollSource::new(f).unwrap();
source.fallocate(0, 4096, 0).await.unwrap();
let meta_data = std::fs::metadata(&file_path).unwrap();
assert_eq!(meta_data.len(), 4096);
}
let fut = go();
pin_mut!(fut);
crate::fd_executor::FdExecutor::new(crate::RunOne::new(fut))
.unwrap()
.run()
.unwrap();
}
}