/*
* Copyright (C) 2022 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_
#define CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <new>
#include <type_traits>
#include "chre/platform/atomic.h"
#include "chre/util/fixed_size_vector.h"
#include "chre/util/memory.h"
#include "chre/util/non_copyable.h"
/**
* @file
* AtomicSpscQueue is a templated fixed-size FIFO queue implemented around a
* contiguous array supporting atomic single-producer, single-consumer (SPSC)
* usage. In other words, one thread of execution can safely add to the queue
* while a different thread of execution pulls from the queue, without the use
* of locking. To ensure safe concurrency, the user of this class must ensure
* that producer methods do not interleave with other producer methods, and
* likewise for consumer methods. To help ensure this contract is upheld,
* producer-only methods are grouped under the Producer subclass (accessed via
* AtomicSpscQueue::producer()), and likewise for Consumer.
*
* To accomplish concurrency without the use of locks, the head and tail
* indices are allowed to increment past the size of the container. They are
* reset when new elements are pushed into an empty container, therefore the
* usage model must involve relatively frequent emptying of the container to
* prevent overflow of the indices. The nearingOverflow() method can be used
* to detect when this condition is imminent and enable flow control or some
* other mechanism to ensure the queue is fully emptied before proceeding
* (though triggering an assert/fatal error could also be considered, since
* the set of conditions required to reach overflow organically is expected to
* be so rare as to be effectively impossible, making a bug the more likely
* cause).
*
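* For example, with kCapacity = 4, a state of mHead = 5 and mTail = 7 means
* the queue holds two elements, stored at array indices 5 % 4 = 1 and
* 6 % 4 = 2, and the next push would land at index 7 % 4 = 3 (values chosen
* purely for illustration).
*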
* Since modulo operations are common in the internals of this container, it's
* recommended to use powers of 2 for the capacity where possible.
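*
* Example usage (an illustrative sketch; the producer calls and the consumer
* calls must each be confined to their own single execution context):
*
*   chre::AtomicSpscQueue<int, 8> queue;
*
*   // Producer context:
*   auto p = queue.producer();
*   if (!p.full()) {
*     p.push(1234);
*   }
*
*   // Consumer context:
*   auto c = queue.consumer();
*   while (!c.empty()) {
*     int value = c.front();
*     c.pop();
*     // ... handle value ...
*   }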
*/
namespace chre {
template <typename ElementType, size_t kCapacity>
class AtomicSpscQueue : public NonCopyable {
// Since we rely on being able to increment mHead and mTail beyond kCapacity,
// this provides some level of guarantee that we'll be able to do that a few
// times before things are reset (when the queue is emptied).
static_assert(kCapacity <= UINT32_MAX / 8,
"Large capacity usage of AtomicSpscQueue is not advised");
public:
/**
* Destroying the queue must only be done when it is guaranteed that the
* producer and consumer execution contexts are both stopped.
*/
~AtomicSpscQueue() {
size_t sz = size();
auto c = consumer();
for (size_t i = 0; i < sz; i++) {
c.pop();
}
}
size_t capacity() const {
return kCapacity;
}
/**
* Checks whether the queue has not been fully emptied in a long time, and
* internal counters are nearing overflow, which would cause significant data
* loss if it occurs (consumer sees queue as empty when it actually isn't,
* until tail catches up to head). If this possibility is a concern, the
* producer should check this method and, if it returns true, enable flow
* control to stop adding new data to the queue until the queue has been
* fully emptied.
*
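* One possible flow-control sketch for the producer context (element stands
* in for whatever data is being queued):
*
*   auto p = queue.producer();
*   if (queue.nearingOverflow() && queue.size() > 0) {
*     // Drop or defer element; the first push after the consumer fully
*     // empties the queue resets the internal indices.
*   } else if (!p.full()) {
*     p.push(element);
*   }
*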
* @return true if internal counters/indices are nearing overflow
*/
bool nearingOverflow() const {
return (mTail.load() > UINT32_MAX - kCapacity);
}
/**
* Gets a snapshot of the number of elements currently stored in the queue.
* Safe to call from any context.
*/
size_t size() const {
uint32_t head = mHead.load();
uint32_t tail = mTail.load();
// Note that head and tail are normally monotonically increasing with
// head <= tail, *except* when we are resetting both head and tail to 0
// (done only when adding new elements), in which case we reset tail first.
// If our reads happened between resetting tail and resetting head, then
// tail < head, and we can safely assume the queue is empty.
if (head == tail || tail < head) {
return 0;
} else {
return (tail - head);
}
}
/**
* Non-const methods within this class must ONLY be invoked from the producer
* execution context.
*/
class Producer {
public:
size_t capacity() const {
return kCapacity;
}
bool full() const {
return (size() == kCapacity);
}
size_t size() const {
return mQueue.size();
}
/**
* Constructs a new item at the end of the queue in-place.
*
* WARNING: Undefined behavior if the array queue is currently full.
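*
* For example, given an ElementType that is constructible from two ints
* (purely illustrative):
*
*   queue.producer().emplace(1, 2);  // constructs ElementType(1, 2) in place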
*/
template <typename... Args>
void emplace(Args &&...args) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(std::forward<Args>(args)...);
mQueue.mTail = newTail;
}
/**
* Pushes an element onto the back of the array queue.
*
* WARNING: Undefined behavior if the array queue is currently full.
*/
void push(const ElementType &element) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(element);
mQueue.mTail = newTail;
}
//! Move construction version of push(const ElementType&)
void push(ElementType &&element) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(std::move(element));
mQueue.mTail = newTail;
}
private:
friend class AtomicSpscQueue;
Producer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}
AtomicSpscQueue<ElementType, kCapacity> &mQueue;
//! Fetches a pointer to the next location where we should push an element,
//! and outputs (via newTail) the tail value to publish once the element has
//! been constructed
ElementType *nextStorage(uint32_t *newTail) {
uint32_t tail = mQueue.mTail.load();
if (tail != 0 && tail == mQueue.mHead.load()) {
// We're empty, so reset both head and tail to 0 so they don't continue
// to grow (and possibly overflow). Only do this when pushing, as this
// is the only place we can guarantee that mHead is stable (there's
// nothing for the consumer to retrieve, and attempting to pull from an
// empty queue is UB) and mTail is too (we're in the producer context).
// Note that we need to reset tail *first* to ensure size() is safe to
// call from both contexts.
mQueue.mTail = 0;
mQueue.mHead = 0;
tail = 0;
} else {
// If tail overflows (only possible if the producer *always* pushes a
// new element while the consumer is reading, meaning that the queue
// never gets fully emptied, and this continues until the tail pointer
// reaches the max here), then size() will consider the queue empty and
// things will get very broken.
CHRE_ASSERT(tail < UINT32_MAX);
}
*newTail = tail + 1;
return &mQueue.data()[tail % kCapacity];
}
};
Producer producer() {
return Producer(*this);
}
/**
* Non-const methods within this class must ONLY be invoked from the consumer
* execution context.
*/
class Consumer {
public:
size_t capacity() const {
return kCapacity;
}
bool empty() const {
return (size() == 0);
}
size_t size() const {
return mQueue.size();
}
/**
* Retrieves a reference to the oldest element in the queue.
*
* WARNING: Undefined behavior if the queue is currently empty
*/
ElementType &front() {
return mQueue.data()[mQueue.mHead.load() % kCapacity];
}
const ElementType &front() const {
return mQueue.data()[mQueue.mHead.load() % kCapacity];
}
/**
* Removes the oldest element in the queue.
*
* WARNING: Undefined behavior if the queue is currently empty
*/
void pop() {
// Destructing prior to moving the head pointer is safe as long as this
// doesn't interleave with other Consumer methods; the producer won't reuse
// this slot until mHead is advanced
uint32_t headRaw = mQueue.mHead.load();
uint32_t headIndex = headRaw % kCapacity;
mQueue.data()[headIndex].~ElementType();
mQueue.mHead = headRaw + 1;
}
/**
* Moves or copies a block of elements into the provided (possibly
* uninitialized) destination storage.
*
* Safe to call if the queue is currently empty (includes an internal
* check).
*
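* A usage sketch from the consumer context (buffer size and names are
* illustrative only):
*
*   ElementType buffer[8];
*   size_t numPopped = queue.consumer().extract(buffer, 8);
*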
* @param dest Pointer to destination array
* @param count Maximum number of elements to extract
*
* @return Number of elements actually pulled out of the queue.
*/
size_t extract(ElementType *dest, size_t count) {
size_t elementsToCopy = std::min(mQueue.size(), count);
return extractInternal(dest, elementsToCopy);
}
//! Equivalent to extract(ElementType*, size_t) but appends to the provided
//! FixedSizeVector up to its capacity
template <size_t kDestCapacity>
size_t extract(FixedSizeVector<ElementType, kDestCapacity> *dest) {
size_t destIndex = dest->size();
size_t elementsToCopy =
std::min(mQueue.size(), dest->capacity() - destIndex);
dest->resize(destIndex + elementsToCopy);
return extractInternal(&dest->data()[destIndex], elementsToCopy);
}
private:
friend class AtomicSpscQueue;
Consumer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}
AtomicSpscQueue<ElementType, kCapacity> &mQueue;
size_t extractInternal(ElementType *dest, size_t elementsToCopy) {
if (elementsToCopy > 0) {
uint32_t headRaw = mQueue.mHead.load();
size_t headIndex = headRaw % kCapacity;
size_t firstCopy = std::min(elementsToCopy, kCapacity - headIndex);
uninitializedMoveOrCopy(&mQueue.data()[headIndex], firstCopy, dest);
destroy(&mQueue.data()[headIndex], firstCopy);
if (firstCopy != elementsToCopy) {
size_t secondCopy = elementsToCopy - firstCopy;
uninitializedMoveOrCopy(&mQueue.data()[0], secondCopy,
&dest[firstCopy]);
destroy(&mQueue.data()[0], secondCopy);
}
mQueue.mHead = headRaw + elementsToCopy;
}
return elementsToCopy;
}
};
Consumer consumer() {
return Consumer(*this);
}
protected:
//! Index of the oldest element on the queue (first to be popped). If the
//! queue is empty, this is equal to mTail (modulo kCapacity) *or* for a very
//! brief time it may be greater than mTail (when we're resetting both to 0).
chre::AtomicUint32 mHead{0};
//! Indicator of where we will push the next element -- to provide atomic
//! behavior, this may exceed kCapacity, so modulo kCapacity is needed to
//! convert this into an array index.
chre::AtomicUint32 mTail{0};
typename std::aligned_storage<sizeof(ElementType), alignof(ElementType)>::type
mData[kCapacity];
ElementType *data() {
return reinterpret_cast<ElementType *>(mData);
}
};
} // namespace chre
#endif // CHRE_UTIL_ATOMIC_SPSC_QUEUE_H_