| //! Code that decides when workers should go to sleep. See README.md |
| //! for an overview. |
| |
| use log::Event::*; |
| use std::sync::atomic::{AtomicUsize, Ordering}; |
| use std::sync::{Condvar, Mutex}; |
| use std::thread; |
| use std::usize; |
| |
| pub struct Sleep { |
| state: AtomicUsize, |
| data: Mutex<()>, |
| tickle: Condvar, |
| } |
| |
| const AWAKE: usize = 0; |
| const SLEEPING: usize = 1; |
| |
| const ROUNDS_UNTIL_SLEEPY: usize = 32; |
| const ROUNDS_UNTIL_ASLEEP: usize = 64; |
| |
| impl Sleep { |
| pub fn new() -> Sleep { |
| Sleep { |
| state: AtomicUsize::new(AWAKE), |
| data: Mutex::new(()), |
| tickle: Condvar::new(), |
| } |
| } |
| |
| fn anyone_sleeping(&self, state: usize) -> bool { |
| state & SLEEPING != 0 |
| } |
| |
| fn any_worker_is_sleepy(&self, state: usize) -> bool { |
| (state >> 1) != 0 |
| } |
| |
| fn worker_is_sleepy(&self, state: usize, worker_index: usize) -> bool { |
| (state >> 1) == (worker_index + 1) |
| } |
| |
| fn with_sleepy_worker(&self, state: usize, worker_index: usize) -> usize { |
| debug_assert!(state == AWAKE || state == SLEEPING); |
| ((worker_index + 1) << 1) + state |
| } |
| |
| #[inline] |
| pub fn work_found(&self, worker_index: usize, yields: usize) -> usize { |
| log!(FoundWork { |
| worker: worker_index, |
| yields: yields, |
| }); |
| if yields > ROUNDS_UNTIL_SLEEPY { |
| // FIXME tickling here is a bit extreme; mostly we want to "release the lock" |
| // from us being sleepy, we don't necessarily need to wake others |
| // who are sleeping |
| self.tickle(worker_index); |
| } |
| 0 |
| } |
| |
| #[inline] |
| pub fn no_work_found(&self, worker_index: usize, yields: usize) -> usize { |
| log!(DidNotFindWork { |
| worker: worker_index, |
| yields: yields, |
| }); |
| if yields < ROUNDS_UNTIL_SLEEPY { |
| thread::yield_now(); |
| yields + 1 |
| } else if yields == ROUNDS_UNTIL_SLEEPY { |
| thread::yield_now(); |
| if self.get_sleepy(worker_index) { |
| yields + 1 |
| } else { |
| yields |
| } |
| } else if yields < ROUNDS_UNTIL_ASLEEP { |
| thread::yield_now(); |
| if self.still_sleepy(worker_index) { |
| yields + 1 |
| } else { |
| log!(GotInterrupted { worker: worker_index }); |
| 0 |
| } |
| } else { |
| debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP); |
| self.sleep(worker_index); |
| 0 |
| } |
| } |
| |
| pub fn tickle(&self, worker_index: usize) { |
| // As described in README.md, this load must be SeqCst so as to ensure that: |
| // - if anyone is sleepy or asleep, we *definitely* see that now (and not eventually); |
| // - if anyone after us becomes sleepy or asleep, they see memory events that |
| // precede the call to `tickle()`, even though we did not do a write. |
| let old_state = self.state.load(Ordering::SeqCst); |
| if old_state != AWAKE { |
| self.tickle_cold(worker_index); |
| } |
| } |
| |
| #[cold] |
| fn tickle_cold(&self, worker_index: usize) { |
| // The `Release` ordering here suffices. The reasoning is that |
| // the atomic's own natural ordering ensure that any attempt |
| // to become sleepy/asleep either will come before/after this |
| // swap. If it comes *after*, then Release is good because we |
| // want it to see the action that generated this tickle. If it |
| // comes *before*, then we will see it here (but not other |
| // memory writes from that thread). If the other worker was |
| // becoming sleepy, the other writes don't matter. If they |
| // were were going to sleep, we will acquire lock and hence |
| // acquire their reads. |
| let old_state = self.state.swap(AWAKE, Ordering::Release); |
| log!(Tickle { |
| worker: worker_index, |
| old_state: old_state, |
| }); |
| if self.anyone_sleeping(old_state) { |
| let _data = self.data.lock().unwrap(); |
| self.tickle.notify_all(); |
| } |
| } |
| |
| fn get_sleepy(&self, worker_index: usize) -> bool { |
| loop { |
| // Acquire ordering suffices here. If some other worker |
| // was sleepy but no longer is, we will eventually see |
| // that, and until then it doesn't hurt to spin. |
| // Otherwise, we will do a compare-exchange which will |
| // assert a stronger order and acquire any reads etc that |
| // we must see. |
| let state = self.state.load(Ordering::Acquire); |
| log!(GetSleepy { |
| worker: worker_index, |
| state: state, |
| }); |
| if self.any_worker_is_sleepy(state) { |
| // somebody else is already sleepy, so we'll just wait our turn |
| debug_assert!(!self.worker_is_sleepy(state, worker_index), |
| "worker {} called `is_sleepy()`, \ |
| but they are already sleepy (state={})", |
| worker_index, |
| state); |
| return false; |
| } else { |
| // make ourselves the sleepy one |
| let new_state = self.with_sleepy_worker(state, worker_index); |
| |
| // This must be SeqCst on success because we want to |
| // ensure: |
| // |
| // - That we observe any writes that preceded |
| // some prior tickle, and that tickle may have only |
| // done a SeqCst load on `self.state`. |
| // - That any subsequent tickle *definitely* sees this store. |
| // |
| // See the section on "Ensuring Sequentially |
| // Consistency" in README.md for more details. |
| // |
| // The failure ordering doesn't matter since we are |
| // about to spin around and do a fresh load. |
| if self.state |
| .compare_exchange(state, new_state, Ordering::SeqCst, Ordering::Relaxed) |
| .is_ok() { |
| log!(GotSleepy { |
| worker: worker_index, |
| old_state: state, |
| new_state: new_state, |
| }); |
| return true; |
| } |
| } |
| } |
| } |
| |
| fn still_sleepy(&self, worker_index: usize) -> bool { |
| let state = self.state.load(Ordering::SeqCst); |
| self.worker_is_sleepy(state, worker_index) |
| } |
| |
| fn sleep(&self, worker_index: usize) { |
| loop { |
| // Acquire here suffices. If we observe that the current worker is still |
| // sleepy, then in fact we know that no writes have occurred, and anyhow |
| // we are going to do a CAS which will synchronize. |
| // |
| // If we observe that the state has changed, it must be |
| // due to a tickle, and then the Acquire means we also see |
| // any events that occured before that. |
| let state = self.state.load(Ordering::Acquire); |
| if self.worker_is_sleepy(state, worker_index) { |
| // It is important that we hold the lock when we do |
| // the CAS. Otherwise, if we were to CAS first, then |
| // the following sequence of events could occur: |
| // |
| // - Thread A (us) sets state to SLEEPING. |
| // - Thread B sets state to AWAKE. |
| // - Thread C sets state to SLEEPY(C). |
| // - Thread C sets state to SLEEPING. |
| // - Thread A reawakens, acquires lock, and goes to sleep. |
| // |
| // Now we missed the wake-up from thread B! But since |
| // we have the lock when we set the state to sleeping, |
| // that cannot happen. Note that the swap `tickle()` |
| // is not part of the lock, though, so let's play that |
| // out: |
| // |
| // # Scenario 1 |
| // |
| // - A loads state and see SLEEPY(A) |
| // - B swaps to AWAKE. |
| // - A locks, fails CAS |
| // |
| // # Scenario 2 |
| // |
| // - A loads state and see SLEEPY(A) |
| // - A locks, performs CAS |
| // - B swaps to AWAKE. |
| // - A waits (releasing lock) |
| // - B locks, notifies |
| // |
| // In general, acquiring the lock inside the loop |
| // seems like it could lead to bad performance, but |
| // actually it should be ok. This is because the only |
| // reason for the `compare_exchange` to fail is if an |
| // awaken comes, in which case the next cycle around |
| // the loop will just return. |
| let data = self.data.lock().unwrap(); |
| |
| // This must be SeqCst on success because we want to |
| // ensure: |
| // |
| // - That we observe any writes that preceded |
| // some prior tickle, and that tickle may have only |
| // done a SeqCst load on `self.state`. |
| // - That any subsequent tickle *definitely* sees this store. |
| // |
| // See the section on "Ensuring Sequentially |
| // Consistency" in README.md for more details. |
| // |
| // The failure ordering doesn't matter since we are |
| // about to spin around and do a fresh load. |
| if self.state |
| .compare_exchange(state, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) |
| .is_ok() { |
| // Don't do this in a loop. If we do it in a loop, we need |
| // some way to distinguish the ABA scenario where the pool |
| // was awoken but before we could process it somebody went |
| // to sleep. Note that if we get a false wakeup it's not a |
| // problem for us, we'll just loop around and maybe get |
| // sleepy again. |
| log!(FellAsleep { worker: worker_index }); |
| let _ = self.tickle.wait(data).unwrap(); |
| log!(GotAwoken { worker: worker_index }); |
| return; |
| } |
| } else { |
| log!(GotInterrupted { worker: worker_index }); |
| return; |
| } |
| } |
| } |
| } |