| use crate::future::poll_fn; |
| use crate::loom::sync::atomic::AtomicBool; |
| use crate::loom::sync::Mutex; |
| use crate::park::{Park, Unpark}; |
| use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; |
| use crate::sync::notify::Notify; |
| use crate::util::{waker_ref, Wake, WakerRef}; |
| |
| use std::cell::RefCell; |
| use std::collections::VecDeque; |
| use std::fmt; |
| use std::future::Future; |
| use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; |
| use std::sync::Arc; |
| use std::task::Poll::{Pending, Ready}; |
| use std::time::Duration; |
| |
| /// Executes tasks on the current thread. |
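| /// |
| /// Only one thread drives the scheduler at a time: the first caller of |
| /// `block_on` steals the `Inner` state (and with it the parker), while |
| /// concurrent callers wait on `notify` until the driver hands it back. |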
| pub(crate) struct BasicScheduler<P: Park> { |
| /// Inner state guarded by a mutex that is shared |
| /// between all `block_on` calls. |
| inner: Mutex<Option<Inner<P>>>, |
| |
| /// Notifier for waking up other threads to steal the |
| /// parker. |
| notify: Notify, |
| |
| /// Sendable task spawner |
| spawner: Spawner, |
| } |
| |
| /// The inner scheduler that owns the task queue and the main parker P. |
| struct Inner<P: Park> { |
| /// Scheduler run queue |
| /// |
| /// When the scheduler is executed, the queue is removed from `self` and |
| /// moved into `Context`. |
| /// |
| /// This indirection is to allow `BasicScheduler` to be `Send`. |
| tasks: Option<Tasks>, |
| |
| /// Sendable task spawner |
| spawner: Spawner, |
| |
| /// Current tick, used to decide which queue to check first |
| tick: u8, |
| |
| /// Thread park handle |
| park: P, |
| } |
| |
| #[derive(Clone)] |
| pub(crate) struct Spawner { |
| shared: Arc<Shared>, |
| } |
| |
| struct Tasks { |
| /// Local run queue. |
| /// |
| /// Tasks notified from the current thread are pushed into this queue. |
| queue: VecDeque<task::Notified<Arc<Shared>>>, |
| } |
| |
| /// A remote scheduler entry. |
| /// |
| /// These are filled in by remote threads sending instructions to the scheduler. |
| enum RemoteMsg { |
| /// A remote thread wants to spawn a task. |
| Schedule(task::Notified<Arc<Shared>>), |
| } |
| |
| // Safety: Used correctly, the task header is "thread safe". Ultimately |
| // the task is owned by the current-thread executor, to which this |
| // message is being sent. |
| unsafe impl Send for RemoteMsg {} |
| |
| /// Scheduler state shared between threads. |
| struct Shared { |
| /// Remote run queue. None if the `Runtime` has been dropped. |
| queue: Mutex<Option<VecDeque<RemoteMsg>>>, |
| |
| /// Collection of all active tasks spawned onto this executor. |
| owned: OwnedTasks<Arc<Shared>>, |
| |
| /// Unpark the blocked thread. |
| unpark: Box<dyn Unpark>, |
| |
| /// Indicates whether the thread blocked in `block_on` was woken. |
| woken: AtomicBool, |
| } |
| |
| /// Thread-local context. |
| struct Context { |
| /// Shared scheduler state |
| shared: Arc<Shared>, |
| |
| /// Local queue |
| tasks: RefCell<Tasks>, |
| } |
| |
| /// Initial queue capacity. |
| const INITIAL_CAPACITY: usize = 64; |
| |
| /// Max number of tasks to poll per tick. |
| /// |
| /// A much smaller value is used under loom to keep the model checker's |
| /// state space tractable. |
| #[cfg(loom)] |
| const MAX_TASKS_PER_TICK: usize = 4; |
| #[cfg(not(loom))] |
| const MAX_TASKS_PER_TICK: usize = 61; |
| |
| /// How often (in ticks) to check the remote queue before the local one, |
| /// so that remotely spawned tasks are not starved by local wakeups. |
| const REMOTE_FIRST_INTERVAL: u8 = 31; |
| |
| // Tracks the current BasicScheduler. |
| scoped_thread_local!(static CURRENT: Context); |
| |
| impl<P: Park> BasicScheduler<P> { |
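| /// Creates a new scheduler that parks on `park` when no work is |
| /// available. |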
| pub(crate) fn new(park: P) -> BasicScheduler<P> { |
| let unpark = Box::new(park.unpark()); |
| |
| let spawner = Spawner { |
| shared: Arc::new(Shared { |
| queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), |
| owned: OwnedTasks::new(), |
| unpark: unpark as Box<dyn Unpark>, |
| woken: AtomicBool::new(false), |
| }), |
| }; |
| |
| let inner = Mutex::new(Some(Inner { |
| tasks: Some(Tasks { |
| queue: VecDeque::with_capacity(INITIAL_CAPACITY), |
| }), |
| spawner: spawner.clone(), |
| tick: 0, |
| park, |
| })); |
| |
| BasicScheduler { |
| inner, |
| notify: Notify::new(), |
| spawner, |
| } |
| } |
| |
| pub(crate) fn spawner(&self) -> &Spawner { |
| &self.spawner |
| } |
| |
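| /// Blocks the current thread on `future`, driving the scheduler whenever |
| /// this thread holds the `Inner` state. |
| /// |
| /// A sketch of the hand-off between two concurrent callers (illustrative |
| /// only, not a doctest): |
| /// |
| /// ```text |
| /// thread A: take_inner() -> Some(guard)  // A becomes the driver |
| /// thread B: take_inner() -> None         // B waits on `notify` |
| /// thread A: drops guard                  // Inner returned, notify_one() |
| /// thread B: take_inner() -> Some(guard)  // B becomes the new driver |
| /// ``` |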
| pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { |
| pin!(future); |
| |
| // Attempt to steal the dedicated parker and block on the future there |
| // if we can; otherwise, wait until either the parker becomes available |
| // to steal or the future completes. |
| loop { |
| if let Some(inner) = &mut self.take_inner() { |
| return inner.block_on(future); |
| } else { |
| let mut enter = crate::runtime::enter(false); |
| |
| let notified = self.notify.notified(); |
| pin!(notified); |
| |
| if let Some(out) = enter |
| .block_on(poll_fn(|cx| { |
| if notified.as_mut().poll(cx).is_ready() { |
| return Ready(None); |
| } |
| |
| if let Ready(out) = future.as_mut().poll(cx) { |
| return Ready(Some(out)); |
| } |
| |
| Pending |
| })) |
| .expect("Failed to `Enter::block_on`") |
| { |
| return out; |
| } |
| } |
| } |
| } |
| |
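| /// Steals the `Inner` state if no other thread currently holds it. The |
| /// returned guard places the state back and notifies a waiter on drop. |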
| fn take_inner(&self) -> Option<InnerGuard<'_, P>> { |
| let inner = self.inner.lock().take()?; |
| |
| Some(InnerGuard { |
| inner: Some(inner), |
| basic_scheduler: self, |
| }) |
| } |
| } |
| |
| impl<P: Park> Inner<P> { |
| /// Blocks on the provided future, driving the runtime's driver. |
| fn block_on<F: Future>(&mut self, future: F) -> F::Output { |
| enter(self, |scheduler, context| { |
| let _enter = crate::runtime::enter(false); |
| let waker = scheduler.spawner.waker_ref(); |
| let mut cx = std::task::Context::from_waker(&waker); |
| let mut polled = false; |
| |
| pin!(future); |
| |
| 'outer: loop { |
| if scheduler.spawner.was_woken() || !polled { |
| polled = true; |
| if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { |
| return v; |
| } |
| } |
| |
| for _ in 0..MAX_TASKS_PER_TICK { |
| // Get and increment the current tick |
| let tick = scheduler.tick; |
| scheduler.tick = scheduler.tick.wrapping_add(1); |
| |
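| // Alternate which queue is checked first so that neither local |
| // wakeups nor remotely spawned tasks can starve the other. |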
| let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { |
| scheduler.spawner.pop().or_else(|| { |
| context |
| .tasks |
| .borrow_mut() |
| .queue |
| .pop_front() |
| .map(RemoteMsg::Schedule) |
| }) |
| } else { |
| context |
| .tasks |
| .borrow_mut() |
| .queue |
| .pop_front() |
| .map(RemoteMsg::Schedule) |
| .or_else(|| scheduler.spawner.pop()) |
| }; |
| |
| let entry = match entry { |
| Some(entry) => entry, |
| None => { |
| // Park until the thread is signaled |
| scheduler.park.park().expect("failed to park"); |
| |
| // Try polling the `block_on` future next |
| continue 'outer; |
| } |
| }; |
| |
| match entry { |
| RemoteMsg::Schedule(task) => { |
| let task = context.shared.owned.assert_owner(task); |
| crate::coop::budget(|| task.run()) |
| } |
| } |
| } |
| |
| // Yield to the park; this drives the timer and pulls any pending |
| // I/O events. |
| scheduler |
| .park |
| .park_timeout(Duration::from_millis(0)) |
| .expect("failed to park"); |
| } |
| }) |
| } |
| } |
| |
| /// Enters the scheduler context. This sets the run queue and other |
| /// necessary scheduler state in the thread-local `CURRENT`. |
| fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R |
| where |
| F: FnOnce(&mut Inner<P>, &Context) -> R, |
| P: Park, |
| { |
| // Ensures the run queue is placed back in the `BasicScheduler` instance |
| // once `block_on` returns. |
| struct Guard<'a, P: Park> { |
| context: Option<Context>, |
| scheduler: &'a mut Inner<P>, |
| } |
| |
| impl<P: Park> Drop for Guard<'_, P> { |
| fn drop(&mut self) { |
| let Context { tasks, .. } = self.context.take().expect("context missing"); |
| self.scheduler.tasks = Some(tasks.into_inner()); |
| } |
| } |
| |
| // Remove `tasks` from `self` and place it in a `Context`. |
| let tasks = scheduler.tasks.take().expect("invalid state"); |
| |
| let guard = Guard { |
| context: Some(Context { |
| shared: scheduler.spawner.shared.clone(), |
| tasks: RefCell::new(tasks), |
| }), |
| scheduler, |
| }; |
| |
| let context = guard.context.as_ref().unwrap(); |
| let scheduler = &mut *guard.scheduler; |
| |
| CURRENT.set(context, || f(scheduler, context)) |
| } |
| |
| impl<P: Park> Drop for BasicScheduler<P> { |
| fn drop(&mut self) { |
| // Avoid a double panic if we are currently panicking and |
| // the lock may be poisoned. |
| |
| let mut inner = match self.inner.lock().take() { |
| Some(inner) => inner, |
| None if std::thread::panicking() => return, |
| None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), |
| }; |
| |
| enter(&mut inner, |scheduler, context| { |
| // Drain the OwnedTasks collection. This call also closes the |
| // collection, ensuring that no tasks are ever pushed after this |
| // call returns. |
| context.shared.owned.close_and_shutdown_all(); |
| |
| // Drain the local queue. |
| // Every task was shut down above, so each one only needs to be dropped. |
| for task in context.tasks.borrow_mut().queue.drain(..) { |
| drop(task); |
| } |
| |
| // Drain remote queue and set it to None |
| let remote_queue = scheduler.spawner.shared.queue.lock().take(); |
| |
| // `Option::take` above replaced the shared queue with `None`. Every |
| // task was already shut down, so each entry only needs to be dropped. |
| if let Some(remote_queue) = remote_queue { |
| for entry in remote_queue { |
| match entry { |
| RemoteMsg::Schedule(task) => { |
| drop(task); |
| } |
| } |
| } |
| } |
| |
| assert!(context.shared.owned.is_empty()); |
| }); |
| } |
| } |
| |
| impl<P: Park> fmt::Debug for BasicScheduler<P> { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| fmt.debug_struct("BasicScheduler").finish() |
| } |
| } |
| |
| // ===== impl Spawner ===== |
| |
| impl Spawner { |
| /// Spawns a future onto the basic scheduler. |
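| /// |
| /// The task is registered in the shared `OwnedTasks` collection. If the |
| /// scheduler has already shut down, `bind` yields no notification and |
| /// the task is never scheduled. |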
| pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> |
| where |
| F: crate::future::Future + Send + 'static, |
| F::Output: Send + 'static, |
| { |
| let (handle, notified) = self.shared.owned.bind(future, self.shared.clone()); |
| |
| if let Some(notified) = notified { |
| self.shared.schedule(notified); |
| } |
| |
| handle |
| } |
| |
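| /// Pops a message from the remote queue. Returns `None` if the queue is |
| /// empty or if the runtime has shut down (the queue itself is `None`). |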
| fn pop(&self) -> Option<RemoteMsg> { |
| match self.shared.queue.lock().as_mut() { |
| Some(queue) => queue.pop_front(), |
| None => None, |
| } |
| } |
| |
| fn waker_ref(&self) -> WakerRef<'_> { |
| // Clear the woken flag so that `was_woken` only reports wakeups |
| // that occur from this point on. |
| self.shared.woken.swap(false, AcqRel); |
| waker_ref(&self.shared) |
| } |
| |
| fn was_woken(&self) -> bool { |
| self.shared.woken.load(Acquire) |
| } |
| } |
| |
| impl fmt::Debug for Spawner { |
| fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
| fmt.debug_struct("Spawner").finish() |
| } |
| } |
| |
| // ===== impl Shared ===== |
| |
| impl Schedule for Arc<Shared> { |
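| // Remove the released task from the `OwnedTasks` collection. |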
| fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { |
| self.owned.remove(task) |
| } |
| |
| fn schedule(&self, task: task::Notified<Self>) { |
| CURRENT.with(|maybe_cx| match maybe_cx { |
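| // Fast path: `schedule` was called from the scheduler thread, so |
| // the task can be pushed onto the local queue without taking the |
| // remote queue's lock. |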
| Some(cx) if Arc::ptr_eq(self, &cx.shared) => { |
| cx.tasks.borrow_mut().queue.push_back(task); |
| } |
| _ => { |
| // If the queue is None, then the runtime has shut down. We |
| // don't need to do anything with the notification in that case. |
| let mut guard = self.queue.lock(); |
| if let Some(queue) = guard.as_mut() { |
| queue.push_back(RemoteMsg::Schedule(task)); |
| drop(guard); |
| self.unpark.unpark(); |
| } |
| } |
| }); |
| } |
| } |
| |
| impl Wake for Shared { |
| fn wake(self: Arc<Self>) { |
| Wake::wake_by_ref(&self) |
| } |
| |
| /// Wake by reference. |
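| /// |
| /// Sets the `woken` flag (read with `Acquire` by `Spawner::was_woken`) |
| /// and unparks the driver thread so `block_on` re-polls the main future. |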
| fn wake_by_ref(arc_self: &Arc<Self>) { |
| arc_self.woken.store(true, Release); |
| arc_self.unpark.unpark(); |
| } |
| } |
| |
| // ===== InnerGuard ===== |
| |
| /// Used to ensure we always place the Inner value |
| /// back into its slot in `BasicScheduler`, even if the |
| /// future panics. |
| struct InnerGuard<'a, P: Park> { |
| inner: Option<Inner<P>>, |
| basic_scheduler: &'a BasicScheduler<P>, |
| } |
| |
| impl<P: Park> InnerGuard<'_, P> { |
| fn block_on<F: Future>(&mut self, future: F) -> F::Output { |
| // The only time `inner` is set to `None` is in `Drop`, so this |
| // unwrap is safe. |
| self.inner.as_mut().unwrap().block_on(future) |
| } |
| } |
| |
| impl<P: Park> Drop for InnerGuard<'_, P> { |
| fn drop(&mut self) { |
| if let Some(scheduler) = self.inner.take() { |
| let mut lock = self.basic_scheduler.inner.lock(); |
| |
| // Place the scheduler back into its slot so other threads can |
| // pick it up and drive it. |
| lock.replace(scheduler); |
| |
| // Wake up another thread that may be waiting to steal |
| // the dedicated parker P. |
| self.basic_scheduler.notify.notify_one() |
| } |
| } |
| } |