blob: ef3b60dab20ccfff6d428460ad0b81a4ce18de75 [file] [log] [blame]
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use jsonrpc_core::types::ErrorCode;
use log::debug;
use crate::actions::work_pool;
use crate::actions::work_pool::WorkDescription;
use crate::actions::InitActionContext;
use crate::concurrency::{ConcurrentJob, JobToken};
use crate::lsp_data::LSPRequest;
use crate::server;
use crate::server::io::Output;
use crate::server::message::ResponseError;
use crate::server::{Request, Response};
use super::requests::*;
/// Default time budget for answering a request. `RequestAction::timeout()`
/// returns this value unless an implementation overrides it; a request that
/// has not been answered within its timeout gets a fallback response instead.
#[cfg(not(test))]
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_millis(1_500);

/// Test builds use a one hour budget, i.e. effectively "never", so that
/// potentially very slow CI boxes do not trip the timeout.
#[cfg(test)]
pub const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 60);
/// Generates the `DispatchRequest` enum from a comma-separated list of request
/// types (a trailing comma is accepted).
///
/// For every listed type `T` the expansion contains:
/// * a variant `DispatchRequest::T(Request<T>)`,
/// * an `impl From<Request<T>> for DispatchRequest`,
/// * a `match` arm in `DispatchRequest::handle` that runs `T::handle` through
///   `work_pool::receive_from_thread` and writes the outcome to the output.
///
/// Each listed type must therefore provide `timeout()`, `fallback_response()`,
/// `handle(ctx, params)` and a `METHOD` constant.
macro_rules! define_dispatch_request_enum {
    ($($request_type:ident),*$(,)*) => {
        // Seems ok for a short-lived macro-enum.
        #[allow(clippy::large_enum_variant)]
        pub(crate) enum DispatchRequest {
            $(
                $request_type(Request<$request_type>),
            )*
        }

        $(
            impl From<Request<$request_type>> for DispatchRequest {
                fn from(request: Request<$request_type>) -> Self {
                    DispatchRequest::$request_type(request)
                }
            }
        )*

        impl DispatchRequest {
            /// Runs the wrapped request and reports its outcome on `out`.
            ///
            /// The calling thread waits at most `timeout()` for the work to
            /// deliver a result; if nothing arrives in time (or the receive
            /// fails for any other reason) the request type's fallback
            /// response is used instead.
            fn handle<O: Output>(self, ctx: InitActionContext, out: &O) {
                match self {
                    $(
                        DispatchRequest::$request_type(Request { id, params, received, .. }) => {
                            let time_limit = $request_type::timeout();

                            // The elapsed-time check runs inside the submitted work
                            // rather than here: previous long running requests may
                            // already have used up the budget, and pool scheduling
                            // may add a further delay. Checking right before the
                            // real work avoids starting expensive processing for a
                            // request that has already timed out.
                            let work = move || {
                                if received.elapsed() < time_limit {
                                    $request_type::handle(ctx, params)
                                } else {
                                    $request_type::fallback_response()
                                }
                            };
                            let rx = work_pool::receive_from_thread(
                                work,
                                WorkDescription($request_type::METHOD),
                            );

                            let outcome = rx
                                .recv_timeout(time_limit)
                                .unwrap_or_else(|_| $request_type::fallback_response());

                            match outcome {
                                Ok(reply) => reply.send(id, out),
                                Err(ResponseError::Message(code, msg)) => {
                                    out.failure_message(id, code, msg)
                                }
                                Err(ResponseError::Empty) => {
                                    out.failure_message(
                                        id,
                                        ErrorCode::InternalError,
                                        "An unknown error occurred",
                                    )
                                }
                            }
                        }
                    )*
                }
            }
        }
    }
}
// Instantiates `DispatchRequest` with one variant per request type listed
// below. Each listed type also gets an `impl From<Request<T>>` and an arm in
// `DispatchRequest::handle`, so adding a new dispatched request only requires
// adding its type name to this list.
define_dispatch_request_enum!(
Completion,
Definition,
References,
WorkspaceSymbol,
Symbols,
Hover,
Implementation,
DocumentHighlight,
Rename,
CodeAction,
ResolveCompletion,
Formatting,
RangeFormatting,
ExecuteCommand,
CodeLensRequest,
);
/// What travels over the channel to the dispatch-worker thread: the request
/// itself, the context it runs with, and the token of the job registered for it.
type DispatchJob = (DispatchRequest, InitActionContext, JobToken);

/// Hands requests over to a dedicated worker thread that processes them one
/// at a time, so that the thread reading stdin is never blocked by them.
/// Requests sent through here are subject to their timeout, and processing is
/// skipped for requests that have already timed out before they start.
pub(crate) struct Dispatcher {
    sender: mpsc::Sender<DispatchJob>,
}

impl Dispatcher {
    /// Builds a `Dispatcher`, spawning the `dispatch-worker` thread and the
    /// channel that feeds it. Responses are written to `out`.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread cannot be spawned.
    pub(crate) fn new<O: Output>(out: O) -> Self {
        let (sender, receiver) = mpsc::channel::<DispatchJob>();

        // Iterating the receiver blocks for the next job and ends once every
        // sender has been dropped, which also ends the worker thread.
        let worker = move || {
            for (request, ctx, token) in receiver {
                request.handle(ctx, &out);
                // The token is held until the request has been handled.
                drop(token);
            }
        };

        thread::Builder::new()
            .name("dispatch-worker".into())
            .spawn(worker)
            .unwrap();

        Dispatcher { sender }
    }

    /// Queues `request` for the dispatch-worker thread without blocking.
    ///
    /// A new `ConcurrentJob` is registered on `ctx` and its token is sent
    /// along with the request. If the worker is gone the failure is only
    /// logged at debug level.
    pub(crate) fn dispatch<R: Into<DispatchRequest>>(
        &mut self,
        request: R,
        ctx: InitActionContext,
    ) {
        let (job, token) = ConcurrentJob::new();
        ctx.add_job(job);

        let message: DispatchJob = (request.into(), ctx, token);
        if let Err(err) = self.sender.send(message) {
            debug!("failed to dispatch request: {:?}", err);
        }
    }
}
/// A request whose processing must not block stdin: it is meant to be wrapped
/// in a `DispatchRequest` and run on the `WORK_POOL` by a `Dispatcher`.
pub trait RequestAction: LSPRequest {
    /// The serializable value sent back to the client on success.
    type Response: server::Response + Send;

    /// Performs the actual work of the request.
    ///
    /// Returns the response to send, or a `ResponseError` describing why none
    /// could be produced.
    fn handle(
        ctx: InitActionContext,
        params: Self::Params,
    ) -> Result<Self::Response, ResponseError>;

    /// Produces the response used when the request times out.
    fn fallback_response() -> Result<Self::Response, ResponseError>;

    /// The maximum time this request should take to finish; once it is
    /// exceeded `fallback_response()` is used. Defaults to
    /// `DEFAULT_REQUEST_TIMEOUT`.
    fn timeout() -> Duration {
        DEFAULT_REQUEST_TIMEOUT
    }
}