// Source blob: feaff306522012e42a1c5526a4dad92d1675e048 (header left over from the web code viewer)
use park::DefaultPark;
use worker::{WorkerId};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{self, Acquire, AcqRel, Relaxed};
use std::time::{Duration, Instant};
/// Per-thread bookkeeping for a thread managed by the pool.
///
/// Threads in the pool are either "primary" threads, which process the work
/// queue, or idle backup threads. When a task running on a primary thread
/// enters a blocking context, the responsibility of processing the work queue
/// has to move to a different thread. The pool first looks for an idle thread
/// on the backup stack and hands the worker token (`WorkerId`) to it; when no
/// idle thread is found, a new thread is spawned instead.
///
/// `Backup` manages that exchange. A thread that is idle and not assigned to a
/// work queue sits around for a specified amount of time. To promote it, the
/// worker token is first stored in `handoff` and the backup thread is then
/// signaled. The woken thread reads `handoff`; from that point on it is a
/// primary thread and begins processing inbound work on the work queue.
///
/// The name `Backup` is not a great fit for what the type does; `Thread` may
/// be a better name.
#[derive(Debug)]
pub(crate) struct Backup {
    /// Worker token that is being handed to this thread, if any.
    handoff: UnsafeCell<Option<WorkerId>>,

    /// Packed thread state flags:
    ///
    /// * whether the entry is queued (`PUSHED`),
    /// * whether the thread is running (`RUNNING`),
    /// * whether the pool is shutting down (`TERMINATED`).
    state: AtomicUsize,

    /// Link to the next entry in the Treiber stack.
    next_sleeper: UnsafeCell<BackupId>,

    /// Parker used to put the thread to sleep (and to wake it up again).
    park: DefaultPark,
}
/// Identifier of a `Backup` entry.
///
/// `Backup::next_sleeper` stores one of these to link an entry to the next
/// one on the sleeper stack.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct BackupId(pub(crate) usize);
/// Outcome of `Backup::wait_for_handoff`.
#[derive(Debug)]
pub(crate) enum Handoff {
    /// A worker token was handed to the waiting thread.
    Worker(WorkerId),

    /// The wait timed out and the thread left the running state without
    /// receiving a worker.
    Idle,

    /// The terminated flag was observed while waiting.
    Terminated,
}
/// Snapshot of a backup thread's state flags.
///
/// The wrapped `usize` is a bit set built from `PUSHED`, `RUNNING` and
/// `TERMINATED`.
#[derive(Copy, Clone, PartialEq, Eq)]
struct State(usize);
/// Flag bit: the worker has been pushed onto the scheduler's stack of
/// sleeping threads.
///
/// The bit doubles as a "notification" marker: when another thread is in the
/// middle of handing a worker off to this backup thread, the bit is already
/// cleared by the time the thread tries to shut down.
pub const PUSHED: usize = 1;

/// Flag bit: the thread is running.
pub const RUNNING: usize = 1 << 1;

/// Flag bit: the thread pool has terminated.
pub const TERMINATED: usize = 1 << 2;
// ===== impl Backup =====
impl Backup {
    /// Creates a new entry with every state flag cleared, no pending worker
    /// handoff, and `next_sleeper` set to `BackupId(0)`.
    pub fn new() -> Backup {
        Backup {
            handoff: UnsafeCell::new(None),
            state: AtomicUsize::new(State::new().into()),
            next_sleeper: UnsafeCell::new(BackupId(0)),
            park: DefaultPark::new(),
        }
    }

    /// Called when the thread is starting.
    ///
    /// Clears the handoff slot. In debug builds this first checks that the
    /// entry is running, is neither pushed nor terminated, and that the slot
    /// currently holds `worker_id`.
    pub fn start(&self, worker_id: &WorkerId) {
        // The state is only loaded when debug assertions are compiled in.
        if cfg!(debug_assertions) {
            let state: State = self.state.load(Relaxed).into();

            debug_assert!(!state.is_pushed());
            debug_assert!(state.is_running());
            debug_assert!(!state.is_terminated());
        }

        // The handoff value is equal to `worker_id`
        // SAFETY: NOTE(review): assumes only the thread that received the
        // handoff touches the slot at this point -- confirm against callers.
        debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));

        // SAFETY: see the note above; same assumption.
        unsafe {
            *self.handoff.get() = None;
        }
    }

    /// Returns `true` if the `RUNNING` flag is set (relaxed load).
    pub fn is_running(&self) -> bool {
        let state: State = self.state.load(Relaxed).into();
        state.is_running()
    }

    /// Hands off the worker to a thread.
    ///
    /// Stores `worker_id` in the handoff slot, then atomically marks the entry
    /// as running and no longer pushed. If the thread was already running it
    /// is woken up.
    ///
    /// Returns `true` if the thread needs to be spawned.
    pub fn worker_handoff(&self, worker_id: WorkerId) -> bool {
        // SAFETY: NOTE(review): `wait_for_handoff` only reads the slot after
        // it observes the pushed flag cleared, which happens in the state
        // transition below. Assumes a single thread performs the handoff for
        // a given entry -- confirm against callers.
        unsafe {
            // The backup worker should not already have been handed a worker.
            debug_assert!((*self.handoff.get()).is_none());

            // Set the handoff
            *self.handoff.get() = Some(worker_id);
        }

        // The transition uses `AcqRel`; the original author noted that
        // `Release` is *probably* sufficient here.
        let prev = State::worker_handoff(&self.state);
        debug_assert!(prev.is_pushed());

        if prev.is_running() {
            // Wakeup the backup thread
            self.park.notify();
            false
        } else {
            true
        }
    }

    /// Terminate the worker.
    ///
    /// Toggles `TERMINATED` and `PUSHED` with a single atomic XOR. Given the
    /// asserted precondition (pushed, not yet terminated) this sets the
    /// terminated flag and clears the pushed flag. A running thread is woken
    /// so that it can observe the change.
    pub fn signal_stop(&self) {
        let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into();

        debug_assert!(!prev.is_terminated());
        debug_assert!(prev.is_pushed());

        if prev.is_running() {
            self.park.notify();
        }
    }

    /// Release the worker.
    ///
    /// Toggles `RUNNING` with an atomic XOR; given the asserted precondition
    /// (the thread is running) this clears the flag.
    pub fn release(&self) {
        let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into();
        debug_assert!(prev.is_running());
    }

    /// Wait for a worker handoff.
    ///
    /// Parks the thread until the pushed flag is cleared or, when `timeout`
    /// is `Some`, until the deadline computed from it has passed.
    ///
    /// Returns:
    ///
    /// * `Handoff::Terminated` if the pushed flag is cleared and the
    ///   terminated flag is set,
    /// * `Handoff::Worker` with the worker taken from the handoff slot if the
    ///   pushed flag is cleared otherwise,
    /// * `Handoff::Idle` if the deadline passed and the running flag was
    ///   cleared successfully.
    ///
    /// # Panics
    ///
    /// Panics with "no worker handoff" if the pushed flag was cleared without
    /// termination and the handoff slot is empty.
    pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff {
        let sleep_until = timeout.map(|dur| Instant::now() + dur);
        let mut state: State = self.state.load(Acquire).into();

        // Run in a loop since there can be spurious wakeups
        loop {
            if !state.is_pushed() {
                if state.is_terminated() {
                    return Handoff::Terminated;
                }

                // SAFETY: NOTE(review): the pushed flag is cleared by
                // `State::worker_handoff` after the slot was written, and the
                // state was loaded with `Acquire`. Assumes no other thread
                // writes the slot until it is pushed again -- confirm.
                let worker_id = unsafe {
                    (*self.handoff.get())
                        .take()
                        .expect("no worker handoff")
                };

                return Handoff::Worker(worker_id);
            }

            match sleep_until {
                None => {
                    self.park.park_sync(None);
                    state = self.state.load(Acquire).into();
                }
                Some(when) => {
                    let now = Instant::now();

                    if now < when {
                        self.park.park_sync(Some(when - now));
                        state = self.state.load(Acquire).into();
                    } else {
                        debug_assert!(state.is_running());

                        // Transition out of running
                        let mut next = state;
                        next.unset_running();

                        // `compare_exchange(.., AcqRel, Acquire)` is the
                        // documented replacement for the deprecated
                        // `compare_and_swap(.., AcqRel)`.
                        match self.state.compare_exchange(
                            state.into(),
                            next.into(),
                            AcqRel,
                            Acquire,
                        ) {
                            Ok(_) => {
                                debug_assert!(!next.is_running());
                                return Handoff::Idle;
                            }
                            // The state changed concurrently (for example a
                            // handoff arrived); re-evaluate with the value
                            // that was actually observed.
                            Err(actual) => state = actual.into(),
                        }
                    }
                }
            }
        }
    }

    /// Returns `true` if the `PUSHED` flag is set (relaxed load).
    pub fn is_pushed(&self) -> bool {
        let state: State = self.state.load(Relaxed).into();
        state.is_pushed()
    }

    /// Sets the `PUSHED` flag using the given memory `ordering`.
    ///
    /// In debug builds, asserts that the flag was not already set.
    pub fn set_pushed(&self, ordering: Ordering) {
        let prev: State = self.state.fetch_or(PUSHED, ordering).into();
        debug_assert!(!prev.is_pushed());
    }

    /// Returns the stored link to the next sleeper.
    #[inline]
    pub fn next_sleeper(&self) -> BackupId {
        // SAFETY: NOTE(review): unsynchronized read; assumes the sleeper
        // stack protocol orders this against `set_next_sleeper` -- confirm.
        unsafe { *self.next_sleeper.get() }
    }

    /// Stores the link to the next sleeper.
    #[inline]
    pub fn set_next_sleeper(&self, val: BackupId) {
        // SAFETY: NOTE(review): unsynchronized write; assumes the caller has
        // exclusive access to this entry's link while updating it -- confirm.
        unsafe {
            *self.next_sleeper.get() = val;
        }
    }
}
// ===== impl State =====
impl State {
/// Returns a new, default, thread `State`
pub fn new() -> State {
State(0)
}
/// Returns true if the thread entry is pushed in the sleeper stack
pub fn is_pushed(&self) -> bool {
self.0 & PUSHED == PUSHED
}
fn unset_pushed(&mut self) {
self.0 &= !PUSHED;
}
pub fn is_running(&self) -> bool {
self.0 & RUNNING == RUNNING
}
pub fn set_running(&mut self) {
self.0 |= RUNNING;
}
pub fn unset_running(&mut self) {
self.0 &= !RUNNING;
}
pub fn is_terminated(&self) -> bool {
self.0 & TERMINATED == TERMINATED
}
fn worker_handoff(state: &AtomicUsize) -> State {
let mut curr: State = state.load(Acquire).into();
loop {
let mut next = curr;
next.set_running();
next.unset_pushed();
let actual = state.compare_and_swap(
curr.into(), next.into(), AcqRel).into();
if actual == curr {
return curr;
}
curr = actual;
}
}
}
/// Wraps a raw flag word (as stored in the atomic) in a `State`.
impl From<usize> for State {
    fn from(bits: usize) -> Self {
        State(bits)
    }
}
impl From<State> for usize {
fn from(src: State) -> usize {
src.0
}
}
/// Formats the state as a `backup::State` struct listing each decoded flag
/// instead of the raw bit pattern.
impl fmt::Debug for State {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("backup::State");
        out.field("is_pushed", &self.is_pushed());
        out.field("is_running", &self.is_running());
        out.field("is_terminated", &self.is_terminated());
        out.finish()
    }
}