logd: consolidate test code

Test: these tests
Change-Id: Ia422cd84106cbe1a8044ef65d3e287c531095a7e
diff --git a/logd/ChattyLogBufferTest.cpp b/logd/ChattyLogBufferTest.cpp
index e273efe..dd9375d 100644
--- a/logd/ChattyLogBufferTest.cpp
+++ b/logd/ChattyLogBufferTest.cpp
@@ -59,16 +59,6 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
-
-        std::unique_ptr<FlushToState> flush_to_state =
-                log_buffer_->CreateFlushToState(1, kLogMaskAll);
-        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
-    }
-
     std::vector<LogMessage> expected_log_messages = {
             make_message(0, "test_tag", "duplicate"),
             make_message(1, "test_tag", "duplicate"),
@@ -92,7 +82,8 @@
             make_message(300, "test_tag", "duplicate"),
     };
     FixupMessages(&expected_log_messages);
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    auto flush_result = FlushMessages();
+    CompareLogMessages(expected_log_messages, flush_result.messages);
 };
 
 TEST_P(ChattyLogBufferTest, deduplication_overflow) {
@@ -121,15 +112,6 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
-        std::unique_ptr<FlushToState> flush_to_state =
-                log_buffer_->CreateFlushToState(1, kLogMaskAll);
-        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
-    }
-
     std::vector<LogMessage> expected_log_messages = {
             make_message(0, "test_tag", "normal"),
             make_message(1, "test_tag", "duplicate"),
@@ -141,7 +123,8 @@
             make_message(expired_per_chatty_message + 4, "test_tag", "normal"),
     };
     FixupMessages(&expected_log_messages);
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    auto flush_result = FlushMessages();
+    CompareLogMessages(expected_log_messages, flush_result.messages);
 }
 
 TEST_P(ChattyLogBufferTest, deduplication_liblog) {
@@ -181,15 +164,6 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
-        std::unique_ptr<FlushToState> flush_to_state =
-                log_buffer_->CreateFlushToState(1, kLogMaskAll);
-        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
-    }
-
     std::vector<LogMessage> expected_log_messages = {
             make_message(0, 1234, 1),
             make_message(1, LIBLOG_LOG_TAG, 3),
@@ -212,7 +186,8 @@
             make_message(20, 1234, 227),
     };
     FixupMessages(&expected_log_messages);
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    auto flush_result = FlushMessages();
+    CompareLogMessages(expected_log_messages, flush_result.messages);
 };
 
 TEST_P(ChattyLogBufferTest, no_leading_chatty_simple) {
@@ -267,21 +242,7 @@
     FixupMessages(&expected_log_messages);
     // clang-format on
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    0, ~0, 1, {}, 2, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({.pid = 1, .sequence = 2});
     CompareLogMessages(expected_log_messages, read_log_messages);
 }
 
@@ -327,21 +288,7 @@
     FixupMessages(&expected_log_messages);
     // clang-format on
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    3, ~0, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({.tail = 3});
     CompareLogMessages(expected_log_messages, read_log_messages);
 }
 
diff --git a/logd/LogBufferTest.cpp b/logd/LogBufferTest.cpp
index 3b3c795..e881e76 100644
--- a/logd/LogBufferTest.cpp
+++ b/logd/LogBufferTest.cpp
@@ -18,7 +18,6 @@
 
 #include <unistd.h>
 
-#include <chrono>
 #include <limits>
 #include <memory>
 #include <regex>
@@ -34,7 +33,6 @@
 using android::base::Join;
 using android::base::Split;
 using android::base::StringPrintf;
-using namespace std::chrono_literals;
 
 char* android::uidToName(uid_t) {
     return nullptr;
@@ -191,16 +189,9 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
-        std::unique_ptr<FlushToState> flush_to_state =
-                log_buffer_->CreateFlushToState(1, kLogMaskAll);
-        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
-        EXPECT_EQ(2ULL, flush_to_state->start());
-    }
-    CompareLogMessages(log_messages, read_log_messages);
+    auto flush_result = FlushMessages();
+    EXPECT_EQ(2ULL, flush_result.next_sequence);
+    CompareLogMessages(log_messages, flush_result.messages);
 }
 
 TEST_P(LogBufferTest, smoke_with_reader_thread) {
@@ -229,25 +220,7 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    0, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({});
     CompareLogMessages(log_messages, read_log_messages);
 }
 
@@ -295,33 +268,20 @@
     return {entry, message};
 }
 
-TEST_P(LogBufferTest, random_messages) {
+std::vector<LogMessage> GenerateRandomLogMessages(size_t count) {
     srand(1);
     std::vector<LogMessage> log_messages;
-    for (size_t i = 0; i < 1000; ++i) {
+    for (size_t i = 0; i < count; ++i) {
         log_messages.emplace_back(GenerateRandomLogMessage(i));
     }
+    return log_messages;
+}
+
+TEST_P(LogBufferTest, random_messages) {
+    auto log_messages = GenerateRandomLogMessages(1000);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    0, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({});
     CompareLogMessages(log_messages, read_log_messages);
 }
 
@@ -337,26 +297,8 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    0, kLogMaskAll, 0, {}, 3, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
     std::vector<LogMessage> expected_log_messages = {log_messages.back()};
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({.sequence = 3});
     CompareLogMessages(expected_log_messages, read_log_messages);
 }
 
@@ -373,18 +315,8 @@
     FixupMessages(&log_messages);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
     // Connect a blocking reader.
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false,
-                                    0, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
+    auto blocking_reader = TestReaderThread({.non_block = false}, *this);
 
     // Wait up to 250ms for the reader to read the first 3 logs.
     constexpr int kMaxRetryCount = 50;
@@ -423,100 +355,37 @@
     }
     ASSERT_LT(count, kMaxRetryCount);
 
-    // Release the reader, wait for it to get the signal then check that it has been deleted.
-    {
-        auto lock = std::lock_guard{logd_lock};
-        reader_list_.reader_threads().back()->Release();
-    }
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    ReleaseAndJoinReaders();
 
     // Check that we have read all 6 messages.
     std::vector<LogMessage> expected_log_messages = log_messages;
     expected_log_messages.insert(expected_log_messages.end(), after_clear_messages.begin(),
                                  after_clear_messages.end());
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
 
-    // Finally, call FlushTo and ensure that only the 3 logs after the clear remain in the buffer.
-    std::vector<LogMessage> read_log_messages_after_clear;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(
-                new TestWriter(&read_log_messages_after_clear, nullptr));
-        std::unique_ptr<FlushToState> flush_to_state =
-                log_buffer_->CreateFlushToState(1, kLogMaskAll);
-        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
-        EXPECT_EQ(7ULL, flush_to_state->start());
-    }
-    CompareLogMessages(after_clear_messages, read_log_messages_after_clear);
+    // Finally, Flush messages and ensure that only the 3 logs after the clear remain in the buffer.
+    auto flush_after_clear_result = FlushMessages();
+    EXPECT_EQ(7ULL, flush_after_clear_result.next_sequence);
+    CompareLogMessages(after_clear_messages, flush_after_clear_result.messages);
 }
 
 TEST_P(LogBufferTest, tail100_nonblocking_1000total) {
-    LogMessage message = {
-            {.pid = 1, .tid = 2, .sec = 0, .nsec = 20001, .lid = LOG_ID_MAIN, .uid = 0}, "message"};
-    std::vector<LogMessage> log_messages;
-    for (int i = 0; i < 1000; ++i) {
-        log_messages.push_back(message);
-        log_messages.back().entry.sec = i;
-        log_messages.back().entry.pid = i;  // Chatty dedupes these messages if they're identical.
-    }
-    FixupMessages(&log_messages);
+    auto log_messages = GenerateRandomLogMessages(1000);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
     constexpr int kTailCount = 100;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    kTailCount, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
     std::vector<LogMessage> expected_log_messages{log_messages.end() - kTailCount,
                                                   log_messages.end()};
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({.tail = kTailCount});
     CompareLogMessages(expected_log_messages, read_log_messages);
 }
 
 TEST_P(LogBufferTest, tail100_blocking_1000total_then1000more) {
-    LogMessage message = {
-            {.pid = 1, .tid = 2, .sec = 0, .nsec = 20001, .lid = LOG_ID_MAIN, .uid = 0}, "message"};
-    std::vector<LogMessage> log_messages;
-    for (int i = 0; i < 1000; ++i) {
-        log_messages.push_back(message);
-        log_messages.back().entry.sec = i;
-        log_messages.back().entry.pid = i;  // Chatty dedupes these messages if they're identical.
-    }
-    FixupMessages(&log_messages);
+    auto log_messages = GenerateRandomLogMessages(1000);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
     constexpr int kTailCount = 100;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false,
-                                    kTailCount, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
+    auto blocking_reader = TestReaderThread({.non_block = false, .tail = kTailCount}, *this);
 
     std::vector<LogMessage> expected_log_messages{log_messages.end() - kTailCount,
                                                   log_messages.end()};
@@ -526,18 +395,15 @@
     while (retry_count--) {
         usleep(5000);
         auto lock = std::lock_guard{logd_lock};
-        if (read_log_messages.size() == expected_log_messages.size()) {
-            CompareLogMessages(expected_log_messages, read_log_messages);
+        if (blocking_reader.read_log_messages().size() == expected_log_messages.size()) {
+            CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
             break;
         }
     }
     ASSERT_GT(retry_count, 0);
 
     // Log more messages
-    for (auto& message : log_messages) {
-        message.entry.sec += 10000;
-    }
-    FixupMessages(&log_messages);
+    log_messages = GenerateRandomLogMessages(1000);
     LogMessages(log_messages);
     expected_log_messages.insert(expected_log_messages.end(), log_messages.begin(),
                                  log_messages.end());
@@ -547,91 +413,34 @@
     while (retry_count--) {
         usleep(5000);
         auto lock = std::lock_guard{logd_lock};
-        if (read_log_messages.size() == expected_log_messages.size()) {
-            CompareLogMessages(expected_log_messages, read_log_messages);
+        if (blocking_reader.read_log_messages().size() == expected_log_messages.size()) {
+            CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
             break;
         }
     }
     ASSERT_GT(retry_count, 0);
 
-    // Release the reader.
-    {
-        auto lock = std::lock_guard{logd_lock};
-        reader_list_.reader_threads().back()->Release();
-    }
-
-    // Confirm that it has exited.
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    ReleaseAndJoinReaders();
 
     // Final check that no extraneous logs were logged.
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
 }
 
 TEST_P(LogBufferTest, tail100_nonblocking_50total) {
-    LogMessage message = {
-            {.pid = 1, .tid = 2, .sec = 0, .nsec = 20001, .lid = LOG_ID_MAIN, .uid = 0}, "message"};
-    std::vector<LogMessage> log_messages;
-    for (int i = 0; i < 50; ++i) {
-        log_messages.push_back(message);
-        log_messages.back().entry.sec = i;
-        log_messages.back().entry.pid = i;  // Chatty dedupes these messages if they're identical.
-    }
-    FixupMessages(&log_messages);
+    auto log_messages = GenerateRandomLogMessages(50);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
     constexpr int kTailCount = 100;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), true,
-                                    kTailCount, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
-
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    auto read_log_messages = ReadLogMessagesNonBlockingThread({.tail = kTailCount});
     CompareLogMessages(log_messages, read_log_messages);
 }
 
 TEST_P(LogBufferTest, tail100_blocking_50total_then1000more) {
-    LogMessage message = {
-            {.pid = 1, .tid = 2, .sec = 0, .nsec = 20001, .lid = LOG_ID_MAIN, .uid = 0}, "message"};
-    std::vector<LogMessage> log_messages;
-    for (int i = 0; i < 50; ++i) {
-        log_messages.push_back(message);
-        log_messages.back().entry.sec = i;
-        log_messages.back().entry.pid = i;  // Chatty dedupes these messages if they're identical.
-    }
-    FixupMessages(&log_messages);
+    auto log_messages = GenerateRandomLogMessages(50);
     LogMessages(log_messages);
 
-    std::vector<LogMessage> read_log_messages;
-    bool released = false;
-
     constexpr int kTailCount = 100;
-    {
-        auto lock = std::lock_guard{logd_lock};
-        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, &released));
-        std::unique_ptr<LogReaderThread> log_reader(
-                new LogReaderThread(log_buffer_.get(), &reader_list_, std::move(test_writer), false,
-                                    kTailCount, kLogMaskAll, 0, {}, 1, {}));
-        reader_list_.reader_threads().emplace_back(std::move(log_reader));
-    }
+    auto blocking_reader = TestReaderThread({.non_block = false, .tail = kTailCount}, *this);
 
     std::vector<LogMessage> expected_log_messages = log_messages;
 
@@ -640,18 +449,15 @@
     while (retry_count--) {
         usleep(5000);
         auto lock = std::lock_guard{logd_lock};
-        if (read_log_messages.size() == expected_log_messages.size()) {
-            CompareLogMessages(expected_log_messages, read_log_messages);
+        if (blocking_reader.read_log_messages().size() == expected_log_messages.size()) {
+            CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
             break;
         }
     }
     ASSERT_GT(retry_count, 0);
 
     // Log more messages
-    for (auto& message : log_messages) {
-        message.entry.sec += 10000;
-    }
-    FixupMessages(&log_messages);
+    log_messages = GenerateRandomLogMessages(1000);
     LogMessages(log_messages);
     expected_log_messages.insert(expected_log_messages.end(), log_messages.begin(),
                                  log_messages.end());
@@ -661,30 +467,18 @@
     while (retry_count--) {
         usleep(5000);
         auto lock = std::lock_guard{logd_lock};
-        if (read_log_messages.size() == expected_log_messages.size()) {
-            CompareLogMessages(expected_log_messages, read_log_messages);
+        if (blocking_reader.read_log_messages().size() == expected_log_messages.size()) {
+            CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
+
             break;
         }
     }
     ASSERT_GT(retry_count, 0);
 
-    // Release the reader.
-    {
-        auto lock = std::lock_guard{logd_lock};
-        reader_list_.reader_threads().back()->Release();
-    }
-
-    // Confirm that it has exited.
-    while (!released) {
-        usleep(5000);
-    }
-    {
-        auto lock = std::lock_guard{logd_lock};
-        EXPECT_EQ(0U, reader_list_.reader_threads().size());
-    }
+    ReleaseAndJoinReaders();
 
     // Final check that no extraneous logs were logged.
-    CompareLogMessages(expected_log_messages, read_log_messages);
+    CompareLogMessages(expected_log_messages, blocking_reader.read_log_messages());
 }
 
 INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest,
diff --git a/logd/LogBufferTest.h b/logd/LogBufferTest.h
index bd0607a..0d42bd3 100644
--- a/logd/LogBufferTest.h
+++ b/logd/LogBufferTest.h
@@ -16,12 +16,14 @@
 
 #pragma once
 
+#include <chrono>
 #include <string>
 #include <vector>
 
 #include <gtest/gtest.h>
 
 #include "ChattyLogBuffer.h"
+#include "LogBuffer.h"
 #include "LogReaderList.h"
 #include "LogStatistics.h"
 #include "LogTags.h"
@@ -29,6 +31,8 @@
 #include "SerializedLogBuffer.h"
 #include "SimpleLogBuffer.h"
 
+using namespace std::chrono_literals;
+
 struct LogMessage {
     logger_entry entry;
     std::string message;
@@ -88,6 +92,88 @@
         }
     }
 
+    struct FlushMessagesResult {
+        std::vector<LogMessage> messages;
+        uint64_t next_sequence;
+    };
+
+    FlushMessagesResult FlushMessages(uint64_t sequence = 1, LogMask log_mask = kLogMaskAll) {
+        std::vector<LogMessage> read_log_messages;
+        auto lock = std::lock_guard{logd_lock};
+        std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages, nullptr));
+
+        auto flush_to_state = log_buffer_->CreateFlushToState(sequence, log_mask);
+        EXPECT_TRUE(log_buffer_->FlushTo(test_writer.get(), *flush_to_state, nullptr));
+        return {read_log_messages, flush_to_state->start()};
+    }
+
+    struct ReaderThreadParams {
+        bool non_block = true;
+        unsigned long tail = 0;
+        LogMask log_mask = kLogMaskAll;
+        pid_t pid = 0;
+        log_time start_time = {};
+        uint64_t sequence = 1;
+        std::chrono::steady_clock::time_point deadline = {};
+    };
+
+    class TestReaderThread {
+      public:
+        TestReaderThread(const ReaderThreadParams& params, LogBufferTest& test) : test_(test) {
+            auto lock = std::lock_guard{logd_lock};
+            std::unique_ptr<LogWriter> test_writer(new TestWriter(&read_log_messages_, &released_));
+            std::unique_ptr<LogReaderThread> log_reader(new LogReaderThread(
+                    test_.log_buffer_.get(), &test_.reader_list_, std::move(test_writer),
+                    params.non_block, params.tail, params.log_mask, params.pid, params.start_time,
+                    params.sequence, params.deadline));
+            test_.reader_list_.reader_threads().emplace_back(std::move(log_reader));
+        }
+
+        void WaitUntilReleased() {
+            while (!released_) {
+                usleep(5000);
+            }
+        }
+
+        std::vector<LogMessage> read_log_messages() const { return read_log_messages_; }
+
+        LogBufferTest& test_;
+        std::vector<LogMessage> read_log_messages_;
+        bool released_ = false;
+    };
+
+    std::vector<LogMessage> ReadLogMessagesNonBlockingThread(const ReaderThreadParams& params) {
+        EXPECT_TRUE(params.non_block)
+                << "params.non_block must be true for ReadLogMessagesNonBlockingThread()";
+
+        auto reader = TestReaderThread(params, *this);
+        reader.WaitUntilReleased();
+        auto lock = std::lock_guard{logd_lock};
+        EXPECT_EQ(0U, reader_list_.reader_threads().size());
+
+        return reader.read_log_messages();
+    }
+
+    void ReleaseAndJoinReaders() {
+        {
+            auto lock = std::lock_guard{logd_lock};
+            for (auto& reader : reader_list_.reader_threads()) {
+                reader->Release();
+            }
+        }
+
+        auto retries = 1s / 5000us;
+        while (retries--) {
+            usleep(5000);
+            auto lock = std::lock_guard{logd_lock};
+            if (reader_list_.reader_threads().size() == 0) {
+                return;
+            }
+        }
+
+        FAIL() << "ReleaseAndJoinReaders() timed out with reader threads still running";
+    }
+
     LogReaderList reader_list_;
     LogTags tags_;
     PruneList prune_;