| use park::DefaultPark; |
| use worker::{WorkerId}; |
| |
| use std::cell::UnsafeCell; |
| use std::fmt; |
| use std::sync::atomic::AtomicUsize; |
| use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed}; |
| use std::time::{Duration, Instant}; |
| |
| /// State associated with a thread in the thread pool. |
| /// |
| /// The pool manages a number of threads. Some of those threads are considered |
| /// "primary" threads and process the work queue. When a task being run on a |
| /// primary thread enters a blocking context, the responsibility of processing |
| /// the work queue must be handed off to another thread. This is done by first |
| /// checking for idle threads on the backup stack. If one is found, the worker |
| /// token (`WorkerId`) is handed off to that running thread. If none are found, |
| /// a new thread is spawned. |
| /// |
| /// This state manages the exchange. A thread that is idle, not assigned to a |
| /// work queue, sits around for a specified amount of time. When the worker |
| /// token is handed off, it is first stored in `handoff`. The backup thread is |
| /// then signaled. At this point, the backup thread wakes up from sleep and |
| /// reads `handoff`. At that point, it has been promoted to a primary thread and |
| /// will begin processing inbound work on the work queue. |
| /// |
| /// The name `Backup` isn't really great for what the type does, but I have not |
| /// come up with a better name... Maybe it should just be named `Thread`. |
| #[derive(Debug)] |
| pub(crate) struct Backup { |
| /// Worker ID that is being handed to this thread. |
| handoff: UnsafeCell<Option<WorkerId>>, |
| |
| /// Thread state. |
| /// |
| /// This tracks: |
| /// |
| /// * Is queued flag |
| /// * If the pool is shutting down. |
| /// * If the thread is running |
| state: AtomicUsize, |
| |
| /// Next entry in the Treiber stack. |
| next_sleeper: UnsafeCell<BackupId>, |
| |
| /// Used to put the thread to sleep |
| park: DefaultPark, |
| } |
| |
| #[derive(Debug, Eq, PartialEq, Copy, Clone)] |
| pub(crate) struct BackupId(pub(crate) usize); |
| |
| #[derive(Debug)] |
| pub(crate) enum Handoff { |
| Worker(WorkerId), |
| Idle, |
| Terminated, |
| } |
| |
| /// Tracks thread state. |
| #[derive(Clone, Copy, Eq, PartialEq)] |
| struct State(usize); |
| |
| /// Set when the worker is pushed onto the scheduler's stack of sleeping |
| /// threads. |
| /// |
| /// This flag also serves as a "notification" bit. If another thread is |
| /// attempting to hand off a worker to the backup thread, then the pushed bit |
| /// will not be set when the thread tries to shutdown. |
| pub const PUSHED: usize = 0b001; |
| |
| /// Set when the thread is running |
| pub const RUNNING: usize = 0b010; |
| |
| /// Set when the thread pool has terminated |
| pub const TERMINATED: usize = 0b100; |
| |
| // ===== impl Backup ===== |
| |
| impl Backup { |
| pub fn new() -> Backup { |
| Backup { |
| handoff: UnsafeCell::new(None), |
| state: AtomicUsize::new(State::new().into()), |
| next_sleeper: UnsafeCell::new(BackupId(0)), |
| park: DefaultPark::new(), |
| } |
| } |
| |
| /// Called when the thread is starting |
| pub fn start(&self, worker_id: &WorkerId) { |
| debug_assert!({ |
| let state: State = self.state.load(Relaxed).into(); |
| |
| debug_assert!(!state.is_pushed()); |
| debug_assert!(state.is_running()); |
| debug_assert!(!state.is_terminated()); |
| |
| true |
| }); |
| |
| // The handoff value is equal to `worker_id` |
| debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); |
| |
| unsafe { *self.handoff.get() = None; } |
| } |
| |
| pub fn is_running(&self) -> bool { |
| let state: State = self.state.load(Relaxed).into(); |
| state.is_running() |
| } |
| |
| /// Hands off the worker to a thread. |
| /// |
| /// Returns `true` if the thread needs to be spawned. |
| pub fn worker_handoff(&self, worker_id: WorkerId) -> bool { |
| unsafe { |
| // The backup worker should not already have been handoff a worker. |
| debug_assert!((*self.handoff.get()).is_none()); |
| |
| // Set the handoff |
| *self.handoff.get() = Some(worker_id); |
| } |
| |
| // This *probably* can just be `Release`... memory orderings, how do |
| // they work? |
| let prev = State::worker_handoff(&self.state); |
| debug_assert!(prev.is_pushed()); |
| |
| if prev.is_running() { |
| // Wakeup the backup thread |
| self.park.notify(); |
| false |
| } else { |
| true |
| } |
| } |
| |
| /// Terminate the worker |
| pub fn signal_stop(&self) { |
| let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into(); |
| |
| debug_assert!(!prev.is_terminated()); |
| debug_assert!(prev.is_pushed()); |
| |
| if prev.is_running() { |
| self.park.notify(); |
| } |
| } |
| |
| /// Release the worker |
| pub fn release(&self) { |
| let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into(); |
| |
| debug_assert!(prev.is_running()); |
| } |
| |
| /// Wait for a worker handoff |
| pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff { |
| let sleep_until = timeout.map(|dur| Instant::now() + dur); |
| let mut state: State = self.state.load(Acquire).into(); |
| |
| // Run in a loop since there can be spurious wakeups |
| loop { |
| if !state.is_pushed() { |
| if state.is_terminated() { |
| return Handoff::Terminated; |
| } |
| |
| let worker_id = unsafe { |
| (*self.handoff.get()).take() |
| .expect("no worker handoff") |
| }; |
| return Handoff::Worker(worker_id); |
| } |
| |
| match sleep_until { |
| None => { |
| self.park.park_sync(None); |
| state = self.state.load(Acquire).into(); |
| } |
| Some(when) => { |
| let now = Instant::now(); |
| |
| if now < when { |
| self.park.park_sync(Some(when - now)); |
| state = self.state.load(Acquire).into(); |
| } else { |
| debug_assert!(state.is_running()); |
| |
| // Transition out of running |
| let mut next = state; |
| next.unset_running(); |
| |
| let actual = self.state.compare_and_swap( |
| state.into(), |
| next.into(), |
| AcqRel).into(); |
| |
| if actual == state { |
| debug_assert!(!next.is_running()); |
| return Handoff::Idle; |
| } |
| |
| state = actual; |
| } |
| } |
| } |
| } |
| } |
| |
| pub fn is_pushed(&self) -> bool { |
| let state: State = self.state.load(Relaxed).into(); |
| state.is_pushed() |
| } |
| |
| pub fn set_pushed(&self, ordering: Ordering) { |
| let prev: State = self.state.fetch_or(PUSHED, ordering).into(); |
| debug_assert!(!prev.is_pushed()); |
| } |
| |
| #[inline] |
| pub fn next_sleeper(&self) -> BackupId { |
| unsafe { *self.next_sleeper.get() } |
| } |
| |
| #[inline] |
| pub fn set_next_sleeper(&self, val: BackupId) { |
| unsafe { *self.next_sleeper.get() = val; } |
| } |
| } |
| |
| // ===== impl State ===== |
| |
| impl State { |
| /// Returns a new, default, thread `State` |
| pub fn new() -> State { |
| State(0) |
| } |
| |
| /// Returns true if the thread entry is pushed in the sleeper stack |
| pub fn is_pushed(&self) -> bool { |
| self.0 & PUSHED == PUSHED |
| } |
| |
| fn unset_pushed(&mut self) { |
| self.0 &= !PUSHED; |
| } |
| |
| pub fn is_running(&self) -> bool { |
| self.0 & RUNNING == RUNNING |
| } |
| |
| pub fn set_running(&mut self) { |
| self.0 |= RUNNING; |
| } |
| |
| pub fn unset_running(&mut self) { |
| self.0 &= !RUNNING; |
| } |
| |
| pub fn is_terminated(&self) -> bool { |
| self.0 & TERMINATED == TERMINATED |
| } |
| |
| fn worker_handoff(state: &AtomicUsize) -> State { |
| let mut curr: State = state.load(Acquire).into(); |
| |
| loop { |
| let mut next = curr; |
| next.set_running(); |
| next.unset_pushed(); |
| |
| let actual = state.compare_and_swap( |
| curr.into(), next.into(), AcqRel).into(); |
| |
| if actual == curr { |
| return curr; |
| } |
| |
| curr = actual; |
| } |
| } |
| } |
| |
| impl From<usize> for State { |
| fn from(src: usize) -> State { |
| State(src) |
| } |
| } |
| |
| impl From<State> for usize { |
| fn from(src: State) -> usize { |
| src.0 |
| } |
| } |
| |
| impl fmt::Debug for State { |
| fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| fmt.debug_struct("backup::State") |
| .field("is_pushed", &self.is_pushed()) |
| .field("is_running", &self.is_running()) |
| .field("is_terminated", &self.is_terminated()) |
| .finish() |
| } |
| } |