blob: 5adedf1dad43ea3e75c969d503f84ba292cb247e [file] [log] [blame]
#ifndef CAFFE2_UTILS_SIMPLE_QUEUE_H_
#define CAFFE2_UTILS_SIMPLE_QUEUE_H_
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <queue>
#include "caffe2/core/logging.h"
namespace caffe2 {
// This is a very simple queue that Yangqing wrote when bottlefeeding the baby,
// so don't take it seriously. What it does is a minimal thread-safe queue that
// allows me to run network as a DAG.
//
// A usual work pattern looks like this: one or multiple producers push jobs
// into this queue, and one or multiple workers pops jobs from this queue. If
// nothing is in the queue but NoMoreJobs() is not called yet, the pop calls
// will wait. If NoMoreJobs() has been called, pop calls will return false,
// which serves as a message to the workers that they should exit.
template <typename T>
class SimpleQueue {
public:
SimpleQueue() : no_more_jobs_(false) {}
// Pops a value and writes it to the value pointer. If there is nothing in the
// queue, this will wait till a value is inserted to the queue. If there are
// no more jobs to pop, the function returns false. Otherwise, it returns
// true.
bool Pop(T* value) {
std::unique_lock<std::mutex> mutex_lock(mutex_);
while (queue_.size() == 0 && !no_more_jobs_) cv_.wait(mutex_lock);
if (queue_.size() == 0 && no_more_jobs_) return false;
*value = queue_.front();
queue_.pop();
return true;
}
int size() {
std::unique_lock<std::mutex> mutex_lock(mutex_);
return queue_.size();
}
// Push pushes a value to the queue.
void Push(const T& value) {
{
std::lock_guard<std::mutex> mutex_lock(mutex_);
CAFFE_ENFORCE(!no_more_jobs_, "Cannot push to a closed queue.");
queue_.push(value);
}
cv_.notify_one();
}
// NoMoreJobs() marks the close of this queue. It also notifies all waiting
// Pop() calls so that they either check out remaining jobs, or return false.
// After NoMoreJobs() is called, this queue is considered closed - no more
// Push() functions are allowed, and once existing items are all checked out
// by the Pop() functions, any more Pop() function will immediately return
// false with nothing set to the value.
void NoMoreJobs() {
{
std::lock_guard<std::mutex> mutex_lock(mutex_);
no_more_jobs_ = true;
}
cv_.notify_all();
}
private:
std::mutex mutex_;
std::condition_variable cv_;
std::queue<T> queue_;
bool no_more_jobs_;
// We do not allow copy constructors.
SimpleQueue(const SimpleQueue& /*src*/) {}
};
} // namespace caffe2
#endif // CAFFE2_UTILS_SIMPLE_QUEUE_H_