blob: 8fa87e567c21544dd5fed83c343b01647fef0707 [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PRODUCER_CONSUMER_QUEUE_H
#define PRODUCER_CONSUMER_QUEUE_H
#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include "utils/clock.h"
#include "utils/log.h"
#include "utils/stopwatch.h"
namespace profiler {
// A blocking synchronized producer-consumer queue that also supports
// move-only types. Optionally accepts a |max_length| that is expected to be
// greater than zero, which allows the queue to be bounded. Otherwise, the queue
// can grow unbounded.
// NOTE the current policy is to discard the oldest data when the queue is full.
// TODO abstract out the policy to support other logic as needed (e.g. blocking
// when full).
//
// Example:
// ProducerConsumerQueue<int32_t> q;
//
// In thread #1
// ============
// int val;
// // |Pop| will block until a value is available or the channel is finished
// while (q.Pop(&val)) {
// ...
// }
//
// In thread #2
// ============
// q.Push(long_operation_1());
// q.Push(long_operation_2());
// q.Push(long_operation_3());
// q.Push(long_operation_4());
// q.Finish();
template <class T>
class ProducerConsumerQueue {
public:
ProducerConsumerQueue(int32_t max_length = -1)
: max_length_(max_length), is_finished_(false) {
assert(max_length_ > 0 || max_length_ == -1);
}
// No copy or move semantics
ProducerConsumerQueue(const ProducerConsumerQueue&) = delete;
ProducerConsumerQueue& operator=(const ProducerConsumerQueue&) = delete;
// Resets the queue.
void Reset() {
std::lock_guard<std::mutex> lock(queue_mutex_);
inner_queue_.clear();
is_finished_ = false;
peak_length_ = total_push_ = total_pop_ = 0;
sw_.Start();
}
// Push a value into the queue. Values will be consumed in the order entered
// by calls to |Pop|. If |Finish| was called on this channel, then the value
// entered here will be ignored (and |false| will be returned to indicate it).
template <class VALUE>
bool Push(VALUE&& value) {
std::lock_guard<std::mutex> lock(queue_mutex_);
if (is_finished_) {
return false;
}
if (inner_queue_.empty()) {
queue_cv_.notify_all();
}
// Removes oldest data to make room for value.
if (max_length_ > 0 && inner_queue_.size() >= max_length_) {
assert(inner_queue_.size() == max_length_);
inner_queue_.pop_front();
}
inner_queue_.push_back(std::forward<VALUE>(value));
total_push_++;
peak_length_ = std::max(peak_length_, (int32_t)inner_queue_.size());
return true;
}
// Pull a value out of the queue added by |Push|. If the queue is
// currently empty, this call will block until a value is put in, unless
// the queue was marked finished by calling |Finish|, at which point it will
// exit immediately and return |false|.
bool Pop(T* value) {
std::unique_lock<std::mutex> lock(queue_mutex_);
while (!is_finished_ && inner_queue_.empty()) {
queue_cv_.wait(lock);
}
if (!inner_queue_.empty()) {
*value = std::move(inner_queue_.front());
inner_queue_.pop_front();
total_pop_++;
return true;
} else {
assert(is_finished_);
return false;
}
}
// Pulls all the contents of the queue at once.
std::deque<T> Drain() {
std::unique_lock<std::mutex> lock(queue_mutex_);
std::deque<T> snapshot;
std::swap(snapshot, inner_queue_);
total_pop_ += snapshot.size();
return snapshot;
}
// Indicate that this queue shouldn't accept values anymore. When calling
// |Pop| on an empty queue that is finished, instead of blocking
// indefinitely, the method will return |false| immediately. This allows
// callers to pull data out of a queue in a while loop which will break
// automatically when the queue is finished.
void Finish() {
std::lock_guard<std::mutex> lock(queue_mutex_);
is_finished_ = true;
queue_cv_.notify_all();
}
// Bookkeeping - print tracking stats
void PrintStats() const {
std::lock_guard<std::mutex> lock(queue_mutex_);
int64_t elapsedus = Clock::ns_to_us(sw_.GetElapsed());
Log::V(">> Peak:%d, Push:%d(%.4f/us), Pop:%d(%.4f/us)", peak_length_,
total_push_, total_push_ / (double)elapsedus, total_pop_,
total_pop_ / (double)elapsedus);
}
private:
int32_t max_length_;
bool is_finished_;
mutable std::condition_variable queue_cv_;
mutable std::mutex queue_mutex_;
std::deque<T> inner_queue_;
Stopwatch sw_;
// Bookkeep stats
int32_t peak_length_ = 0;
int32_t total_push_ = 0;
int32_t total_pop_ = 0;
};
} // namespace profiler
#endif // PRODUCER_CONSUMER_QUEUE_H