blob: d4c42bdd53834fc435bb12b22576f4c1f93d0961 [file] [log] [blame]
* Copyright (C) 2022 The Android Open Source Project
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <new>
#include <type_traits>
#include "chre/platform/atomic.h"
#include "chre/util/fixed_size_vector.h"
#include "chre/util/memory.h"
#include "chre/util/non_copyable.h"
* @file
* AtomicSpscArrayQueue is a templated fixed-size FIFO queue implemented around
* a contiguous array supporting atomic single-producer, single-consumer (SPSC)
* usage. In other words, one thread of execution can safely add to the
* queue while a different thread of execution can can pull from the queue,
* without the use of locking. To ensure safe concurrency, the user of this
* class must ensure that producer methods do not interleave with other producer
* methods, and likewise for consumer methods. To help ensure this contract is
* upheld, producer-only methods are grouped under the Producer subclass
* (accessed via AtomicSpscArrayQueue::producer()), and likewise for Consumer.
* To accomplish concurrency without the use of locks, the head and tail
* pointers are allowed to increment past the size of the container. They are
* reset when new elements are pushed into an empty container, therefore the
* usage model must involve relatively frequent emptying of the container to
* prevent overflow of the indices. The nearingOverflow() method can be used to
* detect when this condition is imminent, and enable flow control or some other
* mechanism to ensure the queue is fully emptied before proceeding (though
* triggering an assert/fatal error could also be considered, since the set of
* conditions required to trigger this condition organically are expected to be
* so rare as to be effectively impossible, so a bug is a more likely cause).
* Since modulo operations are common in the internals of this container, it's
* recommended to use powers of 2 for the capacity where possible.
namespace chre {
template <typename ElementType, size_t kCapacity>
class AtomicSpscQueue : public NonCopyable {
// Since we rely on being able to increment mHead and mTail beyond kCapacity,
// this provides some level of guarantee that we'll be able to do that a few
// times before things are reset (when the queue is emptied).
static_assert(kCapacity <= UINT32_MAX / 8,
"Large capacity usage of AtomicSpscQueue is not advised");
* Destroying the queue must only be done when it is guaranteed that the
* producer and consumer execution contexts are both stopped.
~AtomicSpscQueue() {
size_t sz = size();
auto c = consumer();
for (size_t i = 0; i < sz; i++) {
size_t capacity() const {
return kCapacity;
* Checks whether the queue has not been fully emptied in a long time, and
* internal counters are nearing overflow, which would cause significant data
* loss if it occurs (consumer sees queue as empty when it actually isn't,
* until tail catches up to head). If this possibility is a concern, the
* producer should check this and if it returns true, enable flow control to
* stop adding new data to the queue until after the queue has been fully
* emptied.
* @return true internal counters/indices are nearing overflow
bool nearingOverflow() const {
return (mTail.load() > UINT32_MAX - kCapacity);
* Gets a snapshot of the number of elements currently stored in the queue.
* Safe to call from any context.
size_t size() const {
uint32_t head = mHead.load();
uint32_t tail = mTail.load();
// Note that head and tail are normally monotonically increasing with
// head <= tail, *except* when we are resetting both head and tail to 0
// (done only when adding new elements), in which case we reset tail first.
// If our reads happened between resetting tail and resetting head, then
// tail < head, and we can safely assume the queue is empty.
if (head == tail || tail < head) {
return 0;
} else {
return (tail - head);
* Non-const methods within this class must ONLY be invoked from the producer
* execution context.
class Producer {
size_t capacity() const {
return kCapacity;
bool full() const {
return (size() == kCapacity);
size_t size() const {
return mQueue.size();
* Constructs a new item at the end of the queue in-place.
* WARNING: Undefined behavior if the array queue is currently full.
template <typename... Args>
void emplace(Args &&...args) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(std::forward<Args>(args)...);
mQueue.mTail = newTail;
* Pushes an element onto the back of the array queue.
* WARNING: Undefined behavior if the array queue is currently full.
void push(const ElementType &element) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(element);
mQueue.mTail = newTail;
//! Move construction version of push(const ElementType&)
void push(ElementType &&element) {
uint32_t newTail;
new (nextStorage(&newTail)) ElementType(std::move(element));
mQueue.mTail = newTail;
friend class AtomicSpscQueue;
Producer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}
AtomicSpscQueue<ElementType, kCapacity> &mQueue;
//! Fetches a pointer to the next location where we should push an element,
//! and updates bookkeeping for the next next location
ElementType *nextStorage(uint32_t *newTail) {
uint32_t tail = mQueue.mTail.load();
if (tail != 0 && tail == mQueue.mHead.load()) {
// We're empty, so reset both head and tail to 0 so it doesn't continue
// to grow (and possibly overflow). Only do this when pushing, as this
// is the only place we can guarantee that mHead is stable (there's
// nothing for the consumer to retrieve, and attempting to pull from an
// empty queue is UB) and mTail is too (we're in the producer context).
// Note that we need to reset tail *first* to ensure size() is safe to
// call from both contexts.
mQueue.mTail = 0;
mQueue.mHead = 0;
tail = 0;
} else {
// If tail overflows (only possible if the producer *always* pushes a
// new element while the consumer is reading, meaning that the queue
// never gets fully emptied, and this continues until the tail pointer
// reaches the max here), then size() will consider the queue empty and
// things will get very broken.
*newTail = tail + 1;
return &[tail % kCapacity];
Producer producer() {
return Producer(*this);
* Non-const methods within this class must ONLY be invoked from the consumer
* execution context.
class Consumer {
size_t capacity() const {
return kCapacity;
bool empty() const {
return (size() == 0);
size_t size() const {
return mQueue.size();
* Retrieves a reference to the oldest element in the queue.
* WARNING: Undefined behavior if the queue is currently empty
ElementType &front() {
return[mQueue.mHead.load() % kCapacity];
const ElementType &front() const {
return[mQueue.mHead.load() % kCapacity];
* Removes the oldest element in the queue.
* WARNING: Undefined behavior if the queue is currently empty
void pop() {
// Destructing prior to moving the head pointer is safe as long as this
// doesn't interleave with other Producer methods
uint32_t headRaw = mQueue.mHead;
uint32_t headIndex = headRaw % kCapacity;[headIndex].~ElementType();
mQueue.mHead = headRaw + 1;
* Moves or copies a block of elements into the provided (possibly
* uninitialized) destination storage.
* Safe to call if the queue is currently empty (includes an internal
* check).
* @param dest Pointer to destination array
* @param count Maximum number of elements to extract
* @return Number of elements actually pulled out of the queue.
size_t extract(ElementType *dest, size_t count) {
size_t elementsToCopy = std::min(mQueue.size(), count);
return extractInternal(dest, elementsToCopy);
//! Equivalent to extract(ElementType*, size_t) but appends to the provided
//! FixedSizeVector up to its capacity
template <size_t kDestCapacity>
size_t extract(FixedSizeVector<ElementType, kDestCapacity> *dest) {
size_t destIndex = dest->size();
size_t elementsToCopy =
std::min(mQueue.size(), dest->capacity() - destIndex);
dest->resize(destIndex + elementsToCopy);
return extractInternal(&dest->data()[destIndex], elementsToCopy);
friend class AtomicSpscQueue;
Consumer(AtomicSpscQueue<ElementType, kCapacity> &q) : mQueue(q) {}
AtomicSpscQueue<ElementType, kCapacity> &mQueue;
size_t extractInternal(ElementType *dest, size_t elementsToCopy) {
if (elementsToCopy > 0) {
uint32_t headRaw = mQueue.mHead;
size_t headIndex = headRaw % kCapacity;
size_t firstCopy = std::min(elementsToCopy, kCapacity - headIndex);
uninitializedMoveOrCopy(&[headIndex], firstCopy, dest);
destroy(&[headIndex], firstCopy);
if (firstCopy != elementsToCopy) {
size_t secondCopy = elementsToCopy - firstCopy;
uninitializedMoveOrCopy(&[0], secondCopy,
destroy(&[0], secondCopy);
mQueue.mHead = headRaw + elementsToCopy;
return elementsToCopy;
Consumer consumer() {
return Consumer(*this);
//! Index of the oldest element on the queue (first to be popped). If the
//! queue is empty, this is equal to mTail (modulo kCapacity) *or* for a very
//! brief time it may be greater than mTail (when we're resetting both to 0).
chre::AtomicUint32 mHead{0};
//! Indicator of where we will push the next element -- to provide atomic
//! behavior, this may exceed kCapacity, so modulo kCapacity is needed to
//! convert this into an array index.
chre::AtomicUint32 mTail{0};
typename std::aligned_storage<sizeof(ElementType), alignof(ElementType)>::type
ElementType *data() {
return reinterpret_cast<ElementType *>(mData);
} // namespace chre