| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| // -*- Mode: C++ -*- |
| // |
| // Copyright (C) 2013-2024 Red Hat, Inc. |
| // |
| // Author: Dodji Seketeli |
| |
| /// @file |
| /// |
| /// This file implements the worker threads (or thread pool) design |
| /// pattern. It aims at performing a set of tasks in parallel, using |
| /// the multi-threading capabilities of the underlying processor(s). |
| |
| #include <assert.h> |
| #include <unistd.h> |
| #include <pthread.h> |
| #include <queue> |
| #include <vector> |
| #include <iostream> |
| |
| #include "abg-fwd.h" |
| #include "abg-internal.h" |
| // <headers defining libabigail's API go under here> |
| ABG_BEGIN_EXPORT_DECLARATIONS |
| |
| #include "abg-workers.h" |
| |
| ABG_END_EXPORT_DECLARATIONS |
| // </headers defining libabigail's API> |
| |
| namespace abigail |
| { |
| |
| namespace workers |
| { |
| |
| /// @defgroup thread_pool Worker Threads |
| /// @{ |
| /// |
| /// \brief Libabigail's implementation of Thread Pools. |
| /// |
| /// The main interface of this pattern is a @ref queue of @ref tasks |
| /// to be performed. Associated to that queue are a set of worker |
| /// threads (these are native posix threads) that sits there, idle, |
| /// until at least one @ref task is added to the queue. |
| /// |
| /// When a @ref task is added to the @ref queue, one thread is woken |
| /// up, picks the @ref task, removes it from the @ref queue, and |
| /// executes the instructions it carries. We say the worker thread |
| /// performs the @ref task. |
| /// |
| /// When the worker thread is done performing the @ref task, the |
| /// performed @ref task is added to another queue, named as the "done |
| /// queue". Then the thread looks at the @ref queue of tasks to be |
| /// performed again, and if there is at least one task in that queue, |
| /// the same process as above is done. Otherwise, the thread blocks, |
| /// waiting for a new task to be added to the queue. |
| /// |
| /// By default, the number of worker threads is equal to the number of |
| /// execution threads advertised by the underlying processor. |
| /// |
| /// Note that the user of the queue can either wait for all the tasks |
| /// to be performed by the pool of threads,and them stop them, get the |
| /// vector of done tasks and proceed to whatever computation she may |
| /// need next. |
| /// |
| /// Or she can choose to be asynchronously notified whenever a task is |
| /// performed and added to the "done queue". |
| /// |
| ///@} |
| |
| /// @return The number of hardware threads of executions advertised by |
| /// the underlying processor. |
| size_t |
| get_number_of_threads() |
| {return sysconf(_SC_NPROCESSORS_ONLN);} |
| |
| /// The abstraction of a worker thread. |
| /// |
| /// This is an implementation detail of the @ref queue public |
| /// interface type of this worker thread design pattern. |
| struct worker |
| { |
| pthread_t tid; |
| |
| worker() |
| : tid() |
| {} |
| |
| static queue::priv* |
| wait_to_execute_a_task(queue::priv*); |
| }; // end struct worker |
| |
| // </worker declarations> |
| |
| // <queue stuff> |
| |
| /// The private data structure of the task queue. |
| struct queue::priv |
| { |
| // A boolean to say if the user wants to shutdown the worker |
| // threads. guarded by tasks_todo_mutex. |
| // TODO: once we have std::atomic<bool>, use it and reconsider the |
| // synchronization around its reads and writes |
| bool bring_workers_down; |
| // The number of worker threads. |
| size_t num_workers; |
| // A mutex that protects the todo tasks queue from being accessed in |
| // read/write by two threads at the same time. |
| pthread_mutex_t tasks_todo_mutex; |
| // The queue condition variable. This condition is used to make the |
| // worker threads sleep until a new task is added to the queue of |
| // todo tasks. Whenever a new task is added to that queue, a signal |
| // is sent to all a thread sleeping on this condition variable. |
| pthread_cond_t tasks_todo_cond; |
| // A mutex that protects the done tasks queue from being accessed in |
| // read/write by two threads at the same time. |
| pthread_mutex_t tasks_done_mutex; |
| // A condition to be signalled whenever there is a task done. That is being |
| // used to wait for tasks completed when bringing the workers down. |
| pthread_cond_t tasks_done_cond; |
| // The todo task queue itself. |
| std::queue<task_sptr> tasks_todo; |
| // The done task queue itself. |
| std::vector<task_sptr> tasks_done; |
| // This functor is invoked to notify the user of this queue that a |
| // task has been completed and has been added to the done tasks |
| // vector. We call it a notifier. This notifier is the default |
| // notifier of the work queue; the one that is used when the user |
| // has specified no notifier. It basically does nothing. |
| static task_done_notify default_notify; |
| // This is a reference to the the notifier that is actually used in |
| // the queue. It's either the one specified by the user or the |
| // default one. |
| task_done_notify& notify; |
| // A vector of the worker threads. |
| std::vector<worker> workers; |
| |
| /// A constructor of @ref queue::priv. |
| /// |
| /// @param nb_workers the number of worker threads to have in the |
| /// thread pool. |
| /// |
| /// @param task_done_notify a functor object that is invoked by the |
| /// worker thread which has performed the task, right after it's |
| /// added that task to the vector of the done tasks. |
| priv(size_t nb_workers = get_number_of_threads(), |
| task_done_notify& n = default_notify) |
| : bring_workers_down(), |
| num_workers(nb_workers), |
| tasks_todo_mutex(), |
| tasks_todo_cond(), |
| tasks_done_mutex(), |
| tasks_done_cond(), |
| notify(n) |
| {create_workers();} |
| |
| /// Create the worker threads pool and have all threads sit idle, |
| /// waiting for a task to be added to the todo queue. |
| void |
| create_workers() |
| { |
| for (unsigned i = 0; i < num_workers; ++i) |
| { |
| worker w; |
| ABG_ASSERT(pthread_create(&w.tid, |
| /*attr=*/0, |
| (void*(*)(void*))&worker::wait_to_execute_a_task, |
| this) == 0); |
| workers.push_back(w); |
| } |
| } |
| |
| /// Submit a task to the queue of tasks to be performed. |
| /// |
| /// This wakes up one thread from the pool which immediatly starts |
| /// performing the task. When it's done with the task, it goes back |
| /// to be suspended, waiting for a new task to be scheduled. |
| /// |
| /// @param t the task to schedule. Note that a nil task won't be |
| /// scheduled. If the queue is empty, the task @p t won't be |
| /// scheduled either. |
| /// |
| /// @return true iff the task @p t was successfully scheduled. |
| bool |
| schedule_task(const task_sptr& t) |
| { |
| if (workers.empty() || !t) |
| return false; |
| |
| pthread_mutex_lock(&tasks_todo_mutex); |
| tasks_todo.push(t); |
| pthread_mutex_unlock(&tasks_todo_mutex); |
| pthread_cond_signal(&tasks_todo_cond); |
| return true; |
| } |
| |
| /// Submit a vector of task to the queue of tasks to be performed. |
| /// |
| /// This wakes up threads of the pool which immediatly start |
| /// performing the tasks. When they are done with the task, they go |
| /// back to be suspended, waiting for new tasks to be scheduled. |
| /// |
| /// @param tasks the tasks to schedule. |
| bool |
| schedule_tasks(const tasks_type& tasks) |
| { |
| bool is_ok= true; |
| for (tasks_type::const_iterator t = tasks.begin(); t != tasks.end(); ++t) |
| is_ok &= schedule_task(*t); |
| return is_ok; |
| } |
| |
| /// Signal all the threads (of the pool) which are suspended and |
| /// waiting to perform a task, so that they wake up and end up their |
| /// execution. If there is no task to perform, they just end their |
| /// execution. If there are tasks to perform, they finish them and |
| /// then end their execution. |
| /// |
| /// This function then joins all the tasks of the pool, waiting for |
| /// them to finish, and then it returns. In other words, this |
| /// function suspends the thread of the caller, waiting for the |
| /// worker threads to finish their tasks, and end their execution. |
| /// |
| /// If the user code wants to work with the thread pool again, |
| /// she'll need to create them again, using the member function |
| /// create_workers(). |
| void |
| do_bring_workers_down() |
| { |
| if (workers.empty()) |
| return; |
| |
| // Wait for the todo list to be empty to make sure all tasks got picked up |
| pthread_mutex_lock(&tasks_todo_mutex); |
| while (!tasks_todo.empty()) |
| pthread_cond_wait(&tasks_done_cond, &tasks_todo_mutex); |
| |
| bring_workers_down = true; |
| pthread_mutex_unlock(&tasks_todo_mutex); |
| |
| // Now that the task queue is empty, drain the workers by waking them up, |
| // letting them finish their final task before termination. |
| ABG_ASSERT(pthread_cond_broadcast(&tasks_todo_cond) == 0); |
| |
| for (std::vector<worker>::const_iterator i = workers.begin(); |
| i != workers.end(); |
| ++i) |
| ABG_ASSERT(pthread_join(i->tid, /*thread_return=*/0) == 0); |
| workers.clear(); |
| } |
| |
| /// Destructors of @ref queue::priv type. |
| ~priv() |
| {do_bring_workers_down();} |
| |
| }; //end struct queue::priv |
| |
| // default initialize the default notifier. |
| queue::task_done_notify queue::priv::default_notify; |
| |
| /// Default constructor of the @ref queue type. |
| /// |
| /// By default the queue is created with a number of worker threaders |
| /// which is equals to the number of simultaneous execution threads |
| /// supported by the underlying processor. |
| queue::queue() |
| : p_(new priv()) |
| {} |
| |
| /// Constructor of the @ref queue type. |
| /// |
| /// @param number_of_workers the number of worker threads to have in |
| /// the pool. |
| queue::queue(unsigned number_of_workers) |
| : p_(new priv(number_of_workers)) |
| {} |
| |
| /// Constructor of the @ref queue type. |
| /// |
| /// @param number_of_workers the number of worker threads to have in |
| /// the pool. |
| /// |
| /// @param the notifier to invoke when a task is done doing its job. |
| /// Users should create a type that inherit this @ref task_done_notify |
| /// class and overload its virtual task_done_notify::operator() |
| /// operator function. Note that the code of that |
| /// task_done_notify::operator() is assured to run in *sequence*, with |
| /// respect to the code of other task_done_notify::operator() from |
| /// other tasks. |
| queue::queue(unsigned number_of_workers, |
| task_done_notify& notifier) |
| : p_(new priv(number_of_workers, notifier)) |
| {} |
| |
| /// Getter of the size of the queue. This gives the number of task |
| /// still present in the queue. |
| /// |
| /// @return the number of task still present in the queue. |
| size_t |
| queue::get_size() const |
| {return p_->tasks_todo.size();} |
| |
| /// Submit a task to the queue of tasks to be performed. |
| /// |
| /// This wakes up one thread from the pool which immediatly starts |
| /// performing the task. When it's done with the task, it goes back |
| /// to be suspended, waiting for a new task to be scheduled. |
| /// |
| /// @param t the task to schedule. Note that if the queue is empty or |
| /// if the task is nil, the task is not scheduled. |
| /// |
| /// @return true iff the task was successfully scheduled. |
| bool |
| queue::schedule_task(const task_sptr& t) |
| {return p_->schedule_task(t);} |
| |
| /// Submit a vector of tasks to the queue of tasks to be performed. |
| /// |
| /// This wakes up one or more threads from the pool which immediatly |
| /// start performing the tasks. When the threads are done with the |
| /// tasks, they goes back to be suspended, waiting for a new task to |
| /// be scheduled. |
| /// |
| /// @param tasks the tasks to schedule. |
| bool |
| queue::schedule_tasks(const tasks_type& tasks) |
| {return p_->schedule_tasks(tasks);} |
| |
| /// Suspends the current thread until all worker threads finish |
| /// performing the tasks they are executing. |
| /// |
| /// If the worker threads were suspended waiting for a new task to |
| /// perform, they are woken up and their execution ends. |
| /// |
| /// The execution of the current thread is resumed when all the |
| /// threads of the pool have finished their execution and are |
| /// terminated. |
| void |
| queue::wait_for_workers_to_complete() |
| {p_->do_bring_workers_down();} |
| |
| /// Getter of the vector of tasks that got performed. |
| /// |
| /// @return the vector of tasks that got performed. |
| std::vector<task_sptr>& |
| queue::get_completed_tasks() const |
| {return p_->tasks_done;} |
| |
| /// Destructor for the @ref queue type. |
| queue::~queue() |
| {} |
| |
| /// The default function invocation operator of the @ref queue type. |
| /// |
| /// This does nothing. |
| void |
| queue::task_done_notify::operator()(const task_sptr&/*task_done*/) |
| { |
| } |
| |
| // </queue stuff> |
| |
| // <worker definitions> |
| |
| /// Wait to be woken up by a thread condition signal, then look if |
| /// there is a task to be executed. If there is, then pick one (in a |
| /// FIFO manner), execute it, and put the executed task into the set |
| /// of done tasks. |
| /// |
| /// @param t the private data of the "task queue" type to consider. |
| /// |
| /// @param return the same private data of the task queue type we got |
| /// in argument. |
| queue::priv* |
| worker::wait_to_execute_a_task(queue::priv* p) |
| { |
| while (true) |
| { |
| pthread_mutex_lock(&p->tasks_todo_mutex); |
| // If there is no more tasks to perform and the queue is not to |
| // be brought down then wait (sleep) for new tasks to come up. |
| while (p->tasks_todo.empty() && !p->bring_workers_down) |
| pthread_cond_wait(&p->tasks_todo_cond, &p->tasks_todo_mutex); |
| |
| // We were woken up. So maybe there are tasks to perform? If |
| // so, get a task from the queue ... |
| task_sptr t; |
| if (!p->tasks_todo.empty()) |
| { |
| t = p->tasks_todo.front(); |
| p->tasks_todo.pop(); |
| } |
| pthread_mutex_unlock(&p->tasks_todo_mutex); |
| |
| // If we've got a task to perform then perform it and when it's |
| // done then add to the set of tasks that are done. |
| if (t) |
| { |
| t->perform(); |
| |
| // Add the task to the vector of tasks that are done and |
| // notify listeners about the fact that the task is done. |
| // |
| // Note that this (including the notification) is not |
| // happening in parallel. So the code performed by the |
| // notifier during the notification is running sequentially, |
| // not in parallel with any other task that was just done |
| // and that is notifying its listeners. |
| pthread_mutex_lock(&p->tasks_done_mutex); |
| p->tasks_done.push_back(t); |
| p->notify(t); |
| pthread_mutex_unlock(&p->tasks_done_mutex); |
| pthread_cond_signal(&p->tasks_done_cond); |
| } |
| |
| // ensure we access bring_workers_down always guarded |
| bool drop_out = false; |
| pthread_mutex_lock(&p->tasks_todo_mutex); |
| drop_out = p->bring_workers_down; |
| pthread_mutex_unlock(&p->tasks_todo_mutex); |
| if (drop_out) |
| break; |
| } |
| |
| return p; |
| } |
| // </worker definitions> |
| } //end namespace workers |
| } //end namespace abigail |