| // Copyright 2020 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use std::cell::Cell; |
| use std::collections::VecDeque; |
| use std::future::Future; |
| use std::pin::Pin; |
| use std::rc::Rc; |
| use std::task::Waker; |
| use std::task::{Context, Poll}; |
| |
| use crate::waker::create_waker; |
| |
| /// Represents a future executor that can be run. Implementers of the trait will take a list of |
| /// futures and poll them until completed. |
| pub trait Executor { |
| /// The type returned by the executor. This is normally `()` or a combination of the output the |
| /// futures produce. |
| type Output; |
| |
| /// Run the executor, this will return once the exit criteria is met. The exit criteria is |
| /// specified when the executor is created, for example running until all futures are complete. |
| fn run(&mut self) -> Self::Output; |
| } |
| |
| // Tracks if a future needs to be polled and the waker to use. |
| pub(crate) struct FutureState { |
| pub needs_poll: Rc<Cell<bool>>, |
| pub waker: Waker, |
| } |
| |
| impl FutureState { |
| pub fn new() -> FutureState { |
| let needs_poll = Rc::new(Cell::new(true)); |
| // Safe because a valid pointer is passed to `create_waker` and the valid result is |
| // passed to `Waker::from_raw`. And because the reference count to needs_poll is |
| // incremented by cloning it so it can't be dropped before the waker. |
| let waker = unsafe { |
| let clone = needs_poll.clone(); |
| let raw_waker = create_waker(Rc::into_raw(clone) as *const _); |
| Waker::from_raw(raw_waker) |
| }; |
| FutureState { needs_poll, waker } |
| } |
| } |
| |
| // Couples a future owned by the executor with a flag that indicates the future is ready to be |
| // polled. Futures will start with the flag set. After blocking by returning `Poll::Pending`, the |
| // flag will be false until the waker is triggered and sets the flag to true, signalling the |
| // executor to poll the future again. |
| pub(crate) struct ExecutableFuture<T> { |
| future: Pin<Box<dyn Future<Output = T>>>, |
| state: FutureState, |
| } |
| |
| impl<T> ExecutableFuture<T> { |
| // Creates an `ExecutableFuture` from the future. The returned struct is used to track when the |
| // future should be polled again. |
| pub fn new(future: Pin<Box<dyn Future<Output = T>>>) -> ExecutableFuture<T> { |
| ExecutableFuture { |
| future, |
| state: FutureState::new(), |
| } |
| } |
| |
| // Polls the future if needed and returns the result. |
| // Covers setting up the waker and context before calling the future. |
| fn poll(&mut self) -> Poll<T> { |
| let mut ctx = Context::from_waker(&self.state.waker); |
| let f = self.future.as_mut(); |
| f.poll(&mut ctx) |
| } |
| } |
| |
| // Private trait used to allow one executor to behave differently. Using FutureList allows the |
| // executor code to be common across different collections of crates and different termination |
| // behavior. For example, one list can decide to exit after the first trait completes, others can |
| // wait until all are complete. |
| pub(crate) trait FutureList { |
| type Output; |
| |
| // Return a mutable reference to the list of futures that can be added or removed from this |
| // List. |
| fn futures_mut(&mut self) -> &mut UnitFutures; |
| // Polls all futures that are ready. Returns the results if this list has completed. |
| fn poll_results(&mut self) -> Option<Self::Output>; |
| // Returns true if any future in the list is ready to be polled. |
| fn any_ready(&self) -> bool; |
| } |
| |
| // `UnitFutures` is the simplest implementor of `FutureList`. It runs all futures added to it until |
| // there are none left to poll. The futures must all return `()`. |
| pub(crate) struct UnitFutures { |
| futures: VecDeque<ExecutableFuture<()>>, |
| } |
| |
| impl UnitFutures { |
| // Creates a new, empty list of futures. |
| pub fn new() -> UnitFutures { |
| UnitFutures { |
| futures: VecDeque::new(), |
| } |
| } |
| |
| // Adds a future to the list of futures to be polled. |
| pub fn append(&mut self, futures: &mut VecDeque<ExecutableFuture<()>>) { |
| self.futures.append(futures); |
| } |
| |
| // Polls all futures that are ready to be polled. Removes any futures that indicate they are |
| // completed. |
| pub fn poll_all(&mut self) { |
| let mut i = 0; |
| while i < self.futures.len() { |
| let fut = &mut self.futures[i]; |
| let remove = if fut.state.needs_poll.replace(false) { |
| fut.poll().is_ready() |
| } else { |
| false |
| }; |
| if remove { |
| self.futures.remove(i); |
| } else { |
| i += 1; |
| } |
| } |
| } |
| } |
| |
| impl FutureList for UnitFutures { |
| type Output = (); |
| |
| fn futures_mut(&mut self) -> &mut UnitFutures { |
| self |
| } |
| |
| fn poll_results(&mut self) -> Option<Self::Output> { |
| self.poll_all(); |
| if self.futures.is_empty() { |
| Some(()) |
| } else { |
| None |
| } |
| } |
| |
| fn any_ready(&self) -> bool { |
| self.futures.iter().any(|fut| fut.state.needs_poll.get()) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| |
| #[test] |
| fn basic_run() { |
| async fn f(called: Rc<AtomicUsize>) { |
| called.fetch_add(1, Ordering::Relaxed); |
| } |
| |
| let f1_called = Rc::new(AtomicUsize::new(0)); |
| let f2_called = Rc::new(AtomicUsize::new(0)); |
| |
| let fut1 = Box::pin(f(f1_called.clone())); |
| let fut2 = Box::pin(f(f2_called.clone())); |
| |
| let mut futures = VecDeque::new(); |
| futures.push_back(ExecutableFuture::new(fut1)); |
| futures.push_back(ExecutableFuture::new(fut2)); |
| |
| let mut uf = UnitFutures::new(); |
| uf.append(&mut futures); |
| assert!(uf.poll_results().is_some()); |
| assert_eq!(f1_called.load(Ordering::Relaxed), 1); |
| assert_eq!(f2_called.load(Ordering::Relaxed), 1); |
| } |
| } |