Merge "Process write overflow with number of elements" into main
diff --git a/include/fmq/MessageQueueBase.h b/include/fmq/MessageQueueBase.h
index 3161c5b..8d65108 100644
--- a/include/fmq/MessageQueueBase.h
+++ b/include/fmq/MessageQueueBase.h
@@ -24,7 +24,7 @@
#include <utils/Log.h>
#include <utils/SystemClock.h>
#include <atomic>
-#include <new>
+#include <functional>
using android::hardware::kSynchronizedReadWrite;
using android::hardware::kUnsynchronizedWrite;
@@ -46,6 +46,7 @@
NONE,
POINTER_CORRUPTION, /** Read/write pointers mismatch */
};
+ using ErrorHandler = std::function<void(Error, std::string&&)>;
/**
* @param Desc MQDescriptor describing the FMQ.
@@ -86,28 +87,22 @@
0) {}
/**
- * @param errorDetected Optional output parameter which indicates
- * any errors that the client might care about.
- * @param errorMessage Optional output parameter for a human-readable
- * error description.
- *
+ * Set a client-side error handler which is invoked whenever the FMQ detects one of the error
+ * situations defined by the 'Error' type. Passing an empty handler (nullptr) disables it.
+ */
+ void setErrorHandler(ErrorHandler&& handler) { mErrorHandler.swap(handler); }
+
+ /**
* @return Number of items of type T that can be written into the FMQ
* without a read.
*/
- size_t availableToWrite(Error* errorDetected = nullptr,
- std::string* errorMessage = nullptr) const;
+ size_t availableToWrite() const;
/**
- * @param errorDetected Optional output parameter which indicates
- * any errors that the client might care about.
- * @param errorMessage Optional output parameter for a human-readable
- * error description.
- *
* @return Number of items of type T that are waiting to be read from the
* FMQ.
*/
- size_t availableToRead(Error* errorDetected = nullptr,
- std::string* errorMessage = nullptr) const;
+ size_t availableToRead() const;
/**
* Returns the size of type T in bytes.
@@ -482,8 +477,8 @@
typename std::enable_if<!std::is_same<U, MQErased>::value, bool>::type = true>
static constexpr size_t kQuantumValue = sizeof(T);
inline size_t quantum() const;
- size_t availableToWriteBytes(Error* errorDetected, std::string* errorMessage) const;
- size_t availableToReadBytes(Error* errorDetected, std::string* errorMessage) const;
+ size_t availableToWriteBytes() const;
+ size_t availableToReadBytes() const;
MessageQueueBase(const MessageQueueBase& other) = delete;
MessageQueueBase& operator=(const MessageQueueBase& other) = delete;
@@ -517,6 +512,8 @@
*/
android::hardware::EventFlag* mEventFlag = nullptr;
+ ErrorHandler mErrorHandler;
+
const size_t kPageSize = getpagesize();
};
@@ -1137,28 +1134,17 @@
}
template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
-size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes(
- Error* errorDetected, std::string* errorMessage) const {
+size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWriteBytes() const {
size_t queueSizeBytes = mDesc->getSize();
- Error localErrorDetected = Error::NONE;
- size_t availableBytes = availableToReadBytes(&localErrorDetected, errorMessage);
- if (localErrorDetected != Error::NONE) {
- if (errorDetected != nullptr) {
- *errorDetected = localErrorDetected;
- }
- return 0;
- }
+ size_t availableBytes = availableToReadBytes();
if (queueSizeBytes < availableBytes) {
std::string errorMsg =
"The write or read pointer has become corrupted. Writing to the queue is no "
"longer possible. Queue size: " +
std::to_string(queueSizeBytes) + ", available: " + std::to_string(availableBytes);
hardware::details::logError(errorMsg);
- if (errorDetected != nullptr) {
- *errorDetected = Error::POINTER_CORRUPTION;
- }
- if (errorMessage != nullptr) {
- *errorMessage = std::move(errorMsg);
+ if (mErrorHandler) {
+ mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
}
return 0;
}
@@ -1166,15 +1152,13 @@
}
template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
-size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite(
- Error* errorDetected, std::string* errorMessage) const {
- return availableToWriteBytes(errorDetected, errorMessage) / quantum();
+size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToWrite() const {
+ return availableToWriteBytes() / quantum();
}
template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
-size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead(
- Error* errorDetected, std::string* errorMessage) const {
- return availableToReadBytes(errorDetected, errorMessage) / quantum();
+size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToRead() const {
+ return availableToReadBytes() / quantum();
}
template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
@@ -1193,10 +1177,15 @@
auto writePtr = mWritePtr->load(std::memory_order_relaxed);
if (writePtr % quantum() != 0) {
- hardware::details::logError(
- "The write pointer has become misaligned. Writing to the queue is no longer "
- "possible.");
+ std::string errorMsg =
+ "The write pointer has become misaligned. Writing to the queue is not possible. "
+ "Pointer: " +
+ std::to_string(writePtr) + ", quantum: " + std::to_string(quantum());
+ hardware::details::logError(errorMsg);
hardware::details::errorWriteLog(0x534e4554, "184963385");
+ if (mErrorHandler) {
+ mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
+ }
return false;
}
size_t writeOffset = writePtr % mDesc->getSize();
@@ -1246,8 +1235,7 @@
}
template <template <typename, MQFlavor> typename MQDescriptorType, typename T, MQFlavor flavor>
-size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes(
- Error* errorDetected, std::string* errorMessage) const {
+size_t MessageQueueBase<MQDescriptorType, T, flavor>::availableToReadBytes() const {
/*
* This method is invoked by implementations of both read() and write() and
* hence requires a memory_order_acquired load for both mReadPtr and
@@ -1261,11 +1249,8 @@
"longer possible. Write pointer: " +
std::to_string(writePtr) + ", read pointer: " + std::to_string(readPtr);
hardware::details::logError(errorMsg);
- if (errorDetected != nullptr) {
- *errorDetected = Error::POINTER_CORRUPTION;
- }
- if (errorMessage != nullptr) {
- *errorMessage = std::move(errorMsg);
+ if (mErrorHandler) {
+ mErrorHandler(Error::POINTER_CORRUPTION, std::move(errorMsg));
}
return 0;
}
diff --git a/tests/msgq_test_client.cpp b/tests/msgq_test_client.cpp
index b64b21d..22c91d1 100644
--- a/tests/msgq_test_client.cpp
+++ b/tests/msgq_test_client.cpp
@@ -752,6 +752,69 @@
}
/*
+ * Install an error handler, begin a write and use the address of the first region to
+ * overwrite the write counter with the offset of the last byte in the ring buffer. After
+ * the commit, the next write must fail as misaligned and report POINTER_CORRUPTION.
+ */
+TYPED_TEST(SynchronizedReadWriteClient, MisalignedWriteCounterClientSide) {
+ if (TypeParam::UserFd) {
+ // When using the second FD for the ring buffer, we can't get to the read/write
+ // counters from a pointer to the ring buffer, so no sense in testing.
+ GTEST_SKIP();
+ }
+
+ bool errorCallbackTriggered = false;
+ auto errorHandler = [&errorCallbackTriggered](TypeParam::MQType::Error error, std::string&&) {
+ if (error == TypeParam::MQType::Error::POINTER_CORRUPTION) {
+ errorCallbackTriggered = true;
+ }
+ };
+ this->mQueue->setErrorHandler(errorHandler);
+ EXPECT_FALSE(errorCallbackTriggered);
+
+ const size_t dataLen = 1;
+ ASSERT_LE(dataLen, kNumElementsInSyncQueue);
+ int32_t data[dataLen];
+ initData(data, dataLen);
+ // Begin a write and get a MemTransaction object for the first object in the queue.
+ typename TypeParam::MQType::MemTransaction tx;
+ ASSERT_TRUE(this->mQueue->beginWrite(dataLen, &tx));
+ EXPECT_FALSE(errorCallbackTriggered);
+
+ // Get a pointer to the beginning of the ring buffer.
+ const auto& region = tx.getFirstRegion();
+ int32_t* firstStart = region.getAddress();
+
+ // Because this is the first location in the ring buffer, the read and write counters
+ // stored in the fd can be reached from it: the write counter is 8 bytes back and the
+ // read counter is 16 bytes back.
+ uint64_t* writeCntr = (uint64_t*)((uint8_t*)firstStart - 8);
+
+ // Set it to point to the very last byte in the ring buffer (not int32_t-aligned).
+ *(writeCntr) = this->mQueue->getQuantumCount() * this->mQueue->getQuantumSize() - 1;
+ ASSERT_TRUE(*writeCntr % sizeof(int32_t) != 0);
+ EXPECT_FALSE(errorCallbackTriggered);
+
+ ASSERT_TRUE(this->mQueue->commitWrite(dataLen));
+ EXPECT_FALSE(errorCallbackTriggered);
+
+ // This next write will be misaligned and will overlap outside of the ring buffer.
+ // The write should fail and the error handler should be invoked.
+ EXPECT_FALSE(this->mQueue->write(data, dataLen));
+ EXPECT_TRUE(errorCallbackTriggered);
+
+ errorCallbackTriggered = false;  // availableToWrite() must report the corruption as well
+ EXPECT_EQ(0, this->mQueue->availableToWrite());
+ EXPECT_TRUE(errorCallbackTriggered);
+
+ // Check that passing nullptr resets the error handler: the callback no longer fires.
+ errorCallbackTriggered = false;
+ this->mQueue->setErrorHandler(nullptr);
+ EXPECT_EQ(0, this->mQueue->availableToWrite());
+ EXPECT_FALSE(errorCallbackTriggered);
+}
+
+/*
* Write a small number of messages to FMQ using the beginWrite()/CommitWrite()
* APIs. Request mService to read and verify that the write was successful.
*/