blob: 5e125ed2a51cab26c6a63cc11e07e5ce774207f4 [file] [log] [blame]
//! Code that decides when workers should go to sleep. See README.md
//! for an overview.
use crate::log::Event::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::usize;
/// Shared state used to put idle workers to sleep and to wake them again.
pub(super) struct Sleep {
    /// Packed state word. Bit 0 is the `SLEEPING` flag; the remaining bits
    /// hold `worker_index + 1` of the current sleepy worker, or 0 when no
    /// worker is sleepy.
    state: AtomicUsize,

    /// Lock paired with `tickle`. It protects no data of its own (`()`); it
    /// is held while a worker commits to sleeping and while sleepers are
    /// notified, so a wake-up cannot slip between those two steps.
    data: Mutex<()>,

    /// Condition variable that sleeping workers block on.
    tickle: Condvar,
}
/// State word value when no worker is sleepy and none is asleep.
const AWAKE: usize = 0;

/// Bit 0 of the state word: set once some worker has committed to blocking
/// on the condition variable. Sleepy-worker ids live in the bits above it.
const SLEEPING: usize = 1;

/// Number of empty-handed yields after which a worker tries to become the
/// (single) sleepy worker.
const ROUNDS_UNTIL_SLEEPY: usize = 32;

/// Yield count at which a sleepy worker stops spinning and actually blocks.
const ROUNDS_UNTIL_ASLEEP: usize = 64;
impl Sleep {
    /// Creates the sleep state with no worker sleepy and none asleep.
    pub(super) fn new() -> Sleep {
        Sleep {
            state: AtomicUsize::new(AWAKE),
            data: Mutex::new(()),
            tickle: Condvar::new(),
        }
    }

    /// Returns true if the `SLEEPING` bit (bit 0) is set in `state`.
    fn anyone_sleeping(&self, state: usize) -> bool {
        state & SLEEPING != 0
    }

    /// Returns true if `state` records some sleepy worker in its upper bits.
    fn any_worker_is_sleepy(&self, state: usize) -> bool {
        (state >> 1) != 0
    }

    /// Returns true if the sleepy worker recorded in `state` is
    /// `worker_index`. The upper bits store `worker_index + 1`, so that 0
    /// can mean "nobody is sleepy".
    fn worker_is_sleepy(&self, state: usize, worker_index: usize) -> bool {
        (state >> 1) == (worker_index + 1)
    }

    /// Returns `state` with `worker_index` recorded as the sleepy worker,
    /// preserving the `SLEEPING` bit. `state` must not already record a
    /// sleepy worker (checked in debug builds).
    fn with_sleepy_worker(&self, state: usize, worker_index: usize) -> usize {
        debug_assert!(state == AWAKE || state == SLEEPING);
        ((worker_index + 1) << 1) + state
    }

    /// Called by worker `worker_index` when it finds work while its yield
    /// counter is `yields`. Returns the reset counter value (always 0).
    ///
    /// A counter above `ROUNDS_UNTIL_SLEEPY` means this worker became the
    /// sleepy worker at some point, so it tickles to clear that state.
    #[inline]
    pub(super) fn work_found(&self, worker_index: usize, yields: usize) -> usize {
        log!(FoundWork {
            worker: worker_index,
            yields: yields,
        });
        if yields > ROUNDS_UNTIL_SLEEPY {
            // FIXME tickling here is a bit extreme; mostly we want to "release the lock"
            // from us being sleepy, we don't necessarily need to wake others
            // who are sleeping
            self.tickle(worker_index);
        }
        0
    }

    /// Called by worker `worker_index` when a search for work came up empty
    /// while its yield counter is `yields`. Returns the new counter value:
    ///
    /// - below `ROUNDS_UNTIL_SLEEPY`: yield the thread and count up;
    /// - at `ROUNDS_UNTIL_SLEEPY`: yield, then try to become the sleepy
    ///   worker; count up only if that succeeded, otherwise stay put and
    ///   retry on the next call;
    /// - below `ROUNDS_UNTIL_ASLEEP`: yield and count up while this worker is
    ///   still the sleepy one, otherwise reset to 0;
    /// - at `ROUNDS_UNTIL_ASLEEP`: block in `sleep()`, then reset to 0.
    #[inline]
    pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
        log!(DidNotFindWork {
            worker: worker_index,
            yields: yields,
        });
        if yields < ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            yields + 1
        } else if yields == ROUNDS_UNTIL_SLEEPY {
            thread::yield_now();
            if self.get_sleepy(worker_index) {
                yields + 1
            } else {
                yields
            }
        } else if yields < ROUNDS_UNTIL_ASLEEP {
            thread::yield_now();
            if self.still_sleepy(worker_index) {
                yields + 1
            } else {
                log!(GotInterrupted {
                    worker: worker_index
                });
                0
            }
        } else {
            debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
            self.sleep(worker_index);
            0
        }
    }

    /// Wakes any sleepy or sleeping workers. `worker_index` identifies the
    /// tickling worker and is used only for logging. The fast path is a
    /// single load; the slow path (`tickle_cold`) runs only when the state
    /// is not `AWAKE`.
    pub(super) fn tickle(&self, worker_index: usize) {
        // As described in README.md, this load must be SeqCst so as to ensure that:
        // - if anyone is sleepy or asleep, we *definitely* see that now (and not eventually);
        // - if anyone after us becomes sleepy or asleep, they see memory events that
        //   precede the call to `tickle()`, even though we did not do a write.
        let old_state = self.state.load(Ordering::SeqCst);
        if old_state != AWAKE {
            self.tickle_cold(worker_index);
        }
    }

    /// Slow path of `tickle`: resets the state to `AWAKE` and, if the
    /// `SLEEPING` bit was set, notifies every waiter under the lock.
    #[cold]
    fn tickle_cold(&self, worker_index: usize) {
        // The `Release` ordering here suffices. The reasoning is that
        // the atomic's own natural ordering ensures that any attempt
        // to become sleepy/asleep either will come before/after this
        // swap. If it comes *after*, then Release is good because we
        // want it to see the action that generated this tickle. If it
        // comes *before*, then we will see it here (but not other
        // memory writes from that thread). If the other worker was
        // becoming sleepy, the other writes don't matter. If they
        // were going to sleep, we will acquire the lock and hence
        // observe their prior writes.
        let old_state = self.state.swap(AWAKE, Ordering::Release);
        log!(Tickle {
            worker: worker_index,
            old_state: old_state,
        });
        if self.anyone_sleeping(old_state) {
            let _data = self.data.lock().unwrap();
            self.tickle.notify_all();
        }
    }

    /// Tries to record `worker_index` as the single sleepy worker.
    ///
    /// Returns `false` without changing the state if some worker is already
    /// sleepy. Otherwise retries the compare-exchange until it installs this
    /// worker, then returns `true`.
    fn get_sleepy(&self, worker_index: usize) -> bool {
        loop {
            // Acquire ordering suffices here. If some other worker
            // was sleepy but no longer is, we will eventually see
            // that, and until then it doesn't hurt to spin.
            // Otherwise, we will do a compare-exchange which will
            // assert a stronger order and acquire any reads etc that
            // we must see.
            let state = self.state.load(Ordering::Acquire);
            log!(GetSleepy {
                worker: worker_index,
                state: state,
            });
            if self.any_worker_is_sleepy(state) {
                // somebody else is already sleepy, so we'll just wait our turn
                debug_assert!(
                    !self.worker_is_sleepy(state, worker_index),
                    "worker {} called `get_sleepy()`, \
                     but they are already sleepy (state={})",
                    worker_index,
                    state
                );
                return false;
            } else {
                // make ourselves the sleepy one
                let new_state = self.with_sleepy_worker(state, worker_index);
                // This must be SeqCst on success because we want to
                // ensure:
                //
                // - That we observe any writes that preceded
                //   some prior tickle, and that tickle may have only
                //   done a SeqCst load on `self.state`.
                // - That any subsequent tickle *definitely* sees this store.
                //
                // See the section on "Ensuring Sequential
                // Consistency" in README.md for more details.
                //
                // The failure ordering doesn't matter since we are
                // about to spin around and do a fresh load.
                if self
                    .state
                    .compare_exchange(state, new_state, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    log!(GotSleepy {
                        worker: worker_index,
                        old_state: state,
                        new_state: new_state,
                    });
                    return true;
                }
            }
        }
    }

    /// Returns true if `worker_index` is still the recorded sleepy worker,
    /// i.e. no tickle has reset the state since it became sleepy.
    fn still_sleepy(&self, worker_index: usize) -> bool {
        let state = self.state.load(Ordering::SeqCst);
        self.worker_is_sleepy(state, worker_index)
    }

    /// Blocks `worker_index` on the condition variable if it is still the
    /// sleepy worker; returns immediately otherwise. Waits for at most one
    /// wake-up, which may be spurious; the caller simply starts over.
    fn sleep(&self, worker_index: usize) {
        loop {
            // Acquire here suffices. If we observe that the current worker is still
            // sleepy, then in fact we know that no writes have occurred, and anyhow
            // we are going to do a CAS which will synchronize.
            //
            // If we observe that the state has changed, it must be
            // due to a tickle, and then the Acquire means we also see
            // any events that occurred before that.
            let state = self.state.load(Ordering::Acquire);
            if self.worker_is_sleepy(state, worker_index) {
                // It is important that we hold the lock when we do
                // the CAS. Otherwise, if we were to CAS first, then
                // the following sequence of events could occur:
                //
                // - Thread A (us) sets state to SLEEPING.
                // - Thread B sets state to AWAKE.
                // - Thread C sets state to SLEEPY(C).
                // - Thread C sets state to SLEEPING.
                // - Thread A reawakens, acquires lock, and goes to sleep.
                //
                // Now we missed the wake-up from thread B! But since
                // we have the lock when we set the state to sleeping,
                // that cannot happen. Note that the swap `tickle()`
                // is not part of the lock, though, so let's play that
                // out:
                //
                // # Scenario 1
                //
                // - A loads state and sees SLEEPY(A)
                // - B swaps to AWAKE.
                // - A locks, fails CAS
                //
                // # Scenario 2
                //
                // - A loads state and sees SLEEPY(A)
                // - A locks, performs CAS
                // - B swaps to AWAKE.
                // - A waits (releasing lock)
                // - B locks, notifies
                //
                // In general, acquiring the lock inside the loop
                // seems like it could lead to bad performance, but
                // actually it should be ok. This is because the only
                // reason for the `compare_exchange` to fail is if an
                // awaken comes, in which case the next cycle around
                // the loop will just return.
                let data = self.data.lock().unwrap();
                // This must be SeqCst on success because we want to
                // ensure:
                //
                // - That we observe any writes that preceded
                //   some prior tickle, and that tickle may have only
                //   done a SeqCst load on `self.state`.
                // - That any subsequent tickle *definitely* sees this store.
                //
                // See the section on "Ensuring Sequential
                // Consistency" in README.md for more details.
                //
                // The failure ordering doesn't matter since we are
                // about to spin around and do a fresh load.
                if self
                    .state
                    .compare_exchange(state, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    // Don't do this in a loop. If we do it in a loop, we need
                    // some way to distinguish the ABA scenario where the pool
                    // was awoken but before we could process it somebody went
                    // to sleep. Note that if we get a false wakeup it's not a
                    // problem for us, we'll just loop around and maybe get
                    // sleepy again.
                    log!(FellAsleep {
                        worker: worker_index
                    });
                    let _ = self.tickle.wait(data).unwrap();
                    log!(GotAwoken {
                        worker: worker_index
                    });
                    return;
                }
            } else {
                log!(GotInterrupted {
                    worker: worker_index
                });
                return;
            }
        }
    }
}