use super::super::noop::*;
use super::super::plumbing::*;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct CollectConsumer<'c, T: Send + 'c> {
/// Tracks how many items we successfully wrote. Used to guarantee
/// safety in the face of panics or buggy parallel iterators.
writes: &'c AtomicUsize,
/// A slice covering the target memory, not yet initialized!
target: &'c mut [T],
}

pub struct CollectFolder<'c, T: Send + 'c> {
    /// The shared count of successful writes, added to once in `complete`.
    global_writes: &'c AtomicUsize,
    /// How many values this folder alone has written so far.
    local_writes: usize,
    /// An iterator over the *uninitialized* target memory.
    target: slice::IterMut<'c, T>,
}

impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
/// The target memory is considered uninitialized, and will be
/// overwritten without dropping anything.
pub fn new(writes: &'c AtomicUsize, target: &'c mut [T]) -> CollectConsumer<'c, T> {
        CollectConsumer { writes, target }
    }
}
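
// A hedged, self-contained sketch: this test does not drive
// `CollectConsumer`, but demonstrates the uninitialized-target discipline
// described on `new`, using only `Vec` and `ptr::write`. The module and
// test names are illustrative, and the tie to the real caller (such as
// `special_collect_into_vec`) is an assumption.
#[cfg(test)]
mod uninit_target_sketch {
    use std::ptr;

    #[test]
    fn write_all_then_set_len() {
        // Reserve space for 3 elements; the slots exist but are uninitialized.
        let mut v: Vec<String> = Vec::with_capacity(3);
        let base = v.as_mut_ptr();
        for i in 0..3 {
            // In bounds of the reserved capacity, and each slot is written
            // exactly once, so no initialized value is ever overwritten.
            unsafe { ptr::write(base.add(i), i.to_string()) };
        }
        // Only after every slot is initialized is it sound to expose them.
        unsafe { v.set_len(3) };
        assert_eq!(v, ["0", "1", "2"]);
    }
}
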
impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
type Folder = CollectFolder<'c, T>;
type Reducer = NoopReducer;
    type Result = ();

fn split_at(self, index: usize) -> (Self, Self, NoopReducer) {
        // Move the fields out of `self`; it has been legitimately consumed
        // here (and not dropped during unwinding).
let CollectConsumer { writes, target } = self;
// Produce new consumers. Normal slicing ensures that the
// memory range given to each consumer is disjoint.
let (left, right) = target.split_at_mut(index);
(
CollectConsumer::new(writes, left),
CollectConsumer::new(writes, right),
NoopReducer,
)
    }

    fn into_folder(self) -> CollectFolder<'c, T> {
        CollectFolder {
            global_writes: self.writes,
            local_writes: 0,
            target: self.target.iter_mut(),
        }
    }

fn full(&self) -> bool {
false
}
}
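
// A hedged sketch of the disjointness argument in `split_at` above: the two
// halves returned by `split_at_mut` are non-overlapping `&mut` borrows, so
// the borrow checker itself guarantees that the left and right consumers can
// never write to the same slot. Names are illustrative.
#[cfg(test)]
mod disjoint_split_sketch {
    #[test]
    fn halves_never_overlap() {
        let mut buf = [0usize; 6];
        let (left, right) = buf.split_at_mut(2);
        // Both mutable halves are live at the same time; this is accepted
        // only because their memory ranges are disjoint.
        for (i, slot) in left.iter_mut().enumerate() {
            *slot = i;
        }
        for (i, slot) in right.iter_mut().enumerate() {
            *slot = 100 + i;
        }
        assert_eq!(buf, [0, 1, 100, 101, 102, 103]);
    }
}
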
impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
    type Result = ();

fn consume(mut self, item: T) -> CollectFolder<'c, T> {
// Compute target pointer and write to it. Safe because the iterator
// does all the bounds checking; we're only avoiding the target drop.
let head = self
.target
.next()
.expect("too many values pushed to consumer");
unsafe {
ptr::write(head, item);
}
self.local_writes += 1;
self
}

    fn complete(self) {
        assert_eq!(self.target.len(), 0, "too few values pushed to consumer");

        // Add this folder's writes to the global tally.
        self.global_writes
            .fetch_add(self.local_writes, Ordering::Relaxed);
    }

fn full(&self) -> bool {
false
}
}
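
// A hedged sketch of the write-count handshake in `complete` above: each
// folder tallies writes locally and publishes one relaxed `fetch_add`, and
// after the parallel work is joined the total can be checked against the
// target length. Plain threads stand in for the real join here; names are
// illustrative.
#[cfg(test)]
mod write_count_sketch {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn totals_match_after_join() {
        let writes = Arc::new(AtomicUsize::new(0));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let writes = Arc::clone(&writes);
                thread::spawn(move || {
                    // One batched update per "folder", as in `complete`.
                    writes.fetch_add(25, Ordering::Relaxed);
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        // The joins provide the synchronization, so `Relaxed` suffices for a
        // simple tally.
        assert_eq!(writes.load(Ordering::Relaxed), 100);
    }
}
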
/// Pretend to be unindexed for `special_collect_into_vec`,
/// but we should never actually get used that way...
impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> {
fn split_off_left(&self) -> Self {
unreachable!("CollectConsumer must be indexed!")
    }

fn to_reducer(&self) -> Self::Reducer {
NoopReducer
}
}