| use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; |
| |
| use crate::Error; |
| |
| pub(crate) struct JobToken(PhantomData<()>); |
| |
| impl JobToken { |
| fn new() -> Self { |
| Self(PhantomData) |
| } |
| } |
| |
| impl Drop for JobToken { |
| fn drop(&mut self) { |
| match JobTokenServer::new() { |
| JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), |
| JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), |
| } |
| } |
| } |
| |
| enum JobTokenServer { |
| Inherited(inherited_jobserver::JobServer), |
| InProcess(inprocess_jobserver::JobServer), |
| } |
| |
| impl JobTokenServer { |
| /// This function returns a static reference to the jobserver because |
| /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might |
| /// be closed by other jobserver users in the process) and better do it |
| /// at the start of the program. |
| /// - in case a jobserver cannot be created from env (e.g. it's not |
| /// present), we will create a global in-process only jobserver |
| /// that has to be static so that it will be shared by all cc |
| /// compilation. |
| fn new() -> &'static Self { |
| static INIT: Once = Once::new(); |
| static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit(); |
| |
| unsafe { |
| INIT.call_once(|| { |
| let server = inherited_jobserver::JobServer::from_env() |
| .map(Self::Inherited) |
| .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); |
| JOBSERVER = MaybeUninit::new(server); |
| }); |
| // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. |
| &*JOBSERVER.as_ptr() |
| } |
| } |
| } |
| |
| pub(crate) enum ActiveJobTokenServer { |
| Inherited(inherited_jobserver::ActiveJobServer<'static>), |
| InProcess(&'static inprocess_jobserver::JobServer), |
| } |
| |
| impl ActiveJobTokenServer { |
| pub(crate) fn new() -> Result<Self, Error> { |
| match JobTokenServer::new() { |
| JobTokenServer::Inherited(inherited_jobserver) => { |
| inherited_jobserver.enter_active().map(Self::Inherited) |
| } |
| JobTokenServer::InProcess(inprocess_jobserver) => { |
| Ok(Self::InProcess(inprocess_jobserver)) |
| } |
| } |
| } |
| |
| pub(crate) async fn acquire(&self) -> Result<JobToken, Error> { |
| match &self { |
| Self::Inherited(jobserver) => jobserver.acquire().await, |
| Self::InProcess(jobserver) => Ok(jobserver.acquire().await), |
| } |
| } |
| } |
| |
| mod inherited_jobserver { |
| use super::JobToken; |
| |
| use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; |
| |
| use std::{ |
| io, mem, |
| sync::{mpsc, Mutex, MutexGuard, PoisonError}, |
| }; |
| |
| pub(super) struct JobServer { |
| /// Implicit token for this process which is obtained and will be |
| /// released in parent. Since JobTokens only give back what they got, |
| /// there should be at most one global implicit token in the wild. |
| /// |
| /// Since Rust does not execute any `Drop` for global variables, |
| /// we can't just put it back to jobserver and then re-acquire it at |
| /// the end of the process. |
| /// |
| /// Use `Mutex` to avoid race between acquire and release. |
| /// If an `AtomicBool` is used, then it's possible for: |
| /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already |
| /// set to `true`, continue to release it to jobserver |
| /// - `acquire` takes the global implicit token, set `global_implicit_token` to false |
| /// - `release_token_raw` now writes the token back into the jobserver, while |
| /// `global_implicit_token` is `false` |
| /// |
| /// If the program exits here, then cc effectively increases parallelism by one, which is |
| /// incorrect, hence we use a `Mutex` here. |
| global_implicit_token: Mutex<bool>, |
| inner: jobserver::Client, |
| } |
| |
| impl JobServer { |
| pub(super) unsafe fn from_env() -> Option<Self> { |
| jobserver::Client::from_env().map(|inner| Self { |
| inner, |
| global_implicit_token: Mutex::new(true), |
| }) |
| } |
| |
| fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { |
| self.global_implicit_token |
| .lock() |
| .unwrap_or_else(PoisonError::into_inner) |
| } |
| |
| /// All tokens except for the global implicit token will be put back into the jobserver |
| /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on |
| /// global variables. |
| pub(super) fn release_token_raw(&self) { |
| let mut global_implicit_token = self.get_global_implicit_token(); |
| |
| if *global_implicit_token { |
| // There's already a global implicit token, so this token must |
| // be released back into jobserver. |
| // |
| // `release_raw` should not block |
| let _ = self.inner.release_raw(); |
| } else { |
| *global_implicit_token = true; |
| } |
| } |
| |
| pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> { |
| ActiveJobServer::new(self) |
| } |
| } |
| |
| pub(crate) struct ActiveJobServer<'a> { |
| jobserver: &'a JobServer, |
| helper_thread: jobserver::HelperThread, |
| /// When rx is dropped, all the token stored within it will be dropped. |
| rx: mpsc::Receiver<io::Result<jobserver::Acquired>>, |
| } |
| |
| impl<'a> ActiveJobServer<'a> { |
| fn new(jobserver: &'a JobServer) -> Result<Self, Error> { |
| let (tx, rx) = mpsc::channel(); |
| |
| Ok(Self { |
| rx, |
| helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { |
| let _ = tx.send(res); |
| })?, |
| jobserver, |
| }) |
| } |
| |
| pub(super) async fn acquire(&self) -> Result<JobToken, Error> { |
| let mut has_requested_token = false; |
| |
| loop { |
| // Fast path |
| if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { |
| break Ok(JobToken::new()); |
| } |
| |
| // Cold path, no global implicit token, obtain one |
| match self.rx.try_recv() { |
| Ok(res) => { |
| let acquired = res?; |
| acquired.drop_without_releasing(); |
| break Ok(JobToken::new()); |
| } |
| Err(mpsc::TryRecvError::Disconnected) => { |
| break Err(Error::new( |
| ErrorKind::JobserverHelpThreadError, |
| "jobserver help thread has returned before ActiveJobServer is dropped", |
| )) |
| } |
| Err(mpsc::TryRecvError::Empty) => { |
| if !has_requested_token { |
| self.helper_thread.request_token(); |
| has_requested_token = true; |
| } |
| YieldOnce::default().await |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| mod inprocess_jobserver { |
| use super::JobToken; |
| |
| use crate::parallel::async_executor::YieldOnce; |
| |
| use std::{ |
| env::var, |
| sync::atomic::{ |
| AtomicU32, |
| Ordering::{AcqRel, Acquire}, |
| }, |
| }; |
| |
| pub(crate) struct JobServer(AtomicU32); |
| |
| impl JobServer { |
| pub(super) fn new() -> Self { |
| // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise |
| // just fall back to a semi-reasonable number. |
| // |
| // Note that we could use `num_cpus` here but it's an extra |
| // dependency that will almost never be used, so |
| // it's generally not too worth it. |
| let mut parallelism = 4; |
| // TODO: Use std::thread::available_parallelism as an upper bound |
| // when MSRV is bumped. |
| if let Ok(amt) = var("NUM_JOBS") { |
| if let Ok(amt) = amt.parse() { |
| parallelism = amt; |
| } |
| } |
| |
| Self(AtomicU32::new(parallelism)) |
| } |
| |
| pub(super) async fn acquire(&self) -> JobToken { |
| loop { |
| let res = self |
| .0 |
| .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); |
| |
| if res.is_ok() { |
| break JobToken::new(); |
| } |
| |
| YieldOnce::default().await |
| } |
| } |
| |
| pub(super) fn release_token_raw(&self) { |
| self.0.fetch_add(1, AcqRel); |
| } |
| } |
| } |