blob: 60ecaa317501dd29b3f76af901328ebd9d06aed3 [file] [log] [blame]
//! This module implements the job queue which determines the ordering in which
//! rustc is spawned off. It also manages the allocation of jobserver tokens to
//! rustc beyond the implicit token each rustc owns (i.e., the ones used for
//! parallel LLVM work and parallel rustc threads).
//!
//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
//! other, which is due to scaling issues with sharing a single jobserver
//! amongst what is potentially hundreds of threads of work on many-cored
//! systems on (at least) linux, and likely other platforms as well.
//!
//! The details of this algorithm are (also) written out in
//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
//! Cargo side of things.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn
//! more than N threads of work: the total amount of tokens we have floating
//! around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build first
//! or last; nor is it possible to decide whether to give an additional token to
//! rustc first or rather spawn a new crate of work. For now, the algorithm we
//! implement prioritizes spawning as many crates (i.e., rustc processes) as
//! possible, and then filling each rustc with tokens on demand.
//!
//! The primary loop is in `drain_the_queue` below.
//!
//! We integrate with the jobserver, originating from GNU make, to make sure
//! that build scripts which use make to build C code can cooperate with us on
//! the number of used tokens and avoid overfilling the system we're on.
//!
//! The jobserver is unfortunately a very simple protocol, so we enhance it a
//! little when we know that there is a rustc on the other end. Via the stderr
//! pipe we have to rustc, we get messages such as "NeedsToken" and
//! "ReleaseToken" from rustc.
//!
//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
//! never that it would be impossible to make progress without one (i.e., it
//! would be incorrect for rustc to not terminate due to a unfulfilled
//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
//! given rustc.
//!
//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
//! ready for us to re-acquire ownership -- we will either release that token
//! back into the general pool or reuse it ourselves. Note that rustc will
//! inform us that it is releasing a token even if it itself is also requesting
//! tokens; is is up to us whether to return the token to that same rustc.
//!
//! The current scheduling algorithm is relatively primitive and could likely be
//! improved.
use std::cell::Cell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::io;
use std::marker;
use std::mem;
use std::sync::Arc;
use std::time::Duration;
use anyhow::format_err;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_utils::thread::Scope;
use jobserver::{Acquired, Client, HelperThread};
use log::{debug, info, trace};
use super::context::OutputFile;
use super::job::{
Freshness::{self, Dirty, Fresh},
Job,
};
use super::timings::Timings;
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::{PackageId, TargetKind};
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue};
use crate::util::{Progress, ProgressStyle};
/// This structure is backed by the `DependencyQueue` type and manages the
/// queueing of compilation steps for each package. Packages enqueue units of
/// work and then later on the entire graph is converted to DrainState and
/// executed.
pub struct JobQueue<'a, 'cfg> {
queue: DependencyQueue<Unit<'a>, Artifact, Job>,
counts: HashMap<PackageId, usize>,
timings: Timings<'a, 'cfg>,
}
/// This structure is backed by the `DependencyQueue` type and manages the
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
struct DrainState<'a, 'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,
queue: DependencyQueue<Unit<'a>, Artifact, Job>,
tx: Sender<Message>,
rx: Receiver<Message>,
active: HashMap<JobId, Unit<'a>>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
counts: HashMap<PackageId, usize>,
progress: Progress<'cfg>,
next_id: u32,
timings: Timings<'a, 'cfg>,
/// Tokens that are currently owned by this Cargo, and may be "associated"
/// with a rustc process. They may also be unused, though if so will be
/// dropped on the next loop iteration.
///
/// Note that the length of this may be zero, but we will still spawn work,
/// as we share the implicit token given to this Cargo process with a
/// single rustc process.
tokens: Vec<Acquired>,
/// rustc per-thread tokens, when in jobserver-per-rustc mode.
rustc_tokens: HashMap<JobId, Vec<Acquired>>,
/// This represents the list of rustc jobs (processes) and associated
/// clients that are interested in receiving a token.
to_send_clients: BTreeMap<JobId, Vec<Client>>,
/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
pending_queue: Vec<(Unit<'a>, Job)>,
print: DiagnosticPrinter<'cfg>,
// How many jobs we've finished
finished: usize,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct JobId(pub u32);
impl std::fmt::Display for JobId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such.
tx: Sender<Message>,
/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
id: JobId,
/// Whether or not we're expected to have a call to `rmeta_produced`. Once
/// that method is called this is dynamically set to `false` to prevent
/// sending a double message later on.
rmeta_required: Cell<bool>,
// Historical versions of Cargo made use of the `'a` argument here, so to
// leave the door open to future refactorings keep it here.
_marker: marker::PhantomData<&'a ()>,
}
/// Possible artifacts that can be produced by compilations, used as edge values
/// in the dependency graph.
///
/// As edge values we can have multiple kinds of edges depending on one node,
/// for example some units may only depend on the metadata for an rlib while
/// others depend on the full rlib. This `Artifact` enum is used to distinguish
/// this case and track the progress of compilations as they proceed.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
enum Artifact {
/// A generic placeholder for "depends on everything run by a step" and
/// means that we can't start the next compilation until the previous has
/// finished entirely.
All,
/// A node indicating that we only depend on the metadata of a compilation,
/// but the compilation is typically also producing an rlib. We can start
/// our step, however, before the full rlib is available.
Metadata,
}
enum Message {
Run(JobId, String),
BuildPlanMsg(String, ProcessBuilder, Arc<Vec<OutputFile>>),
Stdout(String),
Stderr(String),
FixDiagnostic(diagnostic_server::Message),
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
// This client should get release_raw called on it with one of our tokens
NeedsToken(JobId),
// A token previously passed to a NeedsToken client is being released.
ReleaseToken(JobId),
}
impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
}
pub fn build_plan(
&self,
module_name: String,
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
let _ = self
.tx
.send(Message::BuildPlanMsg(module_name, cmd, filenames));
}
pub fn stdout(&self, stdout: String) {
drop(self.tx.send(Message::Stdout(stdout)));
}
pub fn stderr(&self, stderr: String) {
drop(self.tx.send(Message::Stderr(stderr)));
}
/// A method used to signal to the coordinator thread that the rmeta file
/// for an rlib has been produced. This is only called for some rmeta
/// builds when required, and can be called at any time before a job ends.
/// This should only be called once because a metadata file can only be
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
let _ = self
.tx
.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
let _ = self.tx.send(Message::NeedsToken(self.id));
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
let _ = self.tx.send(Message::ReleaseToken(self.id));
}
}
impl<'a, 'cfg> JobQueue<'a, 'cfg> {
pub fn new(bcx: &BuildContext<'a, 'cfg>, root_units: &[Unit<'a>]) -> JobQueue<'a, 'cfg> {
JobQueue {
queue: DependencyQueue::new(),
counts: HashMap::new(),
timings: Timings::new(bcx, root_units),
}
}
pub fn enqueue(
&mut self,
cx: &Context<'a, 'cfg>,
unit: &Unit<'a>,
job: Job,
) -> CargoResult<()> {
let dependencies = cx.unit_deps(unit);
let mut queue_deps = dependencies
.iter()
.filter(|dep| {
// Binaries aren't actually needed to *compile* tests, just to run
// them, so we don't include this dependency edge in the job graph.
!dep.unit.target.is_test() && !dep.unit.target.is_bin()
})
.map(|dep| {
// Handle the case here where our `unit -> dep` dependency may
// only require the metadata, not the full compilation to
// finish. Use the tables in `cx` to figure out what kind
// of artifact is associated with this dependency.
let artifact = if cx.only_requires_rmeta(unit, &dep.unit) {
Artifact::Metadata
} else {
Artifact::All
};
(dep.unit, artifact)
})
.collect::<HashMap<_, _>>();
// This is somewhat tricky, but we may need to synthesize some
// dependencies for this target if it requires full upstream
// compilations to have completed. If we're in pipelining mode then some
// dependency edges may be `Metadata` due to the above clause (as
// opposed to everything being `All`). For example consider:
//
// a (binary)
// â”” b (lib)
// â”” c (lib)
//
// Here the dependency edge from B to C will be `Metadata`, and the
// dependency edge from A to B will be `All`. For A to be compiled,
// however, it currently actually needs the full rlib of C. This means
// that we need to synthesize a dependency edge for the dependency graph
// from A to C. That's done here.
//
// This will walk all dependencies of the current target, and if any of
// *their* dependencies are `Metadata` then we depend on the `All` of
// the target as well. This should ensure that edges changed to
// `Metadata` propagate upwards `All` dependencies to anything that
// transitively contains the `Metadata` edge.
if unit.requires_upstream_objects() {
for dep in dependencies {
depend_on_deps_of_deps(cx, &mut queue_deps, dep.unit);
}
fn depend_on_deps_of_deps<'a>(
cx: &Context<'a, '_>,
deps: &mut HashMap<Unit<'a>, Artifact>,
unit: Unit<'a>,
) {
for dep in cx.unit_deps(&unit) {
if deps.insert(dep.unit, Artifact::All).is_none() {
depend_on_deps_of_deps(cx, deps, dep.unit);
}
}
}
}
self.queue.queue(*unit, job, queue_deps);
*self.counts.entry(unit.pkg.package_id()).or_insert(0) += 1;
Ok(())
}
/// Executes all jobs necessary to build the dependency graph.
///
/// This function will spawn off `config.jobs()` workers to build all of the
/// necessary dependencies, in order. Freshness is propagated as far as
/// possible along each dependency chain.
pub fn execute(mut self, cx: &mut Context<'a, '_>, plan: &mut BuildPlan) -> CargoResult<()> {
let _p = profile::start("executing the job graph");
self.queue.queue_finished();
let (tx, rx) = unbounded();
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
tx,
rx,
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
counts: self.counts,
progress,
next_id: 0,
timings: self.timings,
tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(cx.bcx.config),
finished: 0,
};
// Create a helper thread for acquiring jobserver tokens
let tx = state.tx.clone();
let helper = cx
.jobserver
.clone()
.into_helper_thread(move |token| {
drop(tx.send(Message::Token(token)));
})
.chain_err(|| "failed to create helper thread for jobserver management")?;
// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
let tx = state.tx.clone();
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads shouldn't panic")
}
}
impl<'a, 'cfg> DrainState<'a, 'cfg> {
fn spawn_work_if_possible(
&mut self,
cx: &mut Context<'a, '_>,
jobserver_helper: &HelperThread,
scope: &Scope<'_>,
has_errored: bool,
) -> CargoResult<()> {
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((unit, job)) = self.queue.dequeue() {
self.pending_queue.push((unit, job));
if self.active.len() + self.pending_queue.len() > 1 {
jobserver_helper.request_token();
}
}
// Do not actually spawn the new work if we've errored out
if has_errored {
return Ok(());
}
// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while self.has_extra_tokens() && !self.pending_queue.is_empty() {
let (unit, job) = self.pending_queue.remove(0);
self.run(&unit, job, cx, scope)?;
}
Ok(())
}
fn has_extra_tokens(&self) -> bool {
self.active.len() < self.tokens.len() + 1
}
// The oldest job (i.e., least job ID) is the one we grant tokens to first.
fn pop_waiting_client(&mut self) -> (JobId, Client) {
// FIXME: replace this with BTreeMap::first_entry when that stabilizes.
let key = *self
.to_send_clients
.keys()
.next()
.expect("at least one waiter");
let clients = self.to_send_clients.get_mut(&key).unwrap();
let client = clients.pop().unwrap();
if clients.is_empty() {
self.to_send_clients.remove(&key);
}
(key, client)
}
// If we managed to acquire some extra tokens, send them off to a waiting rustc.
fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
let (id, client) = self.pop_waiting_client();
// This unwrap is guaranteed to succeed. `active` must be at least
// length 1, as otherwise there can't be a client waiting to be sent
// on, so tokens.len() must also be at least one.
let token = self.tokens.pop().unwrap();
self.rustc_tokens
.entry(id)
.or_insert_with(Vec::new)
.push(token);
client
.release_raw()
.chain_err(|| "failed to release jobserver token")?;
}
Ok(())
}
fn handle_event(
&mut self,
cx: &mut Context<'a, '_>,
jobserver_helper: &HelperThread,
plan: &mut BuildPlan,
event: Message,
) -> CargoResult<Option<anyhow::Error>> {
match event {
Message::Run(id, cmd) => {
cx.bcx
.config
.shell()
.verbose(|c| c.status("Running", &cmd))?;
self.timings.unit_start(id, self.active[&id]);
}
Message::BuildPlanMsg(module_name, cmd, filenames) => {
plan.update(&module_name, &cmd, &filenames)?;
}
Message::Stdout(out) => {
cx.bcx.config.shell().stdout_println(out);
}
Message::Stderr(err) => {
let mut shell = cx.bcx.config.shell();
shell.print_ansi(err.as_bytes())?;
shell.err().write_all(b"\n")?;
}
Message::FixDiagnostic(msg) => {
self.print.print(&msg)?;
}
Message::Finish(id, artifact, result) => {
let unit = match artifact {
// If `id` has completely finished we remove it
// from the `active` map ...
Artifact::All => {
info!("end: {:?}", id);
self.finished += 1;
if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
// This puts back the tokens that this rustc
// acquired into our primary token list.
//
// This represents a rustc bug: it did not
// release all of its thread tokens but finished
// completely. But we want to make Cargo resilient
// to such rustc bugs, as they're generally not
// fatal in nature (i.e., Cargo can make progress
// still, and the build might not even fail).
self.tokens.extend(rustc_tokens);
}
self.to_send_clients.remove(&id);
self.active.remove(&id).unwrap()
}
// ... otherwise if it hasn't finished we leave it
// in there as we'll get another `Finish` later on.
Artifact::Metadata => {
info!("end (meta): {:?}", id);
self.active[&id]
}
};
info!("end ({:?}): {:?}", unit, result);
match result {
Ok(()) => self.finish(id, &unit, artifact, cx)?,
Err(e) => {
let msg = "The following warnings were emitted during compilation:";
self.emit_warnings(Some(msg), &unit, cx)?;
if !self.active.is_empty() {
crate::display_error(&e, &mut *cx.bcx.config.shell());
cx.bcx.config.shell().warn(
"build failed, waiting for other \
jobs to finish...",
)?;
return Ok(Some(anyhow::format_err!("build failed")));
} else {
return Ok(Some(e));
}
}
}
}
Message::Token(acquired_token) => {
let token = acquired_token.chain_err(|| "failed to acquire jobserver token")?;
self.tokens.push(token);
}
Message::NeedsToken(id) => {
log::info!("queue token request");
jobserver_helper.request_token();
let client = cx.rustc_clients[&self.active[&id]].clone();
self.to_send_clients
.entry(id)
.or_insert_with(Vec::new)
.push(client);
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
//
// self.tokens is a "pool" -- the order doesn't matter -- and
// this transfers ownership of the token into that pool. If we
// end up using it on the next go around, then this token will
// be truncated, same as tokens obtained through Message::Token.
let rustc_tokens = self
.rustc_tokens
.get_mut(&id)
.expect("no tokens associated");
self.tokens
.push(rustc_tokens.pop().expect("rustc releases token it has"));
}
}
Ok(None)
}
// This will also tick the progress bar as appropriate
fn wait_for_events(&mut self) -> Vec<Message> {
// Drain all events at once to avoid displaying the progress bar
// unnecessarily. If there's no events we actually block waiting for
// an event, but we keep a "heartbeat" going to allow `record_cpu`
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let events: Vec<_> = self.rx.try_iter().collect();
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
self.rustc_tokens
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
self.to_send_clients
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
events.len(),
);
if events.is_empty() {
loop {
self.tick_progress();
self.tokens.truncate(self.active.len() - 1);
match self.rx.recv_timeout(Duration::from_millis(500)) {
Ok(message) => break vec![message],
Err(_) => continue,
}
}
} else {
events
}
}
fn drain_the_queue(
mut self,
cx: &mut Context<'a, '_>,
plan: &mut BuildPlan,
scope: &Scope<'a>,
jobserver_helper: &HelperThread,
) -> CargoResult<()> {
trace!("queue: {:#?}", self.queue);
// Iteratively execute the entire dependency graph. Each turn of the
// loop starts out by scheduling as much work as possible (up to the
// maximum number of parallel jobs we have tokens for). A local queue
// is maintained separately from the main dependency queue as one
// dequeue may actually dequeue quite a bit of work (e.g., 10 binaries
// in one package).
//
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
let mut error = None;
loop {
self.spawn_work_if_possible(cx, jobserver_helper, scope, error.is_some())?;
// If after all that we're not actually running anything then we're
// done!
if self.active.is_empty() {
break;
}
self.grant_rustc_token_requests()?;
// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
error = Some(err);
}
}
}
self.progress.clear();
let profile_name = cx.bcx.build_config.requested_profile;
// NOTE: this may be a bit inaccurate, since this may not display the
// profile for what was actually built. Profile overrides can change
// these settings, and in some cases different targets are built with
// different profiles. To be accurate, it would need to collect a
// list of Units built, and maybe display a list of the different
// profiles used. However, to keep it simple and compatible with old
// behavior, we just display what the base profile is.
let profile = cx.bcx.profiles.base_profile();
let mut opt_type = String::from(if profile.opt_level.as_str() == "0" {
"unoptimized"
} else {
"optimized"
});
if profile.debuginfo.unwrap_or(0) != 0 {
opt_type += " + debuginfo";
}
let time_elapsed = util::elapsed(cx.bcx.config.creation_time().elapsed());
self.timings.finished(cx.bcx, &error)?;
if let Some(e) = error {
Err(e)
} else if self.queue.is_empty() && self.pending_queue.is_empty() {
let message = format!(
"{} [{}] target(s) in {}",
profile_name, opt_type, time_elapsed
);
if !cx.bcx.build_config.build_plan {
cx.bcx.config.shell().status("Finished", message)?;
}
Ok(())
} else {
debug!("queue: {:#?}", self.queue);
Err(internal("finished with jobs still left in the queue"))
}
}
// This also records CPU usage and marks concurrency; we roughly want to do
// this as often as we spin on the events receiver (at least every 500ms or
// so).
fn tick_progress(&mut self) {
// Record some timing information if `-Ztimings` is enabled, and
// this'll end up being a noop if we're not recording this
// information.
self.timings.mark_concurrency(
self.active.len(),
self.pending_queue.len(),
self.queue.len(),
self.rustc_tokens.len(),
);
self.timings.record_cpu();
let active_names = self
.active
.values()
.map(|u| self.name_for_progress(u))
.collect::<Vec<_>>();
drop(self.progress.tick_now(
self.finished,
self.total_units,
&format!(": {}", active_names.join(", ")),
));
}
fn name_for_progress(&self, unit: &Unit<'_>) -> String {
let pkg_name = unit.pkg.name();
match unit.mode {
CompileMode::Doc { .. } => format!("{}(doc)", pkg_name),
CompileMode::RunCustomBuild => format!("{}(build)", pkg_name),
_ => {
let annotation = match unit.target.kind() {
TargetKind::Lib(_) => return pkg_name.to_string(),
TargetKind::CustomBuild => return format!("{}(build.rs)", pkg_name),
TargetKind::Bin => "bin",
TargetKind::Test => "test",
TargetKind::Bench => "bench",
TargetKind::ExampleBin | TargetKind::ExampleLib(_) => "example",
};
format!("{}({})", unit.target.name(), annotation)
}
}
}
/// Executes a job, pushing the spawned thread's handled onto `threads`.
fn run(
&mut self,
unit: &Unit<'a>,
job: Job,
cx: &Context<'a, '_>,
scope: &Scope<'_>,
) -> CargoResult<()> {
let id = JobId(self.next_id);
self.next_id = self.next_id.checked_add(1).unwrap();
info!("start {}: {:?}", id, unit);
assert!(self.active.insert(id, *unit).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
let my_tx = self.tx.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
self.note_working_on(cx.bcx.config, unit, fresh)?;
}
let doit = move || {
let state = JobState {
id,
tx: my_tx.clone(),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};
let mut sender = FinishOnDrop {
tx: &my_tx,
id,
result: Err(format_err!("worker panicked")),
};
sender.result = job.run(&state);
// If the `rmeta_required` wasn't consumed but it was set
// previously, then we either have:
//
// 1. The `job` didn't do anything because it was "fresh".
// 2. The `job` returned an error and didn't reach the point where
// it called `rmeta_produced`.
// 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
//
// Ruling out the third, the other two are pretty common for 2
// we'll just naturally abort the compilation operation but for 1
// we need to make sure that the metadata is flagged as produced so
// send a synthetic message here.
if state.rmeta_required.get() && sender.result.is_ok() {
my_tx
.send(Message::Finish(id, Artifact::Metadata, Ok(())))
.unwrap();
}
// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
tx: &'a Sender<Message>,
id: JobId,
result: CargoResult<()>,
}
impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let msg = mem::replace(&mut self.result, Ok(()));
drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
}
}
};
match fresh {
Freshness::Fresh => {
self.timings.add_fresh();
doit();
}
Freshness::Dirty => {
self.timings.add_dirty();
scope.spawn(move |_| doit());
}
}
Ok(())
}
fn emit_warnings(
&mut self,
msg: Option<&str>,
unit: &Unit<'a>,
cx: &mut Context<'a, '_>,
) -> CargoResult<()> {
let outputs = cx.build_script_outputs.lock().unwrap();
let metadata = match cx.find_build_script_metadata(*unit) {
Some(metadata) => metadata,
None => return Ok(()),
};
let bcx = &mut cx.bcx;
if let Some(output) = outputs.get(unit.pkg.package_id(), metadata) {
if !output.warnings.is_empty() {
if let Some(msg) = msg {
writeln!(bcx.config.shell().err(), "{}\n", msg)?;
}
for warning in output.warnings.iter() {
bcx.config.shell().warn(warning)?;
}
if msg.is_some() {
// Output an empty line.
writeln!(bcx.config.shell().err())?;
}
}
}
Ok(())
}
fn finish(
&mut self,
id: JobId,
unit: &Unit<'a>,
artifact: Artifact,
cx: &mut Context<'a, '_>,
) -> CargoResult<()> {
if unit.mode.is_run_custom_build() && cx.bcx.show_warnings(unit.pkg.package_id()) {
self.emit_warnings(None, unit, cx)?;
}
let unlocked = self.queue.finish(unit, &artifact);
match artifact {
Artifact::All => self.timings.unit_finished(id, unlocked),
Artifact::Metadata => self.timings.unit_rmeta_finished(id, unlocked),
}
Ok(())
}
// This isn't super trivial because we don't want to print loads and
// loads of information to the console, but we also want to produce a
// faithful representation of what's happening. This is somewhat nuanced
// as a package can start compiling *very* early on because of custom
// build commands and such.
//
// In general, we try to print "Compiling" for the first nontrivial task
// run for a package, regardless of when that is. We then don't print
// out any more information for a package after we've printed it once.
fn note_working_on(
&mut self,
config: &Config,
unit: &Unit<'a>,
fresh: Freshness,
) -> CargoResult<()> {
if (self.compiled.contains(&unit.pkg.package_id()) && !unit.mode.is_doc())
|| (self.documented.contains(&unit.pkg.package_id()) && unit.mode.is_doc())
{
return Ok(());
}
match fresh {
// Any dirty stage which runs at least one command gets printed as
// being a compiled package.
Dirty => {
if unit.mode.is_doc() {
self.documented.insert(unit.pkg.package_id());
config.shell().status("Documenting", unit.pkg)?;
} else if unit.mode.is_doc_test() {
// Skip doc test.
} else {
self.compiled.insert(unit.pkg.package_id());
if unit.mode.is_check() {
config.shell().status("Checking", unit.pkg)?;
} else {
config.shell().status("Compiling", unit.pkg)?;
}
}
}
Fresh => {
// If doc test are last, only print "Fresh" if nothing has been printed.
if self.counts[&unit.pkg.package_id()] == 0
&& !(unit.mode.is_doc_test() && self.compiled.contains(&unit.pkg.package_id()))
{
self.compiled.insert(unit.pkg.package_id());
config.shell().verbose(|c| c.status("Fresh", unit.pkg))?;
}
}
}
Ok(())
}
}