[automerge] fmq_fuzzer: Exit blocking threads when there is no more work 2p: b1f22fd744
Original change: https://googleplex-android-review.googlesource.com/c/platform/system/libfmq/+/17453953
Bug: 218521670
Change-Id: Ic887ec8d76c322e238234ec9c67e2cc4fa00ef0a
diff --git a/fuzzer/fmq_fuzzer.cpp b/fuzzer/fmq_fuzzer.cpp
index 79ce5fc..8c8a78e 100644
--- a/fuzzer/fmq_fuzzer.cpp
+++ b/fuzzer/fmq_fuzzer.cpp
@@ -21,6 +21,7 @@
#include <thread>
#include <android-base/logging.h>
+#include <android-base/scopeguard.h>
#include <fmq/AidlMessageQueue.h>
#include <fmq/ConvertMQDescriptors.h>
#include <fmq/EventFlag.h>
@@ -58,7 +59,7 @@
static constexpr int kMaxNumSyncReaders = 1;
static constexpr int kMaxNumUnsyncReaders = 5;
-static constexpr int kMaxDataPerReader = 5;
+static constexpr int kMaxDataPerReader = 1000;
typedef android::AidlMessageQueue<payload_t, SynchronizedReadWrite> AidlMessageQueueSync;
typedef android::AidlMessageQueue<payload_t, UnsynchronizedWrite> AidlMessageQueueUnsync;
@@ -111,30 +112,35 @@
}
template <typename Queue, typename Desc>
-void readerBlocking(const Desc& desc, std::vector<uint8_t> readerData) {
+void readerBlocking(const Desc& desc, std::vector<uint8_t>& readerData,
+ std::atomic<size_t>& readersNotFinished,
+ std::atomic<size_t>& writersNotFinished) {
+ android::base::ScopeGuard guard([&readersNotFinished]() { readersNotFinished--; });
Queue readMq(desc);
if (!readMq.isValid()) {
LOG(ERROR) << "read mq invalid";
return;
}
FuzzedDataProvider fdp(&readerData[0], readerData.size());
- bool success;
do {
size_t count = fdp.remaining_bytes()
? fdp.ConsumeIntegralInRange<size_t>(1, readMq.getQuantumCount())
: 1;
std::vector<payload_t> data;
data.resize(count);
- success = readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
- } while (success == true || fdp.remaining_bytes() > sizeof(size_t));
+ readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
+ } while (fdp.remaining_bytes() > sizeof(size_t) && writersNotFinished > 0);
}
// Can't use blocking calls with Unsync queues(there is a static_assert)
template <>
void readerBlocking<AidlMessageQueueUnsync, AidlMQDescUnsync>(const AidlMQDescUnsync&,
- std::vector<uint8_t>) {}
+ std::vector<uint8_t>&,
+ std::atomic<size_t>&,
+ std::atomic<size_t>&) {}
template <>
-void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>) {}
+void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>&,
+ std::atomic<size_t>&, std::atomic<size_t>&) {}
template <typename Queue>
void writer(Queue& writeMq, FuzzedDataProvider& fdp, bool userFd) {
@@ -168,8 +174,11 @@
}
template <typename Queue>
-void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp) {
- while (fdp.remaining_bytes() > sizeof(size_t)) {
+void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp,
+ std::atomic<size_t>& writersNotFinished,
+ std::atomic<size_t>& readersNotFinished) {
+ android::base::ScopeGuard guard([&writersNotFinished]() { writersNotFinished--; });
+ while (fdp.remaining_bytes() > sizeof(size_t) && readersNotFinished > 0) {
size_t count = fdp.ConsumeIntegralInRange<size_t>(1, writeMq.getQuantumCount());
std::vector<payload_t> data;
for (int i = 0; i < count; i++) {
@@ -181,9 +190,11 @@
// Can't use blocking calls with Unsync queues(there is a static_assert)
template <>
-void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&,
+ std::atomic<size_t>&, std::atomic<size_t>&) {}
template <>
-void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&,
+ std::atomic<size_t>&, std::atomic<size_t>&) {}
template <typename Queue, typename Desc>
void fuzzAidlWithReaders(std::vector<uint8_t>& writerData,
@@ -207,25 +218,29 @@
const auto desc = writeMq.dupeDesc();
CHECK(desc.handle.fds[0].get() != -1);
- std::vector<std::thread> clients;
+ std::atomic<size_t> readersNotFinished = readerData.size();
+ std::atomic<size_t> writersNotFinished = 1;
+ std::vector<std::thread> readers;
for (int i = 0; i < readerData.size(); i++) {
if (blocking) {
- clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
- std::ref(readerData[i]));
+ readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
+ std::ref(readerData[i]), std::ref(readersNotFinished),
+ std::ref(writersNotFinished));
+
} else {
- clients.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
+ readers.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
userFd);
}
}
if (blocking) {
- writerBlocking<Queue>(writeMq, fdp);
+ writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
} else {
writer<Queue>(writeMq, fdp, userFd);
}
- for (auto& client : clients) {
- client.join();
+ for (auto& reader : readers) {
+ reader.join();
}
}
@@ -251,25 +266,28 @@
const auto desc = writeMq.getDesc();
CHECK(desc->isHandleValid());
- std::vector<std::thread> clients;
+ std::atomic<size_t> readersNotFinished = readerData.size();
+ std::atomic<size_t> writersNotFinished = 1;
+ std::vector<std::thread> readers;
for (int i = 0; i < readerData.size(); i++) {
if (blocking) {
- clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
- std::ref(readerData[i]));
+ readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
+ std::ref(readerData[i]), std::ref(readersNotFinished),
+ std::ref(writersNotFinished));
} else {
- clients.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
+ readers.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
userFd);
}
}
if (blocking) {
- writerBlocking<Queue>(writeMq, fdp);
+ writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
} else {
writer<Queue>(writeMq, fdp, userFd);
}
- for (auto& client : clients) {
- client.join();
+ for (auto& reader : readers) {
+ reader.join();
}
}