[automerge] fmq_fuzzer: Exit blocking threads when no more work 2p: b1f22fd744

Original change: https://googleplex-android-review.googlesource.com/c/platform/system/libfmq/+/17453953

Bug: 218521670
Change-Id: Ic887ec8d76c322e238234ec9c67e2cc4fa00ef0a
diff --git a/fuzzer/fmq_fuzzer.cpp b/fuzzer/fmq_fuzzer.cpp
index 79ce5fc..8c8a78e 100644
--- a/fuzzer/fmq_fuzzer.cpp
+++ b/fuzzer/fmq_fuzzer.cpp
@@ -21,6 +21,7 @@
 #include <thread>
 
 #include <android-base/logging.h>
+#include <android-base/scopeguard.h>
 #include <fmq/AidlMessageQueue.h>
 #include <fmq/ConvertMQDescriptors.h>
 #include <fmq/EventFlag.h>
@@ -58,7 +59,7 @@
 
 static constexpr int kMaxNumSyncReaders = 1;
 static constexpr int kMaxNumUnsyncReaders = 5;
-static constexpr int kMaxDataPerReader = 5;
+static constexpr int kMaxDataPerReader = 1000;
 
 typedef android::AidlMessageQueue<payload_t, SynchronizedReadWrite> AidlMessageQueueSync;
 typedef android::AidlMessageQueue<payload_t, UnsynchronizedWrite> AidlMessageQueueUnsync;
@@ -111,30 +112,35 @@
 }
 
 template <typename Queue, typename Desc>
-void readerBlocking(const Desc& desc, std::vector<uint8_t> readerData) {
+void readerBlocking(const Desc& desc, std::vector<uint8_t>& readerData,
+                    std::atomic<size_t>& readersNotFinished,
+                    std::atomic<size_t>& writersNotFinished) {
+    android::base::ScopeGuard guard([&readersNotFinished]() { readersNotFinished--; });
     Queue readMq(desc);
     if (!readMq.isValid()) {
         LOG(ERROR) << "read mq invalid";
         return;
     }
     FuzzedDataProvider fdp(&readerData[0], readerData.size());
-    bool success;
     do {
         size_t count = fdp.remaining_bytes()
                                ? fdp.ConsumeIntegralInRange<size_t>(1, readMq.getQuantumCount())
                                : 1;
         std::vector<payload_t> data;
         data.resize(count);
-        success = readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
-    } while (success == true || fdp.remaining_bytes() > sizeof(size_t));
+        readMq.readBlocking(data.data(), count, kBlockingTimeoutNs);
+    } while (fdp.remaining_bytes() > sizeof(size_t) && writersNotFinished > 0);
 }
 
 // Can't use blocking calls with Unsync queues(there is a static_assert)
 template <>
 void readerBlocking<AidlMessageQueueUnsync, AidlMQDescUnsync>(const AidlMQDescUnsync&,
-                                                              std::vector<uint8_t>) {}
+                                                              std::vector<uint8_t>&,
+                                                              std::atomic<size_t>&,
+                                                              std::atomic<size_t>&) {}
 template <>
-void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>) {}
+void readerBlocking<MessageQueueUnsync, MQDescUnsync>(const MQDescUnsync&, std::vector<uint8_t>&,
+                                                      std::atomic<size_t>&, std::atomic<size_t>&) {}
 
 template <typename Queue>
 void writer(Queue& writeMq, FuzzedDataProvider& fdp, bool userFd) {
@@ -168,8 +174,11 @@
 }
 
 template <typename Queue>
-void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp) {
-    while (fdp.remaining_bytes() > sizeof(size_t)) {
+void writerBlocking(Queue& writeMq, FuzzedDataProvider& fdp,
+                    std::atomic<size_t>& writersNotFinished,
+                    std::atomic<size_t>& readersNotFinished) {
+    android::base::ScopeGuard guard([&writersNotFinished]() { writersNotFinished--; });
+    while (fdp.remaining_bytes() > sizeof(size_t) && readersNotFinished > 0) {
         size_t count = fdp.ConsumeIntegralInRange<size_t>(1, writeMq.getQuantumCount());
         std::vector<payload_t> data;
         for (int i = 0; i < count; i++) {
@@ -181,9 +190,11 @@
 
 // Can't use blocking calls with Unsync queues(there is a static_assert)
 template <>
-void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<AidlMessageQueueUnsync>(AidlMessageQueueUnsync&, FuzzedDataProvider&,
+                                            std::atomic<size_t>&, std::atomic<size_t>&) {}
 template <>
-void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&) {}
+void writerBlocking<MessageQueueUnsync>(MessageQueueUnsync&, FuzzedDataProvider&,
+                                        std::atomic<size_t>&, std::atomic<size_t>&) {}
 
 template <typename Queue, typename Desc>
 void fuzzAidlWithReaders(std::vector<uint8_t>& writerData,
@@ -207,25 +218,29 @@
     const auto desc = writeMq.dupeDesc();
     CHECK(desc.handle.fds[0].get() != -1);
 
-    std::vector<std::thread> clients;
+    std::atomic<size_t> readersNotFinished = readerData.size();
+    std::atomic<size_t> writersNotFinished = 1;
+    std::vector<std::thread> readers;
     for (int i = 0; i < readerData.size(); i++) {
         if (blocking) {
-            clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
-                                 std::ref(readerData[i]));
+            readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(desc),
+                                 std::ref(readerData[i]), std::ref(readersNotFinished),
+                                 std::ref(writersNotFinished));
+
         } else {
-            clients.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
+            readers.emplace_back(reader<Queue, Desc>, std::ref(desc), std::ref(readerData[i]),
                                  userFd);
         }
     }
 
     if (blocking) {
-        writerBlocking<Queue>(writeMq, fdp);
+        writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
     } else {
         writer<Queue>(writeMq, fdp, userFd);
     }
 
-    for (auto& client : clients) {
-        client.join();
+    for (auto& reader : readers) {
+        reader.join();
     }
 }
 
@@ -251,25 +266,28 @@
     const auto desc = writeMq.getDesc();
     CHECK(desc->isHandleValid());
 
-    std::vector<std::thread> clients;
+    std::atomic<size_t> readersNotFinished = readerData.size();
+    std::atomic<size_t> writersNotFinished = 1;
+    std::vector<std::thread> readers;
     for (int i = 0; i < readerData.size(); i++) {
         if (blocking) {
-            clients.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
-                                 std::ref(readerData[i]));
+            readers.emplace_back(readerBlocking<Queue, Desc>, std::ref(*desc),
+                                 std::ref(readerData[i]), std::ref(readersNotFinished),
+                                 std::ref(writersNotFinished));
         } else {
-            clients.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
+            readers.emplace_back(reader<Queue, Desc>, std::ref(*desc), std::ref(readerData[i]),
                                  userFd);
         }
     }
 
     if (blocking) {
-        writerBlocking<Queue>(writeMq, fdp);
+        writerBlocking<Queue>(writeMq, fdp, writersNotFinished, readersNotFinished);
     } else {
         writer<Queue>(writeMq, fdp, userFd);
     }
 
-    for (auto& client : clients) {
-        client.join();
+    for (auto& reader : readers) {
+        reader.join();
     }
 }