//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
use crate::runtime::task;

use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
cfg_has_atomic_u64! {
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}

/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The `UnsignedShort` in the `LSB`
    /// half is the "real" head of the queue. The `UnsignedShort` in the `MSB`
    /// half is set by a stealer in the process of stealing values. It
    /// represents the first value being stolen in the batch. The
    /// `UnsignedShort` indices are intentionally wider than strictly required
    /// for buffer indexing in order to provide ABA mitigation and make it
    /// possible to distinguish between full and empty buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
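    ///
    /// For example, with `UnsignedShort = u32` the packed `u64` layout is
    /// `(steal << 32) | real`; see `pack` and `unpack` at the bottom of this
    /// file for the exact encoding.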
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// Elements
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}

#[cfg(not(loom))]
const LOCAL_QUEUE_CAPACITY: usize = 256;

// Shrink the size of the local queue when using loom. This shouldn't impact
// logic, but allows loom to test more edge cases in a reasonable amount of
// time.
#[cfg(loom)]
const LOCAL_QUEUE_CAPACITY: usize = 4;

const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;

// Constructing the fixed size array directly is very awkward. The only way to
// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
// the contents are not Copy. The trick with defining a const doesn't work for
// generic types.
fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
    assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);

    // safety: We check that the length is correct.
    unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
}

/// Create a new local run-queue
pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);

    for _ in 0..LOCAL_QUEUE_CAPACITY {
        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
    }

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        buffer: make_fixed_size(buffer.into_boxed_slice()),
    });

    let local = Local {
        inner: inner.clone(),
    };

    let remote = Steal(inner);

    (remote, local)
}
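
// Usage sketch (illustrative only; `task::Notified` values are produced by
// the runtime, so this snippet is not directly runnable here):
//
//     let (stealer, mut local) = local();
//     // The owning worker thread pushes and pops:
//     //     local.push_back_or_overflow(task, &overflow, &mut stats);
//     //     let next = local.pop();
//     // Other worker threads move tasks into their own queue:
//     //     stealer.steal_into(&mut their_local, &mut their_stats);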

impl<T> Local<T> {
    /// Returns the number of entries in the queue
    pub(crate) fn len(&self) -> usize {
        self.inner.len() as usize
    }

    /// How many tasks can be pushed into the queue
    pub(crate) fn remaining_slots(&self) -> usize {
        self.inner.remaining_slots()
    }

    pub(crate) fn max_capacity(&self) -> usize {
        LOCAL_QUEUE_CAPACITY
    }

    /// Returns true if there are any entries in the queue
    ///
    /// Kept separate from `is_stealable` so that refactors of `is_stealable`
    /// to "protect" some tasks from stealing won't affect this
    pub(crate) fn has_tasks(&self) -> bool {
        !self.inner.is_empty()
    }

    /// Pushes a batch of tasks to the back of the queue. All tasks must fit
    /// in the local queue.
    ///
    /// # Panics
    ///
    /// This method panics if there is not enough capacity in the queue to
    /// fit all the tasks.
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= LOCAL_QUEUE_CAPACITY);

        if len == 0 {
            // Nothing to do
            return;
        }

        let head = self.inner.head.load(Acquire);
        let (steal, _) = unpack(head);

        // safety: this is the **only** thread that updates this cell.
        let mut tail = unsafe { self.inner.tail.unsync_load() };

        if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
            // Yes, this if condition is structured a bit weird (the first
            // block does nothing while the second panics). It is this way to
            // mirror the structure of `push_back_or_overflow`.
        } else {
            panic!()
        }

        for task in tasks {
            let idx = tail as usize & MASK;

            self.inner.buffer[idx].with_mut(|ptr| {
                // Write the task to the slot
                //
                // Safety: There is only one producer and the above `if`
                // condition ensures we don't touch a cell if there is a
                // value, thus no consumer.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });

            tail = tail.wrapping_add(1);
        }

        self.inner.tail.store(tail, Release);
    }

    /// Pushes a task to the back of the local queue. If there is not enough
    /// capacity in the queue, this triggers the overflow operation.
    ///
    /// When the queue overflows, half of the current contents of the queue
    /// are moved to the given overflow queue. This frees up capacity for
    /// more tasks to be pushed into the local queue.
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                // Concurrently stealing, this will free up capacity, so only
                // push the task onto the inject queue
                overflow.push(task);
                return;
            } else {
                // Push the current task and half of the queue into the
                // inject queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost the race, try again
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };

        self.push_back_finish(task, tail);
    }

    // Second half of `push_back_or_overflow`.
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        // Map the position to a slot index.
        let idx = tail as usize & MASK;

        self.inner.buffer[idx].with_mut(|ptr| {
            // Write the task to the slot
            //
            // Safety: There is only one producer and the capacity check in
            // `push_back_or_overflow` ensures we don't touch a cell if there
            // is a value, thus no consumer.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });

        // Make the task available. Synchronizes with a load in
        // `steal_into2`.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

    /// Moves a batch of tasks into the inject queue.
    ///
    /// This will temporarily make some of the tasks unavailable to stealers.
    /// Once `push_overflow` is done, a notification is sent out, so if other
    /// workers "missed" some of the tasks during a steal, they will get
    /// another opportunity.
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        /// The number of elements we take from the local queue.
        ///
        /// This is one less than the number of tasks pushed to the inject
        /// queue as we are also inserting the `task` argument.
        const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;

        assert_eq!(
            tail.wrapping_sub(head) as usize,
            LOCAL_QUEUE_CAPACITY,
            "queue is not full; tail = {}; head = {}",
            tail,
            head
        );

        let prev = pack(head, head);

        // Claim a bunch of tasks
        //
        // We are claiming the tasks **before** reading them out of the buffer.
        // This is safe because only the **current** thread is able to push new
        // tasks.
        //
        // There isn't really any need for memory ordering... Relaxed would
        // work. This is because all tasks are pushed into the queue from the
        // current thread (or memory has been acquired if the local queue handle
        // moved).
        if self
            .inner
            .head
            .compare_exchange(
                prev,
                pack(
                    head.wrapping_add(NUM_TASKS_TAKEN),
                    head.wrapping_add(NUM_TASKS_TAKEN),
                ),
                Release,
                Relaxed,
            )
            .is_err()
        {
            // We failed to claim the tasks, losing the race. Return out of
            // this function and try the full `push` routine again. The queue
            // may not be full anymore.
            return Err(task);
        }

        /// An iterator that takes elements out of the run queue.
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
            head: UnsignedLong,
            i: UnsignedLong,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;

            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
                    let slot = &self.buffer[i_idx];

                    // safety: Our CAS from before has assumed exclusive ownership
                    // of the task pointers in this range.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

                    self.i += 1;
                    Some(task)
                }
            }
        }

        // safety: The CAS above ensures that no consumer will look at these
        // values again, and we are the only producer.
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            head: head as UnsignedLong,
            i: 0,
        };

        // Chain `once(task)` so the task currently being scheduled goes out
        // as part of the same batch.
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));

        stats.incr_overflow_count();

        Ok(())
    }

    /// Pops a task from the local queue.
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);

        let idx = loop {
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if real == tail {
                // queue is empty
                return None;
            }

            let next_real = real.wrapping_add(1);

            // If `steal == real` there are no concurrent stealers. Both `steal`
            // and `real` are updated.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                pack(steal, next_real)
            };

            // Attempt to claim a task.
            let res = self
                .inner
                .head
                .compare_exchange(head, next, AcqRel, Acquire);

            match res {
                Ok(_) => break real as usize & MASK,
                Err(actual) => head = actual,
            }
        };

        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }
}

impl<T> Steal<T> {
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Steals half the tasks from self and places them into `dst`.
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

        if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
            // we *could* try to steal fewer tasks here, but for simplicity,
            // we're just going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // No tasks were stolen
            return None;
        }

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & MASK;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal
            let n = src_tail.wrapping_sub(src_head_real);
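            // Steal half of them, rounding up: `n - n / 2` keeps the larger
            // half when the count is odd (e.g. 3 of 5 available tasks).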
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the steal. By doing this, no other thread is able to
            // steal from this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        assert!(
            n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
            "actual = {}",
            n
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & MASK;
            let dst_idx = dst_pos as usize & MASK;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: the capacity check in `steal_into` guarantees that
            // these destination slots are unoccupied, and we are the only
            // producer for this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real` signalling that the
        // stealing routine is complete.
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => {
                    let (actual_steal, actual_real) = unpack(actual);

                    assert_ne!(actual_steal, actual_real);

                    prev_packed = actual;
                }
            }
        }
    }
}

cfg_metrics! {
    impl<T> Steal<T> {
        pub(crate) fn len(&self) -> usize {
            self.0.len() as _
        }
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

impl<T> Inner<T> {
    fn remaining_slots(&self) -> usize {
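        // Count from the `steal` half of the head: slots claimed by an
        // in-progress steal cannot be reused until the steal completes.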
        let (steal, _) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
    }

    fn len(&self) -> UnsignedShort {
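        // Count from the "real" head: tasks claimed by an in-progress steal
        // are no longer logically in the queue.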
        let (_, head) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        tail.wrapping_sub(head)
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);

    (steal as UnsignedShort, real as UnsignedShort)
}

/// Join the two head values
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}

#[test]
fn test_local_queue_capacity() {
    assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
}
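
// Illustrative sanity checks (not part of the original suite): they exercise
// the packed head encoding and the power-of-two assumption behind `MASK`,
// using only items defined in this file.
#[test]
fn test_pack_unpack_roundtrip() {
    // `pack` stores `real` in the low half and `steal` in the high half.
    let (steal, real) = (3 as UnsignedShort, 7 as UnsignedShort);
    assert_eq!(unpack(pack(steal, real)), (steal, real));

    // Maximum (wrapped) indices survive the roundtrip as well.
    let max = UnsignedShort::MAX;
    assert_eq!(unpack(pack(max, 0)), (max, 0));
    assert_eq!(unpack(pack(0, max)), (0, max));
}

#[test]
fn test_mask_requires_power_of_two_capacity() {
    // Ring indexing relies on the capacity being a power of two so that
    // `pos as usize & MASK` behaves like `pos % LOCAL_QUEUE_CAPACITY` for
    // wrapping positions.
    assert!(LOCAL_QUEUE_CAPACITY.is_power_of_two());
}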