blob: 2e710d600a4c41668e89ad28c374bef9c555b351 [file] [log] [blame]
#ifndef LIBGAV1_SRC_UTILS_THREADPOOL_H_
#define LIBGAV1_SRC_UTILS_THREADPOOL_H_
#include <deque>
#include <functional>
#include <memory>
#if defined(__ANDROID__)
#include <condition_variable> // NOLINT (unapproved c++11 header)
#include <mutex> // NOLINT (unapproved c++11 header)
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 1
#else
// absl::Mutex & absl::CondVar are significantly faster than the pthread
// variants on platforms other than Android.
#include "third_party/absl/base/thread_annotations.h"
#include "third_party/absl/synchronization/mutex.h"
#define LIBGAV1_THREADPOOL_USE_STD_MUTEX 0
#endif
#include "src/utils/compiler_attributes.h"
#include "src/utils/executor.h"
#include "src/utils/memory.h"
namespace libgav1 {
// An implementation of ThreadPool using POSIX threads (pthreads) or Windows
// threads.
//
// - The pool allocates a fixed number of worker threads on instantiation.
// - The worker threads will pick up work jobs as they arrive.
// - If all workers are busy, work jobs are queued for later execution.
//
// The thread pool is shut down when the pool is destroyed.
//
// Example usage of the thread pool:
// {
// std::unique_ptr<ThreadPool> pool = ThreadPool::Create(4);
// for (int i = 0; i < 100; ++i) { // Dispatch 100 jobs.
// pool->Schedule([&my_data]() { MyFunction(&my_data); });
// }
// } // ThreadPool gets destroyed only when all jobs are done.
class ThreadPool : public Executor, public Allocable {
public:
// Creates the thread pool with the specified number of worker threads.
// If num_threads is 1, the closures are run in FIFO order.
static std::unique_ptr<ThreadPool> Create(int num_threads);
// Like the above factory method, but also sets the name prefix for threads.
static std::unique_ptr<ThreadPool> Create(const char name_prefix[],
int num_threads);
// The destructor will shut down the thread pool and all jobs are executed.
// Note that after shutdown, the thread pool does not accept further jobs.
~ThreadPool() override;
// Adds the specified "closure" to the queue for processing. If worker threads
// are available, "closure" will run immediately. Otherwise "closure" is
// queued for later execution.
void Schedule(std::function<void()> closure) override;
int num_threads() const;
private:
class WorkerThread;
// Creates the thread pool with the specified number of worker threads.
// If num_threads is 1, the closures are run in FIFO order.
ThreadPool(const char name_prefix[], std::unique_ptr<WorkerThread*[]> threads,
int num_threads);
// Starts the worker pool.
LIBGAV1_MUST_USE_RESULT bool StartWorkers();
void WorkerFunction();
// Shuts down the thread pool, i.e. worker threads finish their work and
// pick up new jobs until the queue is empty. This call will block until
// the shutdown is complete.
//
// Note: If a worker encounters an empty queue after this call, it will exit.
// Other workers might still be running, and if the queue fills up again, the
// thread pool will continue to operate with a decreased number of workers.
// It is up to the caller to prevent adding new jobs.
void Shutdown();
#if LIBGAV1_THREADPOOL_USE_STD_MUTEX
void LockMutex() { queue_mutex_.lock(); }
void UnlockMutex() { queue_mutex_.unlock(); }
void Wait() {
std::unique_lock<std::mutex> queue_lock(queue_mutex_, std::adopt_lock);
condition_.wait(queue_lock);
queue_lock.release();
}
void SignalOne() { condition_.notify_one(); }
void SignalAll() { condition_.notify_all(); }
std::condition_variable condition_;
std::mutex queue_mutex_;
#else // !LIBGAV1_THREADPOOL_USE_STD_MUTEX
void LockMutex() EXCLUSIVE_LOCK_FUNCTION() { queue_mutex_.Lock(); }
void UnlockMutex() UNLOCK_FUNCTION() { queue_mutex_.Unlock(); }
void Wait() { condition_.Wait(&queue_mutex_); }
void SignalOne() { condition_.Signal(); }
void SignalAll() { condition_.SignalAll(); }
absl::CondVar condition_;
absl::Mutex queue_mutex_;
#endif // LIBGAV1_THREADPOOL_USE_STD_MUTEX
std::deque<std::function<void()>> queue_ LIBGAV1_GUARDED_BY(queue_mutex_);
// If not all the worker threads are created, the first entry after the
// created worker threads is a null pointer.
const std::unique_ptr<WorkerThread*[]> threads_;
bool exit_threads_ LIBGAV1_GUARDED_BY(queue_mutex_) = false;
const int num_threads_ = 0;
// name_prefix_ is a C string, whose length is restricted to 16 characters,
// including the terminating null byte ('\0'). This restriction comes from
// the Linux pthread_setname_np() function.
char name_prefix_[16];
};
} // namespace libgav1
#undef LIBGAV1_THREADPOOL_USE_STD_MUTEX
#endif // LIBGAV1_SRC_UTILS_THREADPOOL_H_