// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! The executor runs all given futures to completion. Futures register wakers associated with file
//! descriptors. The wakers will be called when the FD becomes readable or writable depending on
//! the situation.
//! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures.
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt::{self, Display};
use std::fs::File;
use std::future::Future;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::task::Waker;
use slab::Slab;
use sys_util::{error, PollContext, WatchingEvents};
use crate::executor::{ExecutableFuture, Executor, FutureList};
use crate::WakerToken;
#[derive(Debug, PartialEq)]
pub enum Error {
/// Attempts to create two Executors on the same thread fail.
/// Failed to copy the FD for the polling context.
/// Failed accessing the thread local storage for wakers.
/// Creating a context to wait on FDs failed.
/// PollContext failure.
/// Failed to submit the waker to the polling context.
/// A Waker was canceled, but the operation isn't running.
pub type Result<T> = std::result::Result<T, Error>;
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::Error::*;
match self {
AttemptedDuplicateExecutor => write!(f, "Cannot have two executors on one thread."),
DuplicatingFd(e) => write!(f, "Failed to copy the FD for the polling context: {}", e),
InvalidContext => write!(
"Invalid context, was the Fd executor created successfully?"
CreatingContext(e) => write!(f, "An error creating the fd waiting context: {}.", e),
PollContextError(e) => write!(f, "PollContext failure: {}", e),
SubmittingWaker(e) => write!(f, "An error adding to the Aio context: {}.", e),
UnknownWaker => write!(f, "Unknown waker"),
// Temporary vectors of new additions to the executor.
// Tracks active wakers and the futures they are associated with.
thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None));
fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> {
STATE.with(|state| {
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
state.add_waker(fd, waker, events)
} else {
/// A token returned from `add_waker` that can be used to cancel the waker before it completes.
/// Used to manage getting the result from the underlying executor for a completed operation.
/// Dropping a `PendingWaker` will get the result from the executor.
pub struct PendingWaker {
token: Option<WakerToken>,
impl PendingWaker {
pub(crate) fn new(token: WakerToken) -> PendingWaker {
PendingWaker { token: Some(token) }
impl Drop for PendingWaker {
fn drop(&mut self) {
if let Some(token) = self.token.take() {
let _ = cancel_waker(token);
/// Tells the waking system to wake `waker` when `fd` becomes readable.
/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
/// next time the future is polled. If the fd is closed, there is a race where another FD can be
/// opened on top of it causing the next poll to access the new target file.
/// Returns a `PendingWaker` that can be used to poll for completion or cancel the waker before it
/// completes.
pub(crate) fn add_read_waker(fd: RawFd, waker: Waker) -> Result<PendingWaker> {
add_waker(fd, waker, WatchingEvents::empty().set_read()).map(PendingWaker::new)
/// Tells the waking system to wake `waker` when `fd` becomes writable.
/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
/// next time the future is polled. If the fd is closed, there is a race where another FD can be
/// opened on top of it causing the next poll to access the new target file.
/// Returns a `PendingWaker` that can be used to poll for completion or cancel the waker before it
/// completes.
pub(crate) fn add_write_waker(fd: RawFd, waker: Waker) -> Result<PendingWaker> {
add_waker(fd, waker, WatchingEvents::empty().set_write()).map(PendingWaker::new)
/// Cancels the waker that returned the given token if the waker hasn't yet fired.
pub(crate) fn cancel_waker(token: WakerToken) -> Result<()> {
STATE.with(|state| {
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
} else {
/// Adds a new top level future to the Executor.
/// These futures must return `()`, indicating they are intended to create side-effects only.
pub(crate) fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> {
STATE.with(|state| {
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
} else {
// Tracks active wakers and associates wakers with the futures that registered them.
struct FdWakerState {
poll_ctx: PollContext<usize>,
tokens: Slab<(File, Option<Waker>)>,
new_futures: VecDeque<ExecutableFuture<()>>,
impl FdWakerState {
fn new() -> Result<Self> {
Ok(FdWakerState {
poll_ctx: PollContext::new().map_err(Error::CreatingContext)?,
tokens: Slab::with_capacity(64),
new_futures: VecDeque::new(),
// Adds an fd that, when signaled, will trigger the given waker.
fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> {
let duped_fd = unsafe {
// Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
// will only be added to the poll loop.
let entry = self.tokens.vacant_entry();
let next_token = entry.key();
.add_fd_with_events(&duped_fd, events, next_token)
entry.insert((duped_fd, Some(waker)));
// Waits until one of the FDs is readable and wakes the associated waker.
fn wait_wake_event(&mut self) -> Result<()> {
let events = self.poll_ctx.wait().map_err(Error::PollContextError)?;
for e in events.iter() {
let token = e.token();
if let Some((fd, waker)) = self.tokens.get_mut(token) {
if let Some(waker) = waker.take() {
} else {
error!("Woken twice");
} else {
error!("Unknown waker");
// Remove the waker for the given token if it hasn't fired yet.
fn cancel_waker(&mut self, token: WakerToken) -> Result<()> {
let (fd, _waker) = self.tokens.remove(token.0);
/// Runs futures to completion on a single thread. Futures are allowed to block on file descriptors
/// only. Futures can only block on FDs becoming readable or writable. `FdExecutor` is meant to be
/// used where a poll or select loop would be used otherwise.
pub(crate) struct FdExecutor<T: FutureList> {
futures: T,
impl<T: FutureList> Executor for FdExecutor<T> {
type Output = Result<T::Output>;
fn run(&mut self) -> Self::Output {
loop {
if let Some(output) = self.futures.poll_results() {
return Ok(output);
// If no futures are ready, sleep until a waker is signaled.
if !self.futures.any_ready() {
STATE.with(|state| {
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
} else {
unreachable!("Can't get here without a context being created");
impl<T: FutureList> FdExecutor<T> {
/// Create a new executor.
pub fn new(futures: T) -> Result<FdExecutor<T>> {
STATE.with(|state| {
if state.borrow().is_some() {
return Err(Error::AttemptedDuplicateExecutor);
Ok(FdExecutor { futures })
// Add any new futures and wakers to the lists.
fn append_futures(&mut self) {
STATE.with(|state| {
let mut state = state.borrow_mut();
if let Some(state) = state.as_mut() {
self.futures.futures_mut().append(&mut state.new_futures);
} else {
unreachable!("Can't get here without a context being created");
impl<T: FutureList> Drop for FdExecutor<T> {
fn drop(&mut self) {
STATE.with(|state| {
// Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
let ret = libc::dup(fd);
if ret < 0 {
} else {
mod test {
use std::cell::RefCell;
use std::fs::File;
use std::future::Future;
use std::os::unix::io::AsRawFd;
use std::rc::Rc;
use std::task::{Context, Poll};
use futures::future::Either;
use super::*;
// test function to get the number of pending wakers.
fn pending_ops() -> usize {
STATE.with(|state| {
let state = state.borrow_mut();
struct TestFut {
f: File,
pending_waker: Option<PendingWaker>,
impl TestFut {
fn new(f: File) -> TestFut {
TestFut {
pending_waker: None,
impl Future for TestFut {
type Output = u64;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if self.pending_waker.is_none() {
self.pending_waker = Some(
crate::fd_executor::add_read_waker(self.f.as_raw_fd(), cx.waker().clone())
impl Drop for TestFut {
fn drop(&mut self) {
println!("drop test fut");
fn test_it() {
async fn do_test() {
let (r, _w) = sys_util::pipe(true).unwrap();
let done = Box::pin(async { 5usize });
let pending = Box::pin(TestFut::new(r));
match futures::future::select(pending, done).await {
Either::Right((5, pending)) => std::mem::drop(pending),
_ => panic!("unexpected select result"),
// test that dropping the incomplete future removed the waker.
assert_eq!(0, pending_ops());
let fut = do_test();
// Example of starting the framework and running a future:
async fn my_async(x: Rc<RefCell<u64>>) {
let x = Rc::new(RefCell::new(0));
assert_eq!(*x.borrow(), 4);