blob: 10c48b8357b01518ba48b6135a35ad4c04a0faf6 [file] [log] [blame]
// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! An Executor and future combinators based on operations that block on file descriptors.
//!
//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
//! and utility functions to combine and manage futures. All futures will run until they block on a
//! file descriptor becoming readable or writable. Facilities are provided to register future
//! wakers based on such events.
//!
//! # Running top-level futures.
//!
//! Use helper functions based the desired behavior of your application.
//!
//! ## Completing one of several futures.
//!
//! If there are several top level tasks that should run until any one completes, use the "select"
//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
//! function will return when the first future completes. The uncompleted futures will also be
//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
//!
//! ## Completing all of several futures.
//!
//! If there are several top level tasks that all need to be completed, use the "complete" family
//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
//! function will return only once all the futures passed to it have completed. These functions are
//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
//! [`complete5`](fn.complete5.html).
//!
//! # Implementing new FD-based futures.
//!
//! For URing implementations should provide an implementation of the `IoSource` trait.
//! For the FD executor, new futures can use the existing ability to poll a source to build async
//! functionality on top of.
//!
//! # Implementations
//!
//! Currently there are two paths for using the asynchronous IO. One uses a WaitContext and drives
//! futures based on the FDs signaling they are ready for the opteration. This method will exist so
//! long as kernels < 5.4 are supported.
//! The other method submits operations to io_uring and is signaled when they complete. This is more
//! efficient, but only supported on kernel 5.4+.
//! If `IoSource::new` is used to interface with async IO, then the correct backend will be chosen
//! automatically.
//!
//! # Examples
//!
//! See the docs for `IoSource` if support for kernels <5.4 is required. Focus on `UringSource` if
//! all systems have support for io_uring.
mod async_types;
pub mod audio_streams_async;
mod blocking;
mod complete;
mod event;
mod io_ext;
mod io_source;
pub mod mem;
mod queue;
mod select;
pub mod sync;
pub mod sys;
#[cfg(any(target_os = "android", target_os = "linux"))]
pub use sys::unix::uring_executor::is_uring_stable;
pub use sys::Executor;
pub use sys::ExecutorKind;
pub use sys::TaskHandle;
mod common_executor;
mod timer;
mod waker;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
pub use async_types::*;
pub use base::Event;
#[cfg(any(target_os = "android", target_os = "linux"))]
pub use blocking::sys::unix::block_on::block_on;
pub use blocking::unblock;
pub use blocking::unblock_disarm;
pub use blocking::BlockingPool;
pub use blocking::CancellableBlockingPool;
pub use blocking::TimeoutAction;
pub use event::EventAsync;
#[cfg(windows)]
pub use futures::executor::block_on;
use futures::stream::FuturesUnordered;
pub use io_ext::AsyncWrapper;
pub use io_ext::Error as AsyncError;
pub use io_ext::IntoAsync;
pub use io_ext::Result as AsyncResult;
pub use io_source::AllocateMode;
pub use io_source::IoSource;
pub use mem::BackingMemory;
pub use mem::MemRegion;
pub use mem::MemRegionIter;
pub use mem::VecIoWrapper;
use remain::sorted;
pub use select::SelectResult;
use thiserror::Error as ThisError;
pub use timer::TimerAsync;
#[sorted]
#[derive(ThisError, Debug)]
pub enum Error {
/// Error from EventAsync
#[error("Failure in EventAsync: {0}")]
EventAsync(base::Error),
/// Error from the handle executor.
#[cfg(windows)]
#[error("Failure in the handle executor: {0}")]
HandleExecutor(sys::windows::handle_executor::Error),
#[error("IO error: {0}")]
Io(std::io::Error),
/// Error from the polled(FD) source, which includes error from the FD executor.
#[cfg(any(target_os = "android", target_os = "linux"))]
#[error("An error with a poll source: {0}")]
PollSource(sys::unix::poll_source::Error),
/// Error from Timer.
#[error("Failure in Timer: {0}")]
Timer(base::Error),
/// Error from TimerFd.
#[error("Failure in TimerAsync: {0}")]
TimerAsync(AsyncError),
/// Error from the uring executor.
#[cfg(any(target_os = "android", target_os = "linux"))]
#[error("Failure in the uring executor: {0}")]
URingExecutor(sys::unix::uring_executor::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
/// Heterogeneous collection of `async_task:Task` that are running in a "detached" state.
///
/// We keep them around to ensure they are dropped before the executor they are running on.
pub(crate) struct DetachedTasks(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>);
impl DetachedTasks {
pub(crate) fn new() -> Self {
DetachedTasks(FuturesUnordered::new())
}
pub(crate) fn push<R: Send + 'static>(&self, task: async_task::Task<R>) {
// Convert to fallible, otherwise poll could panic if the `Runnable` is dropped early.
let task = task.fallible();
self.0.push(Box::pin(async {
let _ = task.await;
}));
}
/// Polls all the tasks, dropping any that complete.
pub(crate) fn poll(&mut self, cx: &mut std::task::Context) {
use futures::Stream;
while let Poll::Ready(Some(_)) = Pin::new(&mut self.0).poll_next(cx) {}
}
}
// Select helpers to run until any future completes.
/// Creates a combinator that runs the two given futures until one completes, returning a tuple
/// containing the result of the finished future and the still pending future.
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select2, block_on};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {5};
/// let second = async {let () = pending().await;};
/// pin_mut!(first);
/// pin_mut!(second);
/// match block_on(select2(first, second)) {
/// (SelectResult::Finished(5), SelectResult::Pending(_second)) => (),
/// _ => panic!("Select didn't return the first future"),
/// };
/// ```
pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
f1: F1,
f2: F2,
) -> (SelectResult<F1>, SelectResult<F2>) {
select::Select2::new(f1, f2).await
}
/// Creates a combinator that runs the three given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select3, block_on};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// match block_on(select3(first, second, third)) {
/// (SelectResult::Finished(4),
/// SelectResult::Pending(_second),
/// SelectResult::Finished(5)) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
f1: F1,
f2: F2,
f3: F3,
) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
select::Select3::new(f1, f2, f3).await
}
/// Creates a combinator that runs the four given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select4, block_on};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// let fourth = async {let () = pending().await;};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// match block_on(select4(first, second, third, fourth)) {
/// (SelectResult::Finished(4), SelectResult::Pending(_second),
/// SelectResult::Finished(5), SelectResult::Pending(_fourth)) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select4<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
) {
select::Select4::new(f1, f2, f3, f4).await
}
/// Creates a combinator that runs the five given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select5, block_on};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {4};
/// let second = async {let () = pending().await;};
/// let third = async {5};
/// let fourth = async {let () = pending().await;};
/// let fifth = async {6};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// pin_mut!(fifth);
/// match block_on(select5(first, second, third, fourth, fifth)) {
/// (SelectResult::Finished(4), SelectResult::Pending(_second),
/// SelectResult::Finished(5), SelectResult::Pending(_fourth),
/// SelectResult::Finished(6)) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select5<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
) {
select::Select5::new(f1, f2, f3, f4, f5).await
}
/// Creates a combinator that runs the six given futures until one or more completes, returning a
/// tuple containing the result of the finished future(s) and the still pending future(s).
///
/// # Example
///
/// ```
/// use cros_async::{SelectResult, select6, block_on};
/// use futures::future::pending;
/// use futures::pin_mut;
///
/// let first = async {1};
/// let second = async {let () = pending().await;};
/// let third = async {3};
/// let fourth = async {let () = pending().await;};
/// let fifth = async {5};
/// let sixth = async {6};
/// pin_mut!(first);
/// pin_mut!(second);
/// pin_mut!(third);
/// pin_mut!(fourth);
/// pin_mut!(fifth);
/// pin_mut!(sixth);
/// match block_on(select6(first, second, third, fourth, fifth, sixth)) {
/// (SelectResult::Finished(1), SelectResult::Pending(_second),
/// SelectResult::Finished(3), SelectResult::Pending(_fourth),
/// SelectResult::Finished(5), SelectResult::Finished(6)) => (),
/// _ => panic!("Select didn't return the futures"),
/// };
/// ```
pub async fn select6<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
) {
select::Select6::new(f1, f2, f3, f4, f5, f6).await
}
pub async fn select7<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
) {
select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
}
pub async fn select8<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
F8: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
f8: F8,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
SelectResult<F8>,
) {
select::Select8::new(f1, f2, f3, f4, f5, f6, f7, f8).await
}
pub async fn select9<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
F8: Future + Unpin,
F9: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
f8: F8,
f9: F9,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
SelectResult<F8>,
SelectResult<F9>,
) {
select::Select9::new(f1, f2, f3, f4, f5, f6, f7, f8, f9).await
}
pub async fn select10<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
F8: Future + Unpin,
F9: Future + Unpin,
F10: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
f8: F8,
f9: F9,
f10: F10,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
SelectResult<F8>,
SelectResult<F9>,
SelectResult<F10>,
) {
select::Select10::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10).await
}
pub async fn select11<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
F8: Future + Unpin,
F9: Future + Unpin,
F10: Future + Unpin,
F11: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
f8: F8,
f9: F9,
f10: F10,
f11: F11,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
SelectResult<F8>,
SelectResult<F9>,
SelectResult<F10>,
SelectResult<F11>,
) {
select::Select11::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11).await
}
pub async fn select12<
F1: Future + Unpin,
F2: Future + Unpin,
F3: Future + Unpin,
F4: Future + Unpin,
F5: Future + Unpin,
F6: Future + Unpin,
F7: Future + Unpin,
F8: Future + Unpin,
F9: Future + Unpin,
F10: Future + Unpin,
F11: Future + Unpin,
F12: Future + Unpin,
>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
f6: F6,
f7: F7,
f8: F8,
f9: F9,
f10: F10,
f11: F11,
f12: F12,
) -> (
SelectResult<F1>,
SelectResult<F2>,
SelectResult<F3>,
SelectResult<F4>,
SelectResult<F5>,
SelectResult<F6>,
SelectResult<F7>,
SelectResult<F8>,
SelectResult<F9>,
SelectResult<F10>,
SelectResult<F11>,
SelectResult<F12>,
) {
select::Select12::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12).await
}
// Combination helpers to run until all futures are complete.
/// Creates a combinator that runs the two given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete2, block_on};
///
/// let first = async {5};
/// let second = async {6};
/// assert_eq!(block_on(complete2(first, second)), (5,6));
/// ```
pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
where
F1: Future,
F2: Future,
{
complete::Complete2::new(f1, f2).await
}
/// Creates a combinator that runs the three given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete3, block_on};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// assert_eq!(block_on(complete3(first, second, third)), (5,6,7));
/// ```
pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
where
F1: Future,
F2: Future,
F3: Future,
{
complete::Complete3::new(f1, f2, f3).await
}
/// Creates a combinator that runs the four given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete4, block_on};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// let fourth = async {8};
/// assert_eq!(block_on(complete4(first, second, third, fourth)), (5,6,7,8));
/// ```
pub async fn complete4<F1, F2, F3, F4>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
) -> (F1::Output, F2::Output, F3::Output, F4::Output)
where
F1: Future,
F2: Future,
F3: Future,
F4: Future,
{
complete::Complete4::new(f1, f2, f3, f4).await
}
/// Creates a combinator that runs the five given futures to completion, returning a tuple of the
/// outputs each yields.
///
/// # Example
///
/// ```
/// use cros_async::{complete5, block_on};
///
/// let first = async {5};
/// let second = async {6};
/// let third = async {7};
/// let fourth = async {8};
/// let fifth = async {9};
/// assert_eq!(block_on(complete5(first, second, third, fourth, fifth)),
/// (5,6,7,8,9));
/// ```
pub async fn complete5<F1, F2, F3, F4, F5>(
f1: F1,
f2: F2,
f3: F3,
f4: F4,
f5: F5,
) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
where
F1: Future,
F2: Future,
F3: Future,
F4: Future,
F5: Future,
{
complete::Complete5::new(f1, f2, f3, f4, f5).await
}