blob: 3a5e0f4e833b1e9e24be5b9715d0ef7168e41cdf [file] [log] [blame]
use crate::{
cfg::{self, CfgPrivate},
clear::Clear,
page,
tid::Tid,
Pack,
};
use std::fmt;
// ┌─────────────┐      ┌────────┐
// │ page 1      │      │        │
// ├─────────────┤ ┌───▶│  next──┼─┐
// │ page 2      │ │    ├────────┤ │
// │             │ │    │XXXXXXXX│ │
// │ local_free──┼─┘    ├────────┤ │
// │ global_free─┼─┐    │        │◀┘
// ├─────────────┤ └───▶│  next──┼─┐
// │ page 3      │      ├────────┤ │
// └─────────────┘      │XXXXXXXX│ │
//       ...            ├────────┤ │
// ┌─────────────┐      │XXXXXXXX│ │
// │ page n      │      ├────────┤ │
// └─────────────┘      │        │◀┘
//                      │  next──┼───▶
//                      ├────────┤
//                      │XXXXXXXX│
//                      └────────┘
//                         ...
/// A single shard: the set of pages associated with one thread ID.
///
/// Per-page state is split across two parallel slices, `local` and `shared`,
/// both indexed by page number and both created with `C::MAX_PAGES` entries.
pub(crate) struct Shard<T, C: cfg::Config> {
/// The shard's parent thread ID.
pub(crate) tid: usize,
/// The local free list for each page.
///
/// These are only ever accessed from this shard's thread, so they are
/// stored separately from the shared state for the page that can be
/// accessed concurrently, to minimize false sharing.
local: Box<[page::Local]>,
/// The shared state for each page in this shard.
///
/// This consists of the page's metadata (size, previous size), remote free
/// list, and a pointer to the actual array backing that page.
shared: Box<[page::Shared<T, C>]>,
}
impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    /// Looks up the slot referenced by the packed index `idx` and, if the
    /// owning page yields it, returns a guard over the value projected by `f`.
    ///
    /// Returns `None` when the page index decoded from `idx` does not refer to
    /// a page of this shard, or when the page itself returns `None`.
    ///
    /// In debug builds, asserts that `idx` encodes this shard's thread ID.
    #[inline(always)]
    pub(crate) fn get<U>(
        &self,
        idx: usize,
        f: impl FnOnce(&T) -> &U,
    ) -> Option<page::slot::Guard<'_, U, C>> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);
        // Checked lookup: a page index equal to `self.shared.len()` is already
        // out of bounds and must yield `None` rather than panic on indexing.
        self.shared.get(page_index)?.get(addr, idx, f)
    }

    /// Creates the shard for thread ID `tid`, with `C::MAX_PAGES` pages.
    ///
    /// Page `n` is constructed with its own size (`C::page_size(n)`) and the
    /// combined size of all pages that precede it.
    pub(crate) fn new(tid: usize) -> Self {
        // Running total of the sizes of the pages constructed so far.
        let mut total_sz = 0;
        let shared = (0..C::MAX_PAGES)
            .map(|page_num| {
                let sz = C::page_size(page_num);
                let prev_sz = total_sz;
                total_sz += sz;
                page::Shared::new(sz, prev_sz)
            })
            .collect();
        // One local free list per page, parallel to `shared`.
        let local = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();
        Self { tid, local, shared }
    }
}
impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Remove an item on the shard's local thread, returning the taken value.
    ///
    /// Returns `None` if the page index decoded from `idx` is out of range, or
    /// if the page's `take` returns `None`. Uses this page's local free list.
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> remove_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Remove an item, while on a different thread from the shard's local
    /// thread, returning the taken value.
    ///
    /// Returns `None` if the page index decoded from `idx` is out of range, or
    /// if the page's `take` returns `None`. Uses the page's shared free list.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

    /// Removes the item at `idx` using this page's local free list.
    ///
    /// Returns `false` if the page index decoded from `idx` is out of range;
    /// otherwise returns the result of the page's `remove`.
    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        // Checked lookup: `page_index == self.shared.len()` is out of bounds
        // and must report `false` instead of panicking on indexing.
        match self.shared.get(page_index) {
            Some(page) => page.remove(addr, C::unpack_gen(idx), self.local(page_index)),
            None => false,
        }
    }

    /// Removes the item at `idx` using the page's shared free list.
    ///
    /// Returns `false` if the page index decoded from `idx` is out of range;
    /// otherwise returns the result of the page's `remove`.
    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        // Checked lookup, for the same reason as in `remove_local`.
        match self.shared.get(page_index) {
            Some(shared) => shared.remove(addr, C::unpack_gen(idx), shared.free_list()),
            None => false,
        }
    }

    /// Returns an iterator over the shared state of every page in this shard.
    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}
impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    /// Offers `func` to each page in order and returns the first `Some` value
    /// produced by a page's `init_with`, or `None` if every page declines.
    pub(crate) fn init_with<F>(&self, mut func: F) -> Option<usize>
    where
        F: FnMut(&page::slot::Slot<T, C>) -> Option<page::slot::Generation<C>>,
    {
        // Can we fit the value into an existing page?
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(poff) = page.init_with(local, &mut func) {
                return Some(poff);
            }
        }

        None
    }

    /// Calls the page's `mark_clear` for the slot at `idx`, using this page's
    /// local free list.
    ///
    /// Returns `false` if the page index decoded from `idx` is out of range;
    /// otherwise returns the result of the page's `mark_clear`.
    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        // Checked lookup: `page_index == self.shared.len()` is out of bounds
        // and must report `false` instead of panicking on indexing.
        match self.shared.get(page_index) {
            Some(page) => page.mark_clear(addr, C::unpack_gen(idx), self.local(page_index)),
            None => false,
        }
    }

    /// Calls the page's `mark_clear` for the slot at `idx`, using the page's
    /// shared free list.
    ///
    /// Returns `false` if the page index decoded from `idx` is out of range;
    /// otherwise returns the result of the page's `mark_clear`.
    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        // Checked lookup, for the same reason as in `mark_clear_local`.
        match self.shared.get(page_index) {
            Some(shared) => shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list()),
            None => false,
        }
    }

    /// Returns the local free list for page `i`.
    ///
    /// In debug builds, asserts that the calling thread is this shard's
    /// thread. Panics if `i` is not a valid page index.
    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    /// Formats the shard as a `Shard` struct showing its `shared` pages; the
    /// `tid` field is included only when debug assertions are enabled.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Shard");

        // The thread ID is only printed in builds with debug assertions.
        #[cfg(debug_assertions)]
        builder.field("tid", &self.tid);

        builder.field("shared", &self.shared);
        builder.finish()
    }
}