| // Copyright 2023 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| use std::{ |
| sync::{mpsc, Arc, Mutex}, |
| thread, |
| }; |
| |
| use log::{error, info}; |
| |
| pub struct ThreadPool { |
| workers: Vec<Worker>, |
| sender: Option<mpsc::Sender<Job>>, |
| } |
| |
/// A unit of work for the pool: a boxed closure that is called once and can be sent to
/// another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
| |
| impl ThreadPool { |
| /// Create a new ThreadPool. |
| /// |
| /// The size is the number of threads in the pool. |
| /// |
| /// # Panics |
| /// |
| /// The `new` function will panic if the size is zero. |
| pub fn new(size: usize) -> ThreadPool { |
| assert!(size > 0); |
| |
| let (sender, receiver) = mpsc::channel(); |
| |
| let receiver = Arc::new(Mutex::new(receiver)); |
| |
| let mut workers = Vec::with_capacity(size); |
| |
| for id in 0..size { |
| workers.push(Worker::new(id, Arc::clone(&receiver))); |
| } |
| |
| ThreadPool { workers, sender: Some(sender) } |
| } |
| |
| pub fn execute<F>(&self, f: F) |
| where |
| F: FnOnce() + Send + 'static, |
| { |
| let job = Box::new(f); |
| |
| self.sender.as_ref().unwrap().send(job).unwrap(); |
| } |
| } |
| |
| impl Drop for ThreadPool { |
| fn drop(&mut self) { |
| drop(self.sender.take()); |
| |
| for worker in &mut self.workers { |
| info!("Shutting down worker {}", worker.id); |
| |
| if let Some(thread) = worker.thread.take() { |
| thread.join().unwrap(); |
| } |
| } |
| } |
| } |
| |
/// One worker of the pool: an id plus the handle of its thread.
struct Worker {
    /// Index assigned at creation; used in log messages.
    id: usize,
    /// Handle of the worker's thread. Held in an `Option` so the handle can be taken
    /// out and joined through a `&mut` reference.
    thread: Option<thread::JoinHandle<()>>,
}
| |
| impl Worker { |
| fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { |
| let thread = thread::Builder::new() |
| .name("http_pool_{id}".to_string()) |
| .spawn(move || loop { |
| let message = receiver.lock().expect("Failed to acquire lock on receiver").recv(); |
| |
| match message { |
| Ok(job) => { |
| job(); |
| } |
| Err(_) => { |
| error!("Worker {id} disconnected; shutting down."); |
| break; |
| } |
| } |
| }) |
| .expect("http_pool_{id} spawn failed"); |
| Worker { id, thread: Some(thread) } |
| } |
| } |