// blob: ea0c9c93be94093868e1d306cdeba51aeb37a440 [file] [log] [blame]
use std::sync::atomic::{AtomicBool, Ordering};
use gix_features::{
parallel::in_parallel_with_slice,
progress::{self, DynNestedProgress, Progress},
threading,
threading::{Mutable, OwnShared},
};
use crate::{
cache::delta::{traverse::util::ItemSliceSync, Item, Tree},
data::EntryRange,
};
mod resolve;
pub(crate) mod util;
/// The error returned by [`Tree::traverse()`].
#[derive(thiserror::Error, Debug)]
#[allow(missing_docs)]
pub enum Error {
/// Inflating zlib-compressed data failed.
///
/// `message` is used verbatim as the display text of this error, while `source` carries the underlying
/// inflate error.
#[error("{message}")]
ZlibInflate {
source: gix_features::zlib::inflate::Error,
message: &'static str,
},
/// The `resolve` function could not provide the pack bytes of the entry located at `pack_offset`.
#[error("The resolver failed to obtain the pack entry bytes for the entry at {pack_offset}")]
ResolveFailed { pack_offset: u64 },
/// An object inspector failed with the boxed error stored in this variant.
///
/// Any `Box<dyn std::error::Error + Send + Sync>` converts into this variant via `From`.
// NOTE(review): presumably this wraps errors returned by the `inspect_object` callback of `traverse()` - confirm in `resolve`.
#[error("One of the object inspectors failed")]
Inspect(#[from] Box<dyn std::error::Error + Send + Sync>),
/// The operation was interrupted.
// NOTE(review): presumably produced once `Options::should_interrupt` is observed to be `true` - the raising site isn't in this file.
#[error("Interrupted")]
Interrupted,
/// A ref-delta referred to a base that was never added to the tree, as would be the case in a thin pack.
#[error(
"The base at {base_pack_offset} was referred to by a ref-delta, but it was never added to the tree as if the pack was still thin."
)]
OutOfPackRefDelta {
/// The base's offset which was from a resolved ref-delta that didn't actually get added to the tree
base_pack_offset: crate::data::Offset,
},
/// Spawning a thread failed when switching to work-stealing mode; wraps the originating I/O error.
#[error("Failed to spawn thread when switching to work-stealing mode")]
SpawnThread(#[from] std::io::Error),
}
/// Additional context passed to the `inspect_object(…)` function of the [`Tree::traverse()`] method.
///
/// All borrowed data lives for `'a`.
pub struct Context<'a> {
/// The pack entry describing the object.
pub entry: &'a crate::data::Entry,
/// The offset at which `entry` ends in the pack, useful to learn about the exact range of `entry` within the pack.
pub entry_end: u64,
/// The decompressed object itself, ready to be decoded.
pub decompressed: &'a [u8],
/// The depth at which this object resides in the delta-tree. It represents the number of base objects, with 0 indicating
/// an 'undeltified' object, and higher values indicating delta objects with the given number of bases.
pub level: u16,
}
/// Options for [`Tree::traverse()`].
///
/// `'a` is the lifetime of the interrupt flag, `'s` the lifetime of the mutable borrow of the size progress.
pub struct Options<'a, 's> {
/// A progress instance to track progress for each object in the traversal.
pub object_progress: Box<dyn DynNestedProgress>,
/// A progress instance to track the overall progress.
pub size_progress: &'s mut dyn Progress,
/// If `Some`, only use the given number of threads. Otherwise, the number of threads to use will be selected based on
/// the number of available logical cores.
pub thread_limit: Option<usize>,
/// Abort the operation if the value is `true`.
pub should_interrupt: &'a AtomicBool,
/// Specifies what kind of hashes we expect to be stored in oid-delta entries, which is vital for decoding them
/// with the correct size.
pub object_hash: gix_hash::Kind,
}
/// The outcome of [`Tree::traverse()`], holding all items of the consumed tree along with their custom data `T`.
pub struct Outcome<T> {
/// The items at the root of the delta-tree, i.e. base objects.
pub roots: Vec<Item<T>>,
/// The items that are children of a root object, i.e. delta objects.
pub children: Vec<Item<T>>,
}
impl<T> Tree<T>
where
    T: Send,
{
    /// Consume this tree of delta objects and hand each fully decoded object to `inspect_object`.
    ///
    /// * `resolve(range, resolve_data) -> Option<&[u8]>` provides the bytes of the pack covered by `range`, borrowed
    ///   from `resolve_data`. It returns `None` to indicate a resolution error, which aborts the operation.
    /// * `resolve_data` is the data `resolve` borrows the pack bytes from; it is shared between threads.
    /// * `pack_entries_end` marks one-past-the-last byte of the last entry in the pack, as the size of the last entry
    ///   would otherwise be unknown as it's not part of the index file.
    /// * `inspect_object(node_data: &mut T, progress: &dyn Progress, context: Context<'_>) -> Result<(), E>` receives
    ///   decoded objects along with contextual information and either succeeds with `Ok(())` or fails with a custom
    ///   error. A clone of it is placed into every thread state, hence it should carry its own mutable per-thread
    ///   data as required. `node_data` can be modified to store computation results on a per-object basis.
    /// * The last argument bundles the remaining settings, see [`Options`].
    ///
    /// On success, all root and child items are returned as [`Outcome`], along with their potentially modified
    /// custom node data.
    ///
    /// # Errors
    ///
    /// An [`Error`] is returned if preparing the tree with `pack_entries_end` fails, or if the parallel
    /// traversal itself reports a failure.
    ///
    /// _Note_ that this method consumes the Tree to assure safe parallel traversal with mutation support.
    pub fn traverse<F, MBFN, E, R>(
        mut self,
        resolve: F,
        resolve_data: &R,
        pack_entries_end: u64,
        inspect_object: MBFN,
        Options {
            thread_limit,
            mut object_progress,
            size_progress,
            should_interrupt,
            object_hash,
        }: Options<'_, '_>,
    ) -> Result<Outcome<T>, Error>
    where
        F: for<'r> Fn(EntryRange, &'r R) -> Option<&'r [u8]> + Send + Clone,
        R: Send + Sync,
        MBFN: FnMut(&mut T, &dyn Progress, Context<'_>) -> Result<(), E> + Send + Clone,
        E: std::error::Error + Send + Sync + 'static,
    {
        self.set_pack_entries_end_and_resolve_ref_offsets(pack_entries_end)?;

        // Initialize both progress instances and grab their counters before the object progress
        // is moved into shared ownership below.
        let total_objects = self.num_items();
        let object_counter = {
            let objects = &mut object_progress;
            objects.init(Some(total_objects), progress::count("objects"));
            objects.counter()
        };
        size_progress.init(None, progress::bytes());
        let size_counter = size_progress.counter();
        let shared_object_progress = OwnShared::new(Mutable::new(object_progress));

        let started_at = std::time::Instant::now();
        let (mut roots, mut children) = self.take_root_and_child();
        let shared_children = ItemSliceSync::new(&mut children);
        let shared_children = &shared_children;

        in_parallel_with_slice(
            &mut roots,
            thread_limit,
            // Creates the state of a single thread, each with its own buffers, child progress and callback clones.
            {
                let thread_progress = shared_object_progress.clone();
                move |thread_index| resolve::State {
                    delta_bytes: Vec::<u8>::with_capacity(4096),
                    fully_resolved_delta_bytes: Vec::<u8>::with_capacity(4096),
                    progress: Box::new(
                        threading::lock(&thread_progress).add_child(format!("thread {thread_index}")),
                    ),
                    resolve: resolve.clone(),
                    modify_base: inspect_object.clone(),
                    child_items: shared_children,
                }
            },
            // Resolves all deltas reachable from `node` using the state of the current thread.
            move |node, state, threads_left, interrupt| {
                // SAFETY: `node` and the items behind `child_items` originate from the same Tree.
                // Hence we can rely on the Tree's invariant that `node.children` is the only `children` array
                // of any node in this tree which contains any of those children.
                #[allow(unsafe_code)]
                unsafe {
                    resolve::deltas(
                        object_counter.clone(),
                        size_counter.clone(),
                        node,
                        state,
                        resolve_data,
                        object_hash.len_in_bytes(),
                        threads_left,
                        interrupt,
                    )
                }
            },
            // Yields a 50ms interval as long as no interrupt was requested, and `None` once it was.
            || {
                if should_interrupt.load(Ordering::Relaxed) {
                    None
                } else {
                    Some(std::time::Duration::from_millis(50))
                }
            },
            |_| (),
        )?;

        threading::lock(&shared_object_progress).show_throughput(started_at);
        size_progress.show_throughput(started_at);

        Ok(Outcome { roots, children })
    }
}