blob: 5c2e199c67c2b4dd674bc06708844566340bdfa4 [file] [log] [blame]
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{Builder as ThreadBuilder, JoinHandle};
use crate::grpc_sys;
use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
use crate::task::CallTag;
// event loop
fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
tx.send(cq.clone()).expect("send back completion queue");
loop {
let e = cq.next();
match e.type_ {
EventType::GRPC_QUEUE_SHUTDOWN => break,
// timeout should not happen in theory.
EventType::GRPC_QUEUE_TIMEOUT => continue,
EventType::GRPC_OP_COMPLETE => {}
}
let tag: Box<CallTag> = unsafe { Box::from_raw(e.tag as _) };
tag.resolve(&cq, e.success != 0);
while let Some(work) = unsafe { cq.worker.pop_work() } {
work.finish();
}
}
}
/// [`Environment`] factory in order to configure the properties.
pub struct EnvBuilder {
cq_count: usize,
name_prefix: Option<String>,
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
}
impl EnvBuilder {
/// Initialize a new [`EnvBuilder`].
pub fn new() -> EnvBuilder {
EnvBuilder {
cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
name_prefix: None,
after_start: None,
before_stop: None,
}
}
/// Set the number of completion queues and polling threads. Each thread polls
/// one completion queue.
///
/// # Panics
///
/// This method will panic if `count` is 0.
pub fn cq_count(mut self, count: usize) -> EnvBuilder {
assert!(count > 0);
self.cq_count = count;
self
}
/// Set the thread name prefix of each polling thread.
pub fn name_prefix<S: Into<String>>(mut self, prefix: S) -> EnvBuilder {
self.name_prefix = Some(prefix.into());
self
}
/// Execute function `f` after each thread is started but before it starts doing work.
pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
self.after_start = Some(Arc::new(f));
self
}
/// Execute function `f` before each thread stops.
pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
self.before_stop = Some(Arc::new(f));
self
}
/// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
pub fn build(self) -> Environment {
unsafe {
grpc_sys::grpc_init();
}
let mut cqs = Vec::with_capacity(self.cq_count);
let mut handles = Vec::with_capacity(self.cq_count);
let (tx, rx) = mpsc::channel();
for i in 0..self.cq_count {
let tx_i = tx.clone();
let mut builder = ThreadBuilder::new();
if let Some(ref prefix) = self.name_prefix {
builder = builder.name(format!("{}-{}", prefix, i));
}
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let handle = builder
.spawn(move || {
if let Some(f) = after_start {
f();
}
poll_queue(tx_i);
if let Some(f) = before_stop {
f();
}
})
.unwrap();
handles.push(handle);
}
for _ in 0..self.cq_count {
cqs.push(rx.recv().unwrap());
}
Environment {
cqs,
idx: AtomicUsize::new(0),
_handles: handles,
}
}
}
/// An object that used to control concurrency and start gRPC event loop.
pub struct Environment {
cqs: Vec<CompletionQueue>,
idx: AtomicUsize,
_handles: Vec<JoinHandle<()>>,
}
impl Environment {
/// Initialize gRPC and create a thread pool to poll completion queue. The thread pool size
/// and the number of completion queue is specified by `cq_count`. Each thread polls one
/// completion queue.
///
/// # Panics
///
/// This method will panic if `cq_count` is 0.
pub fn new(cq_count: usize) -> Environment {
assert!(cq_count > 0);
EnvBuilder::new()
.name_prefix("grpc-poll")
.cq_count(cq_count)
.build()
}
/// Get all the created completion queues.
pub fn completion_queues(&self) -> &[CompletionQueue] {
self.cqs.as_slice()
}
/// Pick an arbitrary completion queue.
pub fn pick_cq(&self) -> CompletionQueue {
let idx = self.idx.fetch_add(1, Ordering::Relaxed);
self.cqs[idx % self.cqs.len()].clone()
}
}
impl Drop for Environment {
fn drop(&mut self) {
for cq in self.completion_queues() {
// it's safe to shutdown more than once.
cq.shutdown()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_loop() {
let mut env = Environment::new(2);
let q1 = env.pick_cq();
let q2 = env.pick_cq();
let q3 = env.pick_cq();
let cases = vec![(&q1, &q3, true), (&q1, &q2, false)];
for (lq, rq, is_eq) in cases {
let lq_ref = lq.borrow().unwrap();
let rq_ref = rq.borrow().unwrap();
if is_eq {
assert_eq!(lq_ref.as_ptr(), rq_ref.as_ptr());
} else {
assert_ne!(lq_ref.as_ptr(), rq_ref.as_ptr());
}
}
assert_eq!(env.completion_queues().len(), 2);
for cq in env.completion_queues() {
cq.shutdown();
}
for handle in env._handles.drain(..) {
handle.join().unwrap();
}
}
}