blob: 2d826d458f36dba8af1e47a0a433ea27c9f6981b [file] [log] [blame]
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use super::Inner;
use crate::call::{BatchContext, MessageReader, RpcStatusCode};
use crate::error::Error;
/// Batch job type.
///
/// Selects how a batch promise interprets the completion of its job. The enum
/// is a plain tag without payload, so it is cheap to copy and compare.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum BatchType {
    /// Finish without reading any message.
    Finish,
    /// Extract one message when finish.
    Read,
    /// Check the rpc code and then extract one message.
    CheckRead,
}
/// A promise used to resolve batch jobs.
///
/// Resolving the promise stores the job's outcome in the shared `inner` state.
pub struct Batch {
// Kind of batch job; `resolve` dispatches on it.
ty: BatchType,
// Batch context created in `new`; the rpc status and the received message
// are read from it when the promise is resolved.
ctx: BatchContext,
// Shared, lockable state into which the result is written via `set_result`.
inner: Arc<Inner<Option<MessageReader>>>,
}
impl Batch {
pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
Batch {
ty,
ctx: BatchContext::new(),
inner,
}
}
pub fn context(&self) -> &BatchContext {
&self.ctx
}
fn read_one_msg(&mut self, success: bool) {
let task = {
let mut guard = self.inner.lock();
if success {
guard.set_result(Ok(self.ctx.recv_message()))
} else {
// rely on C core to handle the failed read (e.g. deliver approriate
// statusCode on the clientside).
guard.set_result(Ok(None))
}
};
task.map(|t| t.wake());
}
fn finish_response(&mut self, succeed: bool) {
let task = {
let mut guard = self.inner.lock();
if succeed {
let status = self.ctx.rpc_status();
if status.code() == RpcStatusCode::OK {
guard.set_result(Ok(None))
} else {
guard.set_result(Err(Error::RpcFailure(status)))
}
} else {
guard.set_result(Err(Error::RemoteStopped))
}
};
task.map(|t| t.wake());
}
fn handle_unary_response(&mut self) {
let task = {
let mut guard = self.inner.lock();
let status = self.ctx.rpc_status();
if status.code() == RpcStatusCode::OK {
guard.set_result(Ok(self.ctx.recv_message()))
} else {
guard.set_result(Err(Error::RpcFailure(status)))
}
};
task.map(|t| t.wake());
}
pub fn resolve(mut self, success: bool) {
match self.ty {
BatchType::CheckRead => {
assert!(success);
self.handle_unary_response();
}
BatchType::Finish => {
self.finish_response(success);
}
BatchType::Read => {
self.read_one_msg(success);
}
}
}
}
impl Debug for Batch {
    /// Renders the promise as `Batch [<type>]`, where `<type>` is the `Debug`
    /// form of the job's `BatchType`; the other fields are not printed.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let job_kind = &self.ty;
        f.write_fmt(format_args!("Batch [{:?}]", job_kind))
    }
}
/// A promise used to resolve async action status.
///
/// The action can only succeed or fail without extra error hint.
pub struct Action {
// Shared, lockable state that receives the boolean outcome passed to
// `resolve`.
inner: Arc<Inner<bool>>,
}
impl Action {
    /// Creates a promise that will report its outcome through `inner`.
    pub fn new(inner: Arc<Inner<bool>>) -> Action {
        Action { inner }
    }

    /// Consumes the promise and publishes `success` as the action's result.
    ///
    /// The result is always stored as `Ok(success)`; no error is ever
    /// produced here.
    pub fn resolve(self, success: bool) {
        let waker = {
            let mut guard = self.inner.lock();
            guard.set_result(Ok(success))
        };
        // The guard is out of scope here, so the wake-up happens without
        // holding the lock.
        if let Some(w) = waker {
            w.wake();
        }
    }
}