use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
use std::mem::MaybeUninit;
use std::slice;

mod consumer;
use self::consumer::CollectConsumer;
use self::consumer::CollectResult;
use super::unzip::unzip_indexed;

#[cfg(test)]
mod test;

/// Collects the results of the exact iterator into the specified vector.
///
/// This is called by `IndexedParallelIterator::collect_into_vec`.
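///
/// A short usage sketch of the public entry point (a hedged example; it
/// mirrors the one in `IndexedParallelIterator::collect_into_vec`'s docs):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let mut vec = vec![];
/// (0..5).into_par_iter().collect_into_vec(&mut vec);
/// assert_eq!(vec, [0, 1, 2, 3, 4]);
/// ```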
pub(super) fn collect_into_vec<I, T>(pi: I, v: &mut Vec<T>)
where
    I: IndexedParallelIterator<Item = T>,
    T: Send,
{
    v.truncate(0); // clear any old data
    let len = pi.len();
    Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
}

/// Collects the results of the iterator into the specified vector.
///
/// Technically, this only works for `IndexedParallelIterator`, but we're faking a
/// bit of specialization here until Rust can do that natively. Callers are
/// using `opt_len` to find the length before calling this, and only exact
/// iterators will return anything but `None` there.
///
/// Since the type system doesn't understand that contract, we have to allow
/// *any* `ParallelIterator` here, and `CollectConsumer` has to also implement
/// `UnindexedConsumer`. That implementation panics with `unreachable!` in
/// case there's a bug where we actually do try to use this unindexed.
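///
/// Illustrative call pattern (a sketch only; it mirrors the `par_extend`
/// implementation at the bottom of this file):
///
/// ```ignore
/// if let Some(len) = par_iter.opt_len() {
///     // `opt_len` returned `Some`, so the iterator's length is exact.
///     special_extend(par_iter, len, vec);
/// }
/// ```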
fn special_extend<I, T>(pi: I, len: usize, v: &mut Vec<T>)
where
    I: ParallelIterator<Item = T>,
    T: Send,
{
    Collect::new(v, len).with_consumer(|consumer| pi.drive_unindexed(consumer));
}

/// Unzips the results of the exact iterator into the specified vectors.
///
/// This is called by `IndexedParallelIterator::unzip_into_vecs`.
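///
/// A short usage sketch of the public entry point (a hedged example; it
/// mirrors the one in `IndexedParallelIterator::unzip_into_vecs`'s docs):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let mut left = vec![];
/// let mut right = vec![];
/// (10..15).into_par_iter()
///     .enumerate()
///     .unzip_into_vecs(&mut left, &mut right);
///
/// assert_eq!(left, [0, 1, 2, 3, 4]);
/// assert_eq!(right, [10, 11, 12, 13, 14]);
/// ```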
pub(super) fn unzip_into_vecs<I, A, B>(pi: I, left: &mut Vec<A>, right: &mut Vec<B>)
where
    I: IndexedParallelIterator<Item = (A, B)>,
    A: Send,
    B: Send,
{
    // clear any old data
    left.truncate(0);
    right.truncate(0);

    let len = pi.len();
    Collect::new(right, len).with_consumer(|right_consumer| {
        let mut right_result = None;
        Collect::new(left, len).with_consumer(|left_consumer| {
            let (left_r, right_r) = unzip_indexed(pi, left_consumer, right_consumer);
            right_result = Some(right_r);
            left_r
        });
        right_result.unwrap()
    });
}

/// Manage the collection vector.
struct Collect<'c, T: Send> {
    vec: &'c mut Vec<T>,
    len: usize,
}

impl<'c, T: Send + 'c> Collect<'c, T> {
    fn new(vec: &'c mut Vec<T>, len: usize) -> Self {
        Collect { vec, len }
    }

    /// Create a consumer on the slice of memory we are collecting into.
    ///
    /// The consumer needs to be used inside the scope function, and the
    /// complete collect result passed back.
    ///
    /// This method will verify the collect result, and panic if the slice
    /// was not fully written into. Otherwise, in the successful case,
    /// the vector is complete with the collected result.
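    ///
    /// Illustrative call shape (a sketch only; it mirrors `collect_into_vec`
    /// at the top of this file):
    ///
    /// ```ignore
    /// Collect::new(v, len).with_consumer(|consumer| pi.drive(consumer));
    /// ```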
    fn with_consumer<F>(mut self, scope_fn: F)
    where
        F: FnOnce(CollectConsumer<'_, T>) -> CollectResult<'_, T>,
    {
        let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len);
        let result = scope_fn(CollectConsumer::new(slice));

        // The CollectResult represents a contiguous part of the slice
        // that has been written to. On unwind here, the CollectResult
        // will be dropped. If some producers along the way did not
        // produce enough elements, partial CollectResults may have
        // been dropped without being reduced to the final result, and
        // we will see that as the length coming up short.
        //
        // Here, we assert that `slice` is fully initialized. This is
        // checked by the following assert, which verifies that a
        // complete CollectResult was produced; if the length is
        // correct, it necessarily covers the target slice. Since we
        // know that the consumer cannot have escaped from `drive` (by
        // parametricity, essentially), we know that any stores that
        // will happen have happened. Unless some code is buggy, that
        // means we should have seen `len` total writes.
        let actual_writes = result.len();
        assert!(
            actual_writes == self.len,
            "expected {} total writes, but got {}",
            self.len,
            actual_writes
        );

        // Release the result's mutable borrow and "proxy ownership"
        // of the elements, before the vector takes it over.
        result.release_ownership();

        let new_len = self.vec.len() + self.len;
        unsafe {
            // SAFETY: the assert above proved that all `self.len` new
            // elements were written, so they are initialized and the
            // vector's length can soundly grow to cover them.
            self.vec.set_len(new_len);
        }
    }

    /// Reserve space for `len` more elements in the vector,
    /// and return a slice to the uninitialized tail of the vector.
    fn reserve_get_tail_slice(vec: &mut Vec<T>, len: usize) -> &mut [MaybeUninit<T>] {
        // Reserve the new space.
        vec.reserve(len);

        // TODO: use `Vec::spare_capacity_mut` instead
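        // (Once the MSRV allows it, the rest of this body could likely be
        // just `&mut vec.spare_capacity_mut()[..len]`, since that method
        // returns the spare capacity as `&mut [MaybeUninit<T>]`.)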
        // SAFETY: `MaybeUninit<T>` is guaranteed to have the same layout
        // as `T`, and we already made sure to have the additional space.
        let start = vec.len();
        let tail_ptr = vec[start..].as_mut_ptr() as *mut MaybeUninit<T>;
        unsafe { slice::from_raw_parts_mut(tail_ptr, len) }
    }
}

/// Extends a vector with items from a parallel iterator.
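///
/// A minimal usage sketch (hedged: `par_extend` is the public
/// `ParallelExtend` method, reachable through `rayon::prelude`):
///
/// ```ignore
/// use rayon::prelude::*;
///
/// let mut vec = vec![0, 1, 2];
/// vec.par_extend(3..6);
/// assert_eq!(vec, [0, 1, 2, 3, 4, 5]);
/// ```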
impl<T> ParallelExtend<T> for Vec<T>
where
    T: Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = T>,
    {
        // See the vec_collect benchmarks in rayon-demo for different strategies.
        let par_iter = par_iter.into_par_iter();
        match par_iter.opt_len() {
            Some(len) => {
                // When Rust gets specialization, we can get here for indexed iterators
                // without relying on `opt_len`. Until then, `special_extend()` fakes
                // an unindexed mode on the promise that `opt_len()` is accurate.
                special_extend(par_iter, len, self);
            }
            None => {
                // This works like `extend`, but `Vec::append` is more efficient.
                let list = super::extend::collect(par_iter);
                self.reserve(super::extend::len(&list));
                for mut vec in list {
                    self.append(&mut vec);
                }
            }
        }
    }
}