| //! Coordinates idling workers |
| |
| #![allow(dead_code)] |
| |
| use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; |
| use crate::loom::sync::MutexGuard; |
| use crate::runtime::scheduler::multi_thread_alt::{worker, Core, Handle, Shared}; |
| |
| use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; |
| |
| pub(super) struct Idle { |
| /// Number of searching cores |
| num_searching: AtomicUsize, |
| |
| /// Number of idle cores |
| num_idle: AtomicUsize, |
| |
| /// Map of idle cores |
| idle_map: IdleMap, |
| |
| /// Used to catch false-negatives when waking workers |
| needs_searching: AtomicBool, |
| |
| /// Total number of cores |
| num_cores: usize, |
| } |
| |
| pub(super) struct IdleMap { |
| chunks: Vec<AtomicUsize>, |
| } |
| |
| pub(super) struct Snapshot { |
| chunks: Vec<usize>, |
| } |
| |
| /// Data synchronized by the scheduler mutex |
| pub(super) struct Synced { |
| /// Worker IDs that are currently sleeping |
| sleepers: Vec<usize>, |
| |
| /// Cores available for workers |
| available_cores: Vec<Box<Core>>, |
| } |
| |
| impl Idle { |
| pub(super) fn new(cores: Vec<Box<Core>>, num_workers: usize) -> (Idle, Synced) { |
| let idle = Idle { |
| num_searching: AtomicUsize::new(0), |
| num_idle: AtomicUsize::new(cores.len()), |
| idle_map: IdleMap::new(&cores), |
| needs_searching: AtomicBool::new(false), |
| num_cores: cores.len(), |
| }; |
| |
| let synced = Synced { |
| sleepers: Vec::with_capacity(num_workers), |
| available_cores: cores, |
| }; |
| |
| (idle, synced) |
| } |
| |
| pub(super) fn needs_searching(&self) -> bool { |
| self.needs_searching.load(Acquire) |
| } |
| |
| pub(super) fn num_idle(&self, synced: &Synced) -> usize { |
| #[cfg(not(loom))] |
| debug_assert_eq!(synced.available_cores.len(), self.num_idle.load(Acquire)); |
| synced.available_cores.len() |
| } |
| |
| pub(super) fn num_searching(&self) -> usize { |
| self.num_searching.load(Acquire) |
| } |
| |
| pub(super) fn snapshot(&self, snapshot: &mut Snapshot) { |
| snapshot.update(&self.idle_map) |
| } |
| |
| /// Try to acquire an available core |
| pub(super) fn try_acquire_available_core(&self, synced: &mut Synced) -> Option<Box<Core>> { |
| let ret = synced.available_cores.pop(); |
| |
| if let Some(core) = &ret { |
| // Decrement the number of idle cores |
| let num_idle = self.num_idle.load(Acquire) - 1; |
| debug_assert_eq!(num_idle, synced.available_cores.len()); |
| self.num_idle.store(num_idle, Release); |
| |
| self.idle_map.unset(core.index); |
| debug_assert!(self.idle_map.matches(&synced.available_cores)); |
| } |
| |
| ret |
| } |
| |
| /// We need at least one searching worker |
| pub(super) fn notify_local(&self, shared: &Shared) { |
| if self.num_searching.load(Acquire) != 0 { |
| // There already is a searching worker. Note, that this could be a |
| // false positive. However, because this method is called **from** a |
| // worker, we know that there is at least one worker currently |
| // awake, so the scheduler won't deadlock. |
| return; |
| } |
| |
| if self.num_idle.load(Acquire) == 0 { |
| self.needs_searching.store(true, Release); |
| return; |
| } |
| |
| // There aren't any searching workers. Try to initialize one |
| if self |
| .num_searching |
| .compare_exchange(0, 1, AcqRel, Acquire) |
| .is_err() |
| { |
| // Failing the compare_exchange means another thread concurrently |
| // launched a searching worker. |
| return; |
| } |
| |
| super::counters::inc_num_unparks_local(); |
| |
| // Acquire the lock |
| let synced = shared.synced.lock(); |
| self.notify_synced(synced, shared); |
| } |
| |
| /// Notifies a single worker |
| pub(super) fn notify_remote(&self, synced: MutexGuard<'_, worker::Synced>, shared: &Shared) { |
| if synced.idle.sleepers.is_empty() { |
| self.needs_searching.store(true, Release); |
| return; |
| } |
| |
| // We need to establish a stronger barrier than with `notify_local` |
| self.num_searching.fetch_add(1, AcqRel); |
| |
| self.notify_synced(synced, shared); |
| } |
| |
| /// Notify a worker while synced |
| fn notify_synced(&self, mut synced: MutexGuard<'_, worker::Synced>, shared: &Shared) { |
| // Find a sleeping worker |
| if let Some(worker) = synced.idle.sleepers.pop() { |
| // Find an available core |
| if let Some(mut core) = self.try_acquire_available_core(&mut synced.idle) { |
| debug_assert!(!core.is_searching); |
| core.is_searching = true; |
| |
| // Assign the core to the worker |
| synced.assigned_cores[worker] = Some(core); |
| |
| // Drop the lock before notifying the condvar. |
| drop(synced); |
| |
| super::counters::inc_num_unparks_remote(); |
| |
| // Notify the worker |
| shared.condvars[worker].notify_one(); |
| return; |
| } else { |
| synced.idle.sleepers.push(worker); |
| } |
| } |
| |
| super::counters::inc_notify_no_core(); |
| |
| // Set the `needs_searching` flag, this happens *while* the lock is held. |
| self.needs_searching.store(true, Release); |
| self.num_searching.fetch_sub(1, Release); |
| |
| // Explicit mutex guard drop to show that holding the guard to this |
| // point is significant. `needs_searching` and `num_searching` must be |
| // updated in the critical section. |
| drop(synced); |
| } |
| |
| pub(super) fn notify_mult( |
| &self, |
| synced: &mut worker::Synced, |
| workers: &mut Vec<usize>, |
| num: usize, |
| ) { |
| debug_assert!(workers.is_empty()); |
| |
| for _ in 0..num { |
| if let Some(worker) = synced.idle.sleepers.pop() { |
| // TODO: can this be switched to use next_available_core? |
| if let Some(core) = synced.idle.available_cores.pop() { |
| debug_assert!(!core.is_searching); |
| |
| self.idle_map.unset(core.index); |
| |
| synced.assigned_cores[worker] = Some(core); |
| |
| workers.push(worker); |
| |
| continue; |
| } else { |
| synced.idle.sleepers.push(worker); |
| } |
| } |
| |
| break; |
| } |
| |
| if !workers.is_empty() { |
| debug_assert!(self.idle_map.matches(&synced.idle.available_cores)); |
| let num_idle = synced.idle.available_cores.len(); |
| self.num_idle.store(num_idle, Release); |
| } else { |
| #[cfg(not(loom))] |
| debug_assert_eq!( |
| synced.idle.available_cores.len(), |
| self.num_idle.load(Acquire) |
| ); |
| self.needs_searching.store(true, Release); |
| } |
| } |
| |
| pub(super) fn shutdown(&self, synced: &mut worker::Synced, shared: &Shared) { |
| // Wake every sleeping worker and assign a core to it. There may not be |
| // enough sleeping workers for all cores, but other workers will |
| // eventually find the cores and shut them down. |
| while !synced.idle.sleepers.is_empty() && !synced.idle.available_cores.is_empty() { |
| let worker = synced.idle.sleepers.pop().unwrap(); |
| let core = self.try_acquire_available_core(&mut synced.idle).unwrap(); |
| |
| synced.assigned_cores[worker] = Some(core); |
| shared.condvars[worker].notify_one(); |
| } |
| |
| debug_assert!(self.idle_map.matches(&synced.idle.available_cores)); |
| |
| // Wake up any other workers |
| while let Some(index) = synced.idle.sleepers.pop() { |
| shared.condvars[index].notify_one(); |
| } |
| } |
| |
| pub(super) fn shutdown_unassigned_cores(&self, handle: &Handle, shared: &Shared) { |
| // If there are any remaining cores, shut them down here. |
| // |
| // This code is a bit convoluted to avoid lock-reentry. |
| while let Some(core) = { |
| let mut synced = shared.synced.lock(); |
| self.try_acquire_available_core(&mut synced.idle) |
| } { |
| shared.shutdown_core(handle, core); |
| } |
| } |
| |
| /// The worker releases the given core, making it available to other workers |
| /// that are waiting. |
| pub(super) fn release_core(&self, synced: &mut worker::Synced, core: Box<Core>) { |
| // The core should not be searching at this point |
| debug_assert!(!core.is_searching); |
| |
| // Check that there are no pending tasks in the global queue |
| debug_assert!(synced.inject.is_empty()); |
| |
| let num_idle = synced.idle.available_cores.len(); |
| #[cfg(not(loom))] |
| debug_assert_eq!(num_idle, self.num_idle.load(Acquire)); |
| |
| self.idle_map.set(core.index); |
| |
| // Store the core in the list of available cores |
| synced.idle.available_cores.push(core); |
| |
| debug_assert!(self.idle_map.matches(&synced.idle.available_cores)); |
| |
| // Update `num_idle` |
| self.num_idle.store(num_idle + 1, Release); |
| } |
| |
| pub(super) fn transition_worker_to_parked(&self, synced: &mut worker::Synced, index: usize) { |
| // Store the worker index in the list of sleepers |
| synced.idle.sleepers.push(index); |
| |
| // The worker's assigned core slot should be empty |
| debug_assert!(synced.assigned_cores[index].is_none()); |
| } |
| |
| pub(super) fn try_transition_worker_to_searching(&self, core: &mut Core) { |
| debug_assert!(!core.is_searching); |
| |
| let num_searching = self.num_searching.load(Acquire); |
| let num_idle = self.num_idle.load(Acquire); |
| |
| if 2 * num_searching >= self.num_cores - num_idle { |
| return; |
| } |
| |
| self.transition_worker_to_searching(core); |
| } |
| |
| /// Needs to happen while synchronized in order to avoid races |
| pub(super) fn transition_worker_to_searching_if_needed( |
| &self, |
| _synced: &mut Synced, |
| core: &mut Core, |
| ) -> bool { |
| if self.needs_searching.load(Acquire) { |
| // Needs to be called while holding the lock |
| self.transition_worker_to_searching(core); |
| true |
| } else { |
| false |
| } |
| } |
| |
| pub(super) fn transition_worker_to_searching(&self, core: &mut Core) { |
| core.is_searching = true; |
| self.num_searching.fetch_add(1, AcqRel); |
| self.needs_searching.store(false, Release); |
| } |
| |
| /// A lightweight transition from searching -> running. |
| /// |
| /// Returns `true` if this is the final searching worker. The caller |
| /// **must** notify a new worker. |
| pub(super) fn transition_worker_from_searching(&self) -> bool { |
| let prev = self.num_searching.fetch_sub(1, AcqRel); |
| debug_assert!(prev > 0); |
| |
| prev == 1 |
| } |
| } |
| |
| const BITS: usize = usize::BITS as usize; |
| const BIT_MASK: usize = (usize::BITS - 1) as usize; |
| |
| impl IdleMap { |
| fn new(cores: &[Box<Core>]) -> IdleMap { |
| let ret = IdleMap::new_n(num_chunks(cores.len())); |
| ret.set_all(cores); |
| |
| ret |
| } |
| |
| fn new_n(n: usize) -> IdleMap { |
| let chunks = (0..n).map(|_| AtomicUsize::new(0)).collect(); |
| IdleMap { chunks } |
| } |
| |
| fn set(&self, index: usize) { |
| let (chunk, mask) = index_to_mask(index); |
| let prev = self.chunks[chunk].load(Acquire); |
| let next = prev | mask; |
| self.chunks[chunk].store(next, Release); |
| } |
| |
| fn set_all(&self, cores: &[Box<Core>]) { |
| for core in cores { |
| self.set(core.index); |
| } |
| } |
| |
| fn unset(&self, index: usize) { |
| let (chunk, mask) = index_to_mask(index); |
| let prev = self.chunks[chunk].load(Acquire); |
| let next = prev & !mask; |
| self.chunks[chunk].store(next, Release); |
| } |
| |
| fn matches(&self, idle_cores: &[Box<Core>]) -> bool { |
| let expect = IdleMap::new_n(self.chunks.len()); |
| expect.set_all(idle_cores); |
| |
| for (i, chunk) in expect.chunks.iter().enumerate() { |
| if chunk.load(Acquire) != self.chunks[i].load(Acquire) { |
| return false; |
| } |
| } |
| |
| true |
| } |
| } |
| |
| impl Snapshot { |
| pub(crate) fn new(idle: &Idle) -> Snapshot { |
| let chunks = vec![0; idle.idle_map.chunks.len()]; |
| let mut ret = Snapshot { chunks }; |
| ret.update(&idle.idle_map); |
| ret |
| } |
| |
| fn update(&mut self, idle_map: &IdleMap) { |
| for i in 0..self.chunks.len() { |
| self.chunks[i] = idle_map.chunks[i].load(Acquire); |
| } |
| } |
| |
| pub(super) fn is_idle(&self, index: usize) -> bool { |
| let (chunk, mask) = index_to_mask(index); |
| debug_assert!( |
| chunk < self.chunks.len(), |
| "index={}; chunks={}", |
| index, |
| self.chunks.len() |
| ); |
| self.chunks[chunk] & mask == mask |
| } |
| } |
| |
| fn num_chunks(max_cores: usize) -> usize { |
| (max_cores / BITS) + 1 |
| } |
| |
| fn index_to_mask(index: usize) -> (usize, usize) { |
| let mask = 1 << (index & BIT_MASK); |
| let chunk = index / BITS; |
| |
| (chunk, mask) |
| } |
| |
| fn num_active_workers(synced: &Synced) -> usize { |
| synced.available_cores.capacity() - synced.available_cores.len() |
| } |