use crate::server::DEFAULT_REQUEST_TIMEOUT;
use lazy_static::lazy_static;
use log::{info, warn};
use rayon;
use std::sync::{mpsc, Mutex};
use std::time::{Duration, Instant};
use std::{fmt, panic};

/// Description of work on the request work pool. Equality implies two pieces of work are the same
/// kind of thing. The `str` should be human readable for logging (e.g., the language server
/// protocol request message name or similar).
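///
/// # Example
///
/// An illustrative sketch (the request name is a placeholder, not taken from this module):
///
/// ```ignore
/// let hover = WorkDescription("textDocument/hover");
/// // Equal descriptions mean "the same kind of work" for concurrency limiting.
/// assert_eq!(hover, WorkDescription("textDocument/hover"));
/// // `Display` prints the raw request name for logging.
/// assert_eq!(hover.to_string(), "textDocument/hover");
/// ```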
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WorkDescription(pub &'static str);

impl fmt::Display for WorkDescription {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

lazy_static! {
    /// Maximum total concurrent working tasks
    static ref NUM_THREADS: usize = ::num_cpus::get();

    /// Duration of work after which we should warn something is taking a long time
    static ref WARN_TASK_DURATION: Duration = DEFAULT_REQUEST_TIMEOUT * 5;

    /// Current work descriptions active on the work pool
    static ref WORK: Mutex<Vec<WorkDescription>> = Mutex::new(vec![]);

    /// Thread pool for request execution allowing concurrent request processing.
    static ref WORK_POOL: rayon::ThreadPool = rayon::ThreadPoolBuilder::new()
        .num_threads(*NUM_THREADS)
        .thread_name(|num| format!("request-worker-{}", num))
        .build()
        .unwrap();
}

/// Maximum concurrent working tasks of the same type (equal `WorkDescription`).
///
/// Note: `2` allows a single task to run immediately after a similar task has timed out.
/// Once multiple similar tasks have timed out but remain running, we start refusing to
/// start new ones.
const MAX_SIMILAR_CONCURRENT_WORK: usize = 2;

/// Runs work on the `WORK_POOL`, returning a result `Receiver`.
///
/// If the work panics, the receiver will yield `Err(RecvError)` /
/// `Err(RecvTimeoutError::Disconnected)`.
///
/// If too many tasks are already running, the work will not be started and the receiver will
/// immediately return `Err(RecvTimeoutError::Disconnected)`.
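///
/// # Example
///
/// A minimal usage sketch (illustrative only, not from the module's own docs; the request
/// name and timeout below are placeholder values):
///
/// ```ignore
/// use std::sync::mpsc::RecvTimeoutError;
/// use std::time::Duration;
///
/// let receiver = receive_from_thread(
///     || 2 + 2, // any `FnOnce() -> T + Send + UnwindSafe + 'static` closure
///     WorkDescription("textDocument/hover"),
/// );
/// match receiver.recv_timeout(Duration::from_millis(1500)) {
///     Ok(result) => println!("work finished: {}", result),
///     Err(RecvTimeoutError::Timeout) => println!("work is still running after the timeout"),
///     Err(RecvTimeoutError::Disconnected) => println!("work panicked or was refused"),
/// }
/// ```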
pub fn receive_from_thread<T, F>(work_fn: F, description: WorkDescription) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + panic::UnwindSafe + 'static,
{
    let (sender, receiver) = mpsc::channel();
    {
        let mut work = WORK.lock().unwrap();
        if work.len() >= *NUM_THREADS {
            // there are already `NUM_THREADS` ongoing tasks, which may or may not have timed out;
            // don't add more to the queue; fail fast so the work pool can recover
            warn!(
                "Could not start `{}`: the work pool is at capacity, {:?} in progress",
                description, *work,
            );
            return receiver;
        }
        if work.iter().filter(|desc| *desc == &description).count() >= MAX_SIMILAR_CONCURRENT_WORK {
            // this type of work is already using its maximum share of the pool, so refuse
            // new requests of this kind until some/all of the ongoing work finishes
            info!(
                "Could not start `{}` as this work-type is already at its concurrency limit, {:?} in progress",
                description, *work,
            );
            return receiver;
        }
        work.push(description);
    }
    WORK_POOL.spawn(move || {
        let start = Instant::now();
        // panic details will be on stderr; otherwise ignore the work panic, as it
        // already causes an mpsc disconnect-error & there isn't anything else to log
        if let Ok(work_result) = panic::catch_unwind(work_fn) {
            // an error here simply means the work took too long and the receiver has been dropped
            let _ = sender.send(work_result);
        }
        let mut work = WORK.lock().unwrap();
        if let Some(index) = work.iter().position(|desc| desc == &description) {
            work.swap_remove(index);
        }
        let elapsed = start.elapsed();
        if elapsed >= *WARN_TASK_DURATION {
            let secs =
                elapsed.as_secs() as f64 + f64::from(elapsed.subsec_nanos()) / 1_000_000_000_f64;
            warn!("`{}` took {:.1}s", description, secs);
        }
    });
    receiver
}