logd: add a SerializedLogBuffer suitable for compression

Initial commit for a SerializedLogBuffer.  The intention here is for
the serialized data to be compressed (currently using zlib) to allow
for substantially longer logs in the same memory footprint.

Test: unit tests
Change-Id: I2528e4e1ff1cf3bc91130173a107f371f04d911a
diff --git a/logd/Android.bp b/logd/Android.bp
index 1f6ab34..e0a1168 100644
--- a/logd/Android.bp
+++ b/logd/Android.bp
@@ -31,7 +31,11 @@
 cc_defaults {
     name: "logd_defaults",
 
-    shared_libs: ["libbase"],
+    shared_libs: [
+        "libbase",
+        "libz",
+    ],
+    static_libs: ["libzstd"],
     cflags: [
         "-Wextra",
         "-Wthread-safety",
@@ -40,6 +44,7 @@
     lto: {
         thin: true,
     },
+    cpp_std: "experimental",
 }
 
 cc_library_static {
@@ -48,12 +53,16 @@
     host_supported: true,
     srcs: [
         "ChattyLogBuffer.cpp",
+        "CompressionEngine.cpp",
         "LogReaderList.cpp",
         "LogReaderThread.cpp",
         "LogBufferElement.cpp",
         "LogStatistics.cpp",
         "LogWhiteBlackList.cpp",
         "LogTags.cpp",
+        "SerializedFlushToState.cpp",
+        "SerializedLogBuffer.cpp",
+        "SerializedLogChunk.cpp",
         "SimpleLogBuffer.cpp",
     ],
     logtags: ["event.logtags"],
@@ -132,6 +141,8 @@
         "ChattyLogBufferTest.cpp",
         "logd_test.cpp",
         "LogBufferTest.cpp",
+        "SerializedLogChunkTest.cpp",
+        "SerializedFlushToStateTest.cpp",
     ],
 
     static_libs: [
@@ -140,6 +151,8 @@
         "liblog",
         "liblogd",
         "libselinux",
+        "libz",
+        "libzstd",
     ],
 }
 
diff --git a/logd/CompressionEngine.cpp b/logd/CompressionEngine.cpp
new file mode 100644
index 0000000..f37208b
--- /dev/null
+++ b/logd/CompressionEngine.cpp
@@ -0,0 +1,104 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompressionEngine.h"
+
+#include <limits>
+
+#include <android-base/logging.h>
+#include <zlib.h>
+#include <zstd.h>
+
+CompressionEngine& CompressionEngine::GetInstance() {
+    static CompressionEngine* engine = new ZlibCompressionEngine();
+    return *engine;
+}
+
+bool ZlibCompressionEngine::Compress(std::span<uint8_t> in, std::vector<uint8_t>& out) {
+    z_stream strm;
+    strm.zalloc = Z_NULL;
+    strm.zfree = Z_NULL;
+    strm.opaque = Z_NULL;
+    int ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
+    if (ret != Z_OK) {
+        LOG(FATAL) << "deflateInit() failed";
+    }
+
+    CHECK_LE(in.size(), static_cast<size_t>(std::numeric_limits<uint32_t>::max()));
+    uint32_t out_size = deflateBound(&strm, in.size());
+
+    out.resize(out_size);
+    strm.avail_in = in.size();
+    strm.next_in = const_cast<uint8_t*>(in.data());
+    strm.avail_out = out_size;
+    strm.next_out = out.data();
+    ret = deflate(&strm, Z_FINISH);
+    CHECK_EQ(ret, Z_STREAM_END);
+
+    uint32_t compressed_data_size = strm.total_out;
+    deflateEnd(&strm);
+    out.resize(compressed_data_size);
+
+    return true;
+}
+
+bool ZlibCompressionEngine::Decompress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out,
+                                       size_t out_size) {
+    out.resize(out_size);
+
+    z_stream strm;
+    strm.zalloc = Z_NULL;
+    strm.zfree = Z_NULL;
+    strm.opaque = Z_NULL;
+    strm.avail_in = in.size();
+    strm.next_in = const_cast<uint8_t*>(in.data());
+    strm.avail_out = out.size();
+    strm.next_out = out.data();
+
+    inflateInit(&strm);
+    int ret = inflate(&strm, Z_NO_FLUSH);
+
+    CHECK_EQ(strm.avail_in, 0U);
+    CHECK_EQ(strm.avail_out, 0U);
+    CHECK_EQ(ret, Z_STREAM_END);
+    inflateEnd(&strm);
+
+    return true;
+}
+
+bool ZstdCompressionEngine::Compress(std::span<uint8_t> in, std::vector<uint8_t>& out) {
+    size_t out_size = ZSTD_compressBound(in.size());
+    out.resize(out_size);
+
+    out_size = ZSTD_compress(out.data(), out_size, in.data(), in.size(), 1);
+    if (ZSTD_isError(out_size)) {
+        LOG(FATAL) << "ZSTD_compress failed: " << ZSTD_getErrorName(out_size);
+    }
+    out.resize(out_size);
+
+    return true;
+}
+
+bool ZstdCompressionEngine::Decompress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out,
+                                       size_t out_size) {
+    out.resize(out_size);
+    size_t result = ZSTD_decompress(out.data(), out.size(), in.data(), in.size());
+    if (ZSTD_isError(result)) {
+        LOG(FATAL) << "ZSTD_decompress failed: " << ZSTD_getErrorName(result);
+    }
+    CHECK_EQ(result, out.size());
+    return true;
+}
diff --git a/logd/CompressionEngine.h b/logd/CompressionEngine.h
new file mode 100644
index 0000000..d760cea
--- /dev/null
+++ b/logd/CompressionEngine.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <span>
+#include <vector>
+
+class CompressionEngine {
+  public:
+    static CompressionEngine& GetInstance();
+
+    virtual ~CompressionEngine(){};
+
+    virtual bool Compress(std::span<uint8_t> in, std::vector<uint8_t>& out) = 0;
+    // Decompress the contents of `in` into `out`.  `out_size` must be set to the decompressed size
+    // of the contents.
+    virtual bool Decompress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out,
+                            size_t out_size) = 0;
+};
+
+class ZlibCompressionEngine : public CompressionEngine {
+  public:
+    bool Compress(std::span<uint8_t> in, std::vector<uint8_t>& out) override;
+    bool Decompress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out,
+                    size_t out_size) override;
+};
+
+class ZstdCompressionEngine : public CompressionEngine {
+  public:
+    bool Compress(std::span<uint8_t> in, std::vector<uint8_t>& out) override;
+    bool Decompress(const std::vector<uint8_t>& in, std::vector<uint8_t>& out,
+                    size_t out_size) override;
+};
\ No newline at end of file
diff --git a/logd/LogBufferTest.cpp b/logd/LogBufferTest.cpp
index 412b6f1..47d2a2f 100644
--- a/logd/LogBufferTest.cpp
+++ b/logd/LogBufferTest.cpp
@@ -455,4 +455,5 @@
     CompareLogMessages(after_clear_messages, read_log_messages_after_clear);
 }
 
-INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest, testing::Values("chatty", "simple"));
+INSTANTIATE_TEST_CASE_P(LogBufferTests, LogBufferTest,
+                        testing::Values("chatty", "serialized", "simple"));
diff --git a/logd/LogBufferTest.h b/logd/LogBufferTest.h
index f91a1b5..235f5ac 100644
--- a/logd/LogBufferTest.h
+++ b/logd/LogBufferTest.h
@@ -26,6 +26,7 @@
 #include "LogStatistics.h"
 #include "LogTags.h"
 #include "LogWhiteBlackList.h"
+#include "SerializedLogBuffer.h"
 #include "SimpleLogBuffer.h"
 
 struct LogMessage {
@@ -67,6 +68,8 @@
     void SetUp() override {
         if (GetParam() == "chatty") {
             log_buffer_.reset(new ChattyLogBuffer(&reader_list_, &tags_, &prune_, &stats_));
+        } else if (GetParam() == "serialized") {
+            log_buffer_.reset(new SerializedLogBuffer(&reader_list_, &tags_, &stats_));
         } else if (GetParam() == "simple") {
             log_buffer_.reset(new SimpleLogBuffer(&reader_list_, &tags_, &stats_));
         } else {
diff --git a/logd/LogReaderThread.h b/logd/LogReaderThread.h
index bf70b94..1855c0e 100644
--- a/logd/LogReaderThread.h
+++ b/logd/LogReaderThread.h
@@ -60,6 +60,7 @@
     std::string name() const { return writer_->name(); }
     uint64_t start() const { return flush_to_state_->start(); }
     std::chrono::steady_clock::time_point deadline() const { return deadline_; }
+    FlushToState& flush_to_state() { return *flush_to_state_; }
 
   private:
     void ThreadFunction();
diff --git a/logd/SerializedFlushToState.cpp b/logd/SerializedFlushToState.cpp
new file mode 100644
index 0000000..17ecb6d
--- /dev/null
+++ b/logd/SerializedFlushToState.cpp
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerializedFlushToState.h"
+
+#include <android-base/logging.h>
+
+SerializedFlushToState::SerializedFlushToState(uint64_t start, LogMask log_mask)
+    : FlushToState(start, log_mask) {
+    log_id_for_each(i) {
+        if (((1 << i) & log_mask) == 0) {
+            continue;
+        }
+        logs_needed_from_next_position_[i] = true;
+    }
+}
+
+SerializedFlushToState::~SerializedFlushToState() {
+    log_id_for_each(i) {
+        if (log_positions_[i]) {
+            log_positions_[i]->buffer_it->DecReaderRefCount(true);
+        }
+    }
+}
+
+void SerializedFlushToState::CreateLogPosition(log_id_t log_id) {
+    CHECK(!logs_[log_id].empty());
+    LogPosition log_position;
+    auto it = logs_[log_id].begin();
+    while (it != logs_[log_id].end() && start() > it->highest_sequence_number()) {
+        ++it;
+    }
+    if (it == logs_[log_id].end()) {
+        --it;
+    }
+    it->IncReaderRefCount();
+    log_position.buffer_it = it;
+
+    // Find the offset of the first log with sequence number >= start().
+    int read_offset = 0;
+    while (read_offset < it->write_offset()) {
+        const auto* entry = it->log_entry(read_offset);
+        if (entry->sequence() >= start()) {
+            break;
+        }
+        read_offset += entry->total_len();
+    }
+    log_position.read_offset = read_offset;
+
+    log_positions_[log_id].emplace(std::move(log_position));
+}
+
+void SerializedFlushToState::AddMinHeapEntry(log_id_t log_id) {
+    auto& buffer_it = log_positions_[log_id]->buffer_it;
+    auto read_offset = log_positions_[log_id]->read_offset;
+
+    // If there is another log to read in this buffer, add it to the min heap.
+    if (read_offset < buffer_it->write_offset()) {
+        auto* entry = buffer_it->log_entry(read_offset);
+        min_heap_.emplace(log_id, entry);
+    } else if (read_offset == buffer_it->write_offset()) {
+        // If there are no more logs to read in this buffer and it's the last buffer, then
+        // set logs_needed_from_next_position_ to wait until more logs get logged.
+        if (buffer_it == std::prev(logs_[log_id].end())) {
+            logs_needed_from_next_position_[log_id] = true;
+        } else {
+            // Otherwise, if there is another buffer piece, move to that and do the same check.
+            buffer_it->DecReaderRefCount(true);
+            ++buffer_it;
+            buffer_it->IncReaderRefCount();
+            log_positions_[log_id]->read_offset = 0;
+            if (buffer_it->write_offset() == 0) {
+                logs_needed_from_next_position_[log_id] = true;
+            } else {
+                auto* entry = buffer_it->log_entry(0);
+                min_heap_.emplace(log_id, entry);
+            }
+        }
+    } else {
+        // read_offset > buffer_it->write_offset() should never happen.
+        CHECK(false);
+    }
+}
+
+void SerializedFlushToState::CheckForNewLogs() {
+    log_id_for_each(i) {
+        if (!logs_needed_from_next_position_[i]) {
+            continue;
+        }
+        if (!log_positions_[i]) {
+            if (logs_[i].empty()) {
+                continue;
+            }
+            CreateLogPosition(i);
+        }
+        logs_needed_from_next_position_[i] = false;
+        // If it wasn't possible to insert, logs_needed_from_next_position will be set back to true.
+        AddMinHeapEntry(i);
+    }
+}
+
+MinHeapElement SerializedFlushToState::PopNextUnreadLog() {
+    auto top = min_heap_.top();
+    min_heap_.pop();
+
+    auto* entry = top.entry;
+    auto log_id = top.log_id;
+
+    log_positions_[log_id]->read_offset += entry->total_len();
+
+    AddMinHeapEntry(log_id);
+
+    return top;
+}
+
+void SerializedFlushToState::Prune(log_id_t log_id,
+                                   const std::list<SerializedLogChunk>::iterator& buffer_it) {
+    // If we don't have a position for this log or if we're not referencing buffer_it, ignore.
+    if (!log_positions_[log_id].has_value() || log_positions_[log_id]->buffer_it != buffer_it) {
+        return;
+    }
+
+    // Decrease the ref count since we're deleting our reference.
+    buffer_it->DecReaderRefCount(false);
+
+    // Delete the reference.
+    log_positions_[log_id].reset();
+
+    // Remove the MinHeapElement referencing log_id, if it exists, but retain the others.
+    std::vector<MinHeapElement> old_elements;
+    while (!min_heap_.empty()) {
+        auto& element = min_heap_.top();
+        if (element.log_id != log_id) {
+            old_elements.emplace_back(element);
+        }
+        min_heap_.pop();
+    }
+    for (auto&& element : old_elements) {
+        min_heap_.emplace(std::move(element));
+    }
+
+    // Finally set logs_needed_from_next_position_, so CheckForNewLogs() will re-create the
+    // log_position_ object during the next read.
+    logs_needed_from_next_position_[log_id] = true;
+}
diff --git a/logd/SerializedFlushToState.h b/logd/SerializedFlushToState.h
new file mode 100644
index 0000000..74b3de5
--- /dev/null
+++ b/logd/SerializedFlushToState.h
@@ -0,0 +1,101 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <bitset>
+#include <list>
+#include <queue>
+
+#include "LogBuffer.h"
+#include "SerializedLogChunk.h"
+#include "SerializedLogEntry.h"
+
+struct LogPosition {
+    std::list<SerializedLogChunk>::iterator buffer_it;
+    int read_offset;
+};
+
+struct MinHeapElement {
+    MinHeapElement(log_id_t log_id, const SerializedLogEntry* entry)
+        : log_id(log_id), entry(entry) {}
+    log_id_t log_id;
+    const SerializedLogEntry* entry;
+    // The change of comparison operators is intentional, std::priority_queue uses operator<() to
+    // compare but creates a max heap.  Since we want a min heap, we return the opposite result.
+    bool operator<(const MinHeapElement& rhs) const {
+        return entry->sequence() > rhs.entry->sequence();
+    }
+};
+
+// This class tracks the specific point where a FlushTo client has read through the logs.  It
+// directly references the std::list<> iterators from the parent SerializedLogBuffer and the offset
+// into each log chunk where it has last read.  All interactions with this class, except for its
+// construction, must be done with SerializedLogBuffer::lock_ held.  No log chunks that it
+// references may be pruned, which is handled by ensuring prune does not touch any log chunk with
+// highest sequence number greater or equal to start().
+class SerializedFlushToState : public FlushToState {
+  public:
+    // Initializes this state object.  For each log buffer set in log_mask, this sets
+    // logs_needed_from_next_position_.
+    SerializedFlushToState(uint64_t start, LogMask log_mask);
+
+    // Decrease the reference of all referenced logs.  This happens when a reader is disconnected.
+    ~SerializedFlushToState() override;
+
+    // We can't hold SerializedLogBuffer::lock_ in the constructor, so we must initialize logs here.
+    void InitializeLogs(std::list<SerializedLogChunk>* logs) {
+        if (logs_ == nullptr) logs_ = logs;
+    }
+
+    // Checks to see if any log buffers set in logs_needed_from_next_position_ have new logs and
+    // calls AddMinHeapEntry() if so.
+    void CheckForNewLogs();
+
+    bool HasUnreadLogs() { return !min_heap_.empty(); }
+
+    // Pops the next unread log from the min heap.  Add the next log for that log_id to the min heap
+    // if one is available otherwise set logs_needed_from_next_position_ to indicate that we're
+    // waiting for more logs.
+    MinHeapElement PopNextUnreadLog();
+
+    // If the parent log buffer prunes logs, the reference that this class contains may become
+    // invalid, so this must be called first to drop the reference to buffer_it, if any.
+    void Prune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& buffer_it);
+
+  private:
+    // If there is a log in the serialized log buffer for `log_id` at the read_offset, add it to the
+    // min heap for reading, otherwise set logs_needed_from_next_position_ to indicate that we're
+    // waiting for the next log.
+    void AddMinHeapEntry(log_id_t log_id);
+
+    // Create a LogPosition object for the given log_id by searching through the log chunks for the
+    // first chunk and then first log entry within that chunk that is greater or equal to start().
+    void CreateLogPosition(log_id_t log_id);
+
+    std::list<SerializedLogChunk>* logs_ = nullptr;
+    // An optional structure that contains an iterator to the serialized log buffer and offset into
+    // it that this logger should handle next.
+    std::optional<LogPosition> log_positions_[LOG_ID_MAX];
+    // A bit for each log that is set if a given log_id has no logs or if this client has read all
+    // of its logs. In other words: `logs_[i].empty() || (buffer_it == std::prev(logs_.end()) &&
+    // next_log_position == logs_write_position_)`.  These will be re-checked in each
+    // loop in case new logs came in.
+    std::bitset<LOG_ID_MAX> logs_needed_from_next_position_ = {};
+    // A min heap that has up to one entry per log buffer, sorted by sequence number, of the next
+    // element that this reader should read.
+    std::priority_queue<MinHeapElement> min_heap_;
+};
diff --git a/logd/SerializedFlushToStateTest.cpp b/logd/SerializedFlushToStateTest.cpp
new file mode 100644
index 0000000..a1d21ac
--- /dev/null
+++ b/logd/SerializedFlushToStateTest.cpp
@@ -0,0 +1,254 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerializedFlushToState.h"
+
+#include <map>
+
+#include <android-base/logging.h>
+#include <android-base/stringprintf.h>
+#include <android-base/strings.h>
+#include <gtest/gtest.h>
+
+using android::base::Join;
+using android::base::StringPrintf;
+
+constexpr size_t kChunkSize = 3 * 4096;
+
+class SerializedFlushToStateTest : public testing::Test {
+  protected:
+    void SetUp() override {
+        // This test spams many unneeded INFO logs, so we suppress them.
+        old_log_severity_ = android::base::SetMinimumLogSeverity(android::base::WARNING);
+    }
+    void TearDown() override { android::base::SetMinimumLogSeverity(old_log_severity_); }
+
+    std::string TestReport(const std::vector<uint64_t>& expected,
+                           const std::vector<uint64_t>& read) {
+        auto sequence_to_log_id = [&](uint64_t sequence) -> int {
+            for (const auto& [log_id, sequences] : sequence_numbers_per_buffer_) {
+                if (std::find(sequences.begin(), sequences.end(), sequence) != sequences.end()) {
+                    return log_id;
+                }
+            }
+            return -1;
+        };
+
+        std::map<int, std::vector<uint64_t>> missing_sequences;
+        std::vector<uint64_t> missing_expected;
+        std::set_difference(expected.begin(), expected.end(), read.begin(), read.end(),
+                            std::back_inserter(missing_expected));
+        for (uint64_t sequence : missing_expected) {
+            int log_id = sequence_to_log_id(sequence);
+            missing_sequences[log_id].emplace_back(sequence);
+        }
+
+        std::map<int, std::vector<uint64_t>> extra_sequences;
+        std::vector<uint64_t> extra_read;
+        std::set_difference(read.begin(), read.end(), expected.begin(), expected.end(),
+                            std::back_inserter(extra_read));
+        for (uint64_t sequence : extra_read) {
+            int log_id = sequence_to_log_id(sequence);
+            extra_sequences[log_id].emplace_back(sequence);
+        }
+
+        std::vector<std::string> errors;
+        for (const auto& [log_id, sequences] : missing_sequences) {
+            errors.emplace_back(
+                    StringPrintf("Log id %d missing %zu sequences", log_id, sequences.size()));
+        }
+
+        for (const auto& [log_id, sequences] : extra_sequences) {
+            errors.emplace_back(
+                    StringPrintf("Log id %d has extra %zu sequences", log_id, sequences.size()));
+        }
+
+        return Join(errors, ", ");
+    }
+
+    // Read sequence numbers in order from SerializedFlushToState for every mask combination and all
+    // sequence numbers from 0 through the highest logged sequence number + 1.
+    // This assumes that all of the logs have already been written.
+    void TestAllReading() {
+        uint64_t max_sequence = sequence_ + 1;
+        uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
+        for (uint64_t sequence = 0; sequence < max_sequence; ++sequence) {
+            for (uint32_t mask = 0; mask < max_mask; ++mask) {
+                auto state = SerializedFlushToState{sequence, mask};
+                state.InitializeLogs(log_chunks_);
+                state.CheckForNewLogs();
+                TestReading(sequence, mask, state);
+            }
+        }
+    }
+
+    // Similar to TestAllReading() except that it doesn't assume any logs are in the buffer, instead
+    // it calls write_logs() in a loop for sequence/mask combination.  It clears log_chunks_ and
+    // sequence_numbers_per_buffer_ between calls, such that only the sequence numbers written in
+    // the previous call to write_logs() are expected.
+    void TestAllReadingWithFutureMessages(const std::function<bool(int)>& write_logs) {
+        uint64_t max_sequence = sequence_ + 1;
+        uint32_t max_mask = (1 << LOG_ID_MAX) - 1;
+        for (uint64_t sequence = 1; sequence < max_sequence; ++sequence) {
+            for (uint32_t mask = 1; mask < max_mask; ++mask) {
+                log_id_for_each(i) { log_chunks_[i].clear(); }
+                auto state = SerializedFlushToState{sequence, mask};
+                state.InitializeLogs(log_chunks_);
+                int loop_count = 0;
+                while (write_logs(loop_count++)) {
+                    state.CheckForNewLogs();
+                    TestReading(sequence, mask, state);
+                    sequence_numbers_per_buffer_.clear();
+                }
+            }
+        }
+    }
+
+    void TestReading(uint64_t start, LogMask log_mask, SerializedFlushToState& state) {
+        std::vector<uint64_t> expected_sequence;
+        log_id_for_each(i) {
+            if (((1 << i) & log_mask) == 0) {
+                continue;
+            }
+            for (const auto& sequence : sequence_numbers_per_buffer_[i]) {
+                if (sequence >= start) {
+                    expected_sequence.emplace_back(sequence);
+                }
+            }
+        }
+        std::sort(expected_sequence.begin(), expected_sequence.end());
+
+        std::vector<uint64_t> read_sequence;
+
+        while (state.HasUnreadLogs()) {
+            auto top = state.PopNextUnreadLog();
+            read_sequence.emplace_back(top.entry->sequence());
+        }
+
+        EXPECT_TRUE(std::is_sorted(read_sequence.begin(), read_sequence.end()));
+
+        EXPECT_EQ(expected_sequence.size(), read_sequence.size());
+
+        EXPECT_EQ(expected_sequence, read_sequence)
+                << "start: " << start << " log_mask: " << log_mask << " "
+                << TestReport(expected_sequence, read_sequence);
+    }
+
+    // Add a chunk with the given messages to a given log buffer.  Keep track of the sequence
+    // numbers for future validation.  Optionally mark the block as having finished writing.
+    void AddChunkWithMessages(bool finish_writing, int buffer,
+                              const std::vector<std::string>& messages) {
+        auto chunk = SerializedLogChunk{kChunkSize};
+        for (const auto& message : messages) {
+            auto sequence = sequence_++;
+            sequence_numbers_per_buffer_[buffer].emplace_back(sequence);
+            ASSERT_TRUE(chunk.CanLog(message.size() + 1));
+            chunk.Log(sequence, log_time(), 0, 1, 1, message.c_str(), message.size() + 1);
+        }
+        if (finish_writing) {
+            chunk.FinishWriting();
+        }
+        log_chunks_[buffer].emplace_back(std::move(chunk));
+    }
+
+    android::base::LogSeverity old_log_severity_;
+    std::map<int, std::vector<uint64_t>> sequence_numbers_per_buffer_;
+    std::list<SerializedLogChunk> log_chunks_[LOG_ID_MAX];
+    uint64_t sequence_ = 1;
+};
+
+// 0: multiple chunks, with variable number of entries, with/without finishing writing
+// 1: 1 chunk with 1 log and finished writing
+// 2: 1 chunk with 1 log and not finished writing
+// 3: 1 chunk with 0 logs and not finished writing
+// 4: 1 chunk with 0 logs and finished writing (impossible, but SerializedFlushToState handles it)
+// 5-7: 0 chunks
+TEST_F(SerializedFlushToStateTest, smoke) {
+    AddChunkWithMessages(true, 0, {"1st", "2nd"});
+    AddChunkWithMessages(true, 1, {"3rd"});
+    AddChunkWithMessages(false, 0, {"4th"});
+    AddChunkWithMessages(true, 0, {"4th", "5th", "more", "even", "more", "go", "here"});
+    AddChunkWithMessages(false, 2, {"6th"});
+    AddChunkWithMessages(true, 0, {"7th"});
+    AddChunkWithMessages(false, 3, {});
+    AddChunkWithMessages(true, 4, {});
+
+    TestAllReading();
+}
+
+TEST_F(SerializedFlushToStateTest, random) {
+    srand(1);
+    for (int count = 0; count < 20; ++count) {
+        unsigned int num_messages = 1 + rand() % 15;
+        auto messages = std::vector<std::string>{num_messages, "same message"};
+
+        bool finish_writing = rand() % 2;
+        int buf = rand() % LOG_ID_MAX;
+
+        AddChunkWithMessages(finish_writing, buf, messages);
+    }
+
+    TestAllReading();
+}
+
+// Same start as smoke, but we selectively write logs to the buffers and ensure they're read.
+TEST_F(SerializedFlushToStateTest, future_writes) {
+    auto write_logs = [&](int loop_count) {
+        switch (loop_count) {
+            case 0:
+                // Initial writes.
+                AddChunkWithMessages(true, 0, {"1st", "2nd"});
+                AddChunkWithMessages(true, 1, {"3rd"});
+                AddChunkWithMessages(false, 0, {"4th"});
+                AddChunkWithMessages(true, 0, {"4th", "5th", "more", "even", "more", "go", "here"});
+                AddChunkWithMessages(false, 2, {"6th"});
+                AddChunkWithMessages(true, 0, {"7th"});
+                AddChunkWithMessages(false, 3, {});
+                AddChunkWithMessages(true, 4, {});
+                break;
+            case 1:
+                // Smoke test, add a simple chunk.
+                AddChunkWithMessages(true, 0, {"1st", "2nd"});
+                break;
+            case 2:
+                // Add chunks to all but one of the logs.
+                AddChunkWithMessages(true, 0, {"1st", "2nd"});
+                AddChunkWithMessages(true, 1, {"1st", "2nd"});
+                AddChunkWithMessages(true, 2, {"1st", "2nd"});
+                AddChunkWithMessages(true, 3, {"1st", "2nd"});
+                AddChunkWithMessages(true, 4, {"1st", "2nd"});
+                AddChunkWithMessages(true, 5, {"1st", "2nd"});
+                AddChunkWithMessages(true, 6, {"1st", "2nd"});
+                break;
+            case 3:
+                // Finally add chunks to all logs.
+                AddChunkWithMessages(true, 0, {"1st", "2nd"});
+                AddChunkWithMessages(true, 1, {"1st", "2nd"});
+                AddChunkWithMessages(true, 2, {"1st", "2nd"});
+                AddChunkWithMessages(true, 3, {"1st", "2nd"});
+                AddChunkWithMessages(true, 4, {"1st", "2nd"});
+                AddChunkWithMessages(true, 5, {"1st", "2nd"});
+                AddChunkWithMessages(true, 6, {"1st", "2nd"});
+                AddChunkWithMessages(true, 7, {"1st", "2nd"});
+                break;
+            default:
+                return false;
+        }
+        return true;
+    };
+
+    TestAllReadingWithFutureMessages(write_logs);
+}
diff --git a/logd/SerializedLogBuffer.cpp b/logd/SerializedLogBuffer.cpp
new file mode 100644
index 0000000..8bda7cf
--- /dev/null
+++ b/logd/SerializedLogBuffer.cpp
@@ -0,0 +1,351 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerializedLogBuffer.h"
+
+#include <limits>
+#include <thread>
+
+#include <android-base/logging.h>
+#include <android-base/scopeguard.h>
+
+#include "LogStatistics.h"
+#include "SerializedFlushToState.h"
+
+// Constructs the buffer and immediately calls Init() to size each log and wake any
+// already-connected reader threads.
+SerializedLogBuffer::SerializedLogBuffer(LogReaderList* reader_list, LogTags* tags,
+                                         LogStatistics* stats)
+    : reader_list_(reader_list), tags_(tags), stats_(stats) {
+    Init();
+}
+
+SerializedLogBuffer::~SerializedLogBuffer() {}
+
+void SerializedLogBuffer::Init() {
+    // Apply the configured size for each log; fall back to the minimum size if the
+    // configured value is rejected by SetSize().
+    log_id_for_each(i) {
+        if (SetSize(i, __android_logger_get_buffer_size(i))) {
+            SetSize(i, LOG_BUFFER_MIN_SIZE);
+        }
+    }
+
+    // Release any sleeping reader threads to dump their current content.
+    auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
+    for (const auto& reader_thread : reader_list_->reader_threads()) {
+        reader_thread->triggerReader_Locked();
+    }
+}
+
+// Returns true if this message passes the loggability filter.  Security logs are always
+// logged.  For binary (event) logs, the tag name is looked up from the numeric tag id;
+// for text logs, the priority byte and tag string are read directly from the payload.
+bool SerializedLogBuffer::ShouldLog(log_id_t log_id, const char* msg, uint16_t len) {
+    if (log_id == LOG_ID_SECURITY) {
+        return true;
+    }
+
+    int prio = ANDROID_LOG_INFO;
+    const char* tag = nullptr;
+    size_t tag_len = 0;
+    if (IsBinary(log_id)) {
+        int32_t tag_int = MsgToTag(msg, len);
+        tag = tags_->tagToName(tag_int);
+        if (tag) {
+            tag_len = strlen(tag);
+        }
+    } else {
+        // Text log payload layout: <priority byte><tag>\0<message>.
+        prio = *msg;
+        tag = msg + 1;
+        tag_len = strnlen(tag, len - 1);
+    }
+    return __android_log_is_loggable_len(prio, tag, tag_len, ANDROID_LOG_VERBOSE);
+}
+
+int SerializedLogBuffer::Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid,
+                             const char* msg, uint16_t len) {
+    if (log_id >= LOG_ID_MAX || len == 0) {
+        return -EINVAL;
+    }
+
+    if (!ShouldLog(log_id, msg, len)) {
+        // Filtered-out logs are still counted in the totals.
+        stats_->AddTotal(log_id, len);
+        return -EACCES;
+    }
+
+    // Allocate the sequence number before taking lock_; relaxed ordering suffices since
+    // the atomic counter itself is the only shared state involved here.
+    auto sequence = sequence_.fetch_add(1, std::memory_order_relaxed);
+
+    auto lock = std::lock_guard{lock_};
+
+    // New chunks are sized at a quarter of the log's maximum size.
+    if (logs_[log_id].empty()) {
+        logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
+    }
+
+    auto total_len = sizeof(SerializedLogEntry) + len;
+    if (!logs_[log_id].back().CanLog(total_len)) {
+        // The current chunk is full: seal it so it can be compressed, then start a new one.
+        logs_[log_id].back().FinishWriting();
+        logs_[log_id].push_back(SerializedLogChunk(max_size_[log_id] / 4));
+    }
+
+    auto entry = logs_[log_id].back().Log(sequence, realtime, uid, pid, tid, msg, len);
+    stats_->Add(entry->ToLogStatisticsElement(log_id));
+
+    MaybePrune(log_id);
+
+    reader_list_->NotifyNewLog(1 << log_id);
+    return len;
+}
+
+// Prunes the given log if the sum of its chunks' PruneSize() (compressed size for
+// sealed chunks) exceeds the configured maximum size.
+void SerializedLogBuffer::MaybePrune(log_id_t log_id) {
+    auto get_total_size = [](const auto& buffer) {
+        size_t total_size = 0;
+        for (const auto& chunk : buffer) {
+            total_size += chunk.PruneSize();
+        }
+        return total_size;
+    };
+    size_t total_size = get_total_size(logs_[log_id]);
+    if (total_size > max_size_[log_id]) {
+        Prune(log_id, total_size - max_size_[log_id], 0);
+        LOG(INFO) << "Pruned Logs from log_id: " << log_id << ", previous size: " << total_size
+                  << " after size: " << get_total_size(logs_[log_id]);
+    }
+}
+
+// Decompresses the chunks, call LogStatistics::Subtract() on each entry, then delete the chunks and
+// the list.  Note that the SerializedLogChunk objects have been removed from logs_ and their
+// references have been deleted from any SerializedFlushToState objects, so this can be safely done
+// without holding lock_.  It is done in a separate thread to avoid delaying the writer thread.  The
+// lambda for the thread takes ownership of the 'chunks' list and thus when the thread ends and the
+// lambda is deleted, the objects are deleted.
+void SerializedLogBuffer::DeleteLogChunks(std::list<SerializedLogChunk>&& chunks, log_id_t log_id) {
+    auto delete_thread = std::thread{[chunks = std::move(chunks), log_id, this]() mutable {
+        for (auto& chunk : chunks) {
+            // Take a reader reference so the chunk is decompressed and its entries can
+            // be walked for stats accounting.
+            chunk.IncReaderRefCount();
+            int read_offset = 0;
+            while (read_offset < chunk.write_offset()) {
+                auto* entry = chunk.log_entry(read_offset);
+                stats_->Subtract(entry->ToLogStatisticsElement(log_id));
+                read_offset += entry->total_len();
+            }
+            // false: the chunk is about to be destroyed, so don't recompress it.
+            chunk.DecReaderRefCount(false);
+        }
+    }};
+    delete_thread.detach();
+}
+
+// Tell every reader thread to drop any reference it holds to the chunk that is about
+// to be pruned.
+void SerializedLogBuffer::NotifyReadersOfPrune(
+        log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk) {
+    for (const auto& reader_thread : reader_list_->reader_threads()) {
+        auto& state = reinterpret_cast<SerializedFlushToState&>(reader_thread->flush_to_state());
+        state.Prune(log_id, chunk);
+    }
+}
+
+bool SerializedLogBuffer::Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) {
+    // Don't prune logs that are newer than the point at which any reader threads are reading from.
+    LogReaderThread* oldest = nullptr;
+    auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
+    for (const auto& reader_thread : reader_list_->reader_threads()) {
+        if (!reader_thread->IsWatching(log_id)) {
+            continue;
+        }
+        // Track the reader with the lowest start point; on a tie, prefer a reader that
+        // has a wakeup deadline set.
+        if (!oldest || oldest->start() > reader_thread->start() ||
+            (oldest->start() == reader_thread->start() &&
+             reader_thread->deadline().time_since_epoch().count() != 0)) {
+            oldest = reader_thread.get();
+        }
+    }
+
+    auto& log_buffer = logs_[log_id];
+
+    // Spliced-out chunks are deleted (and their stats subtracted) on a separate thread
+    // once this function returns, whatever the exit path.
+    std::list<SerializedLogChunk> chunks_to_prune;
+    auto prune_chunks = android::base::make_scope_guard([&chunks_to_prune, log_id, this] {
+        DeleteLogChunks(std::move(chunks_to_prune), log_id);
+    });
+
+    // Walk chunks oldest-first, stopping at the first chunk the oldest reader still needs.
+    auto it = log_buffer.begin();
+    while (it != log_buffer.end()) {
+        if (oldest != nullptr && it->highest_sequence_number() >= oldest->start()) {
+            break;
+        }
+
+        // Increment ahead of time since we're going to splice this iterator from the list.
+        auto it_to_prune = it++;
+
+        // The sequence number check ensures that all readers have read all logs in this chunk, but
+        // they may still hold a reference to the chunk to track their last read log_position.
+        // Notify them to delete the reference.
+        NotifyReadersOfPrune(log_id, it_to_prune);
+
+        if (uid != 0) {
+            // Reorder the log buffer to remove logs from the given UID.  If there are no logs left
+            // in the buffer after the removal, delete it.
+            if (it_to_prune->ClearUidLogs(uid, log_id, stats_)) {
+                log_buffer.erase(it_to_prune);
+            }
+        } else {
+            size_t buffer_size = it_to_prune->PruneSize();
+            chunks_to_prune.splice(chunks_to_prune.end(), log_buffer, it_to_prune);
+            if (buffer_size >= bytes_to_free) {
+                return true;
+            }
+            bytes_to_free -= buffer_size;
+        }
+    }
+
+    // If we've deleted all buffers without bytes_to_free hitting 0, then we're called by Clear()
+    // and should return true.
+    if (it == log_buffer.end()) {
+        return true;
+    }
+
+    // Otherwise we are stuck due to a reader, so mitigate it.
+    KickReader(oldest, log_id, bytes_to_free);
+    return false;
+}
+
+// If the selected reader is blocking our pruning progress, decide on
+// what kind of mitigation is necessary to unblock the situation.
+void SerializedLogBuffer::KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free) {
+    if (bytes_to_free >= max_size_[id]) {  // +100%
+        // A misbehaving or slow reader is dropped if we hit too much memory pressure.
+        LOG(WARNING) << "Kicking blocked reader, " << reader->name()
+                     << ", from LogBuffer::kickMe()";
+        reader->release_Locked();
+    } else if (reader->deadline().time_since_epoch().count() != 0) {
+        // Allow a blocked WRAP deadline reader to trigger and start reporting the log data.
+        reader->triggerReader_Locked();
+    } else {
+        // Tell slow reader to skip entries to catch up.
+        // Estimate entries to skip assuming ~300 bytes per entry.
+        unsigned long prune_rows = bytes_to_free / 300;
+        LOG(WARNING) << "Skipping " << prune_rows << " entries from slow reader, " << reader->name()
+                     << ", from LogBuffer::kickMe()";
+        reader->triggerSkip_Locked(id, prune_rows);
+    }
+}
+
+// Readers track their position within this buffer via a SerializedFlushToState.
+std::unique_ptr<FlushToState> SerializedLogBuffer::CreateFlushToState(uint64_t start,
+                                                                      LogMask log_mask) {
+    return std::make_unique<SerializedFlushToState>(start, log_mask);
+}
+
+bool SerializedLogBuffer::FlushTo(
+        LogWriter* writer, FlushToState& abstract_state,
+        const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
+                                         log_time realtime)>& filter) {
+    auto lock = std::unique_lock{lock_};
+
+    auto& state = reinterpret_cast<SerializedFlushToState&>(abstract_state);
+    state.InitializeLogs(logs_);
+    state.CheckForNewLogs();
+
+    // Pop unread entries in sequence order and stream them to the writer.
+    while (state.HasUnreadLogs()) {
+        MinHeapElement top = state.PopNextUnreadLog();
+        auto* entry = top.entry;
+        auto log_id = top.log_id;
+
+        // Skip entries that precede the reader's starting point.
+        if (entry->sequence() < state.start()) {
+            continue;
+        }
+        state.set_start(entry->sequence());
+
+        // Non-privileged readers may only see logs from their own UID.
+        if (!writer->privileged() && entry->uid() != writer->uid()) {
+            continue;
+        }
+
+        if (filter) {
+            auto ret = filter(log_id, entry->pid(), entry->sequence(), entry->realtime());
+            if (ret == FilterResult::kSkip) {
+                continue;
+            }
+            if (ret == FilterResult::kStop) {
+                break;
+            }
+        }
+
+        lock.unlock();
+        // We never prune logs equal to or newer than any LogReaderThreads' `start` value, so the
+        // `entry` pointer is safe here without the lock
+        if (!entry->Flush(writer, log_id)) {
+            return false;
+        }
+        lock.lock();
+
+        // Since we released the log above, buffers that aren't in our min heap may now have had
+        // logs added, so re-check them.
+        state.CheckForNewLogs();
+    }
+
+    // Record that everything up to and including the last flushed entry has been read.
+    state.set_start(state.start() + 1);
+    return true;
+}
+
+// Clears the given log; if uid is non-zero only logs from that UID are removed.
+// Blocked readers can prevent pruning, so retry, then disconnect them if necessary.
+bool SerializedLogBuffer::Clear(log_id_t id, uid_t uid) {
+    // Try three times to clear, then disconnect the readers and try one final time.
+    for (int retry = 0; retry < 3; ++retry) {
+        {
+            auto lock = std::lock_guard{lock_};
+            bool prune_success = Prune(id, ULONG_MAX, uid);
+            if (prune_success) {
+                return true;
+            }
+        }
+        sleep(1);
+    }
+    // Check if it is still busy after the sleep, we try to prune one entry, not another clear run,
+    // so we are looking for the quick side effect of the return value to tell us if we have a
+    // _blocked_ reader.
+    bool busy = false;
+    {
+        auto lock = std::lock_guard{lock_};
+        busy = !Prune(id, 1, uid);
+    }
+    // It is still busy, disconnect all readers.
+    if (busy) {
+        auto reader_threads_lock = std::lock_guard{reader_list_->reader_threads_lock()};
+        for (const auto& reader_thread : reader_list_->reader_threads()) {
+            if (reader_thread->IsWatching(id)) {
+                LOG(WARNING) << "Kicking blocked reader, " << reader_thread->name()
+                             << ", from LogBuffer::clear()";
+                reader_thread->release_Locked();
+            }
+        }
+    }
+    auto lock = std::lock_guard{lock_};
+    return Prune(id, ULONG_MAX, uid);
+}
+
+unsigned long SerializedLogBuffer::GetSize(log_id_t id) {
+    auto lock = std::lock_guard{lock_};
+    // Report the size that callers requested, not the inflated internal budget.
+    size_t retval = 2 * max_size_[id] / 3;  // See the comment in SetSize().
+    return retval;
+}
+
+// New SerializedLogChunk objects will be allocated according to the new size, but older one are
+// unchanged.  MaybePrune() is called on the log buffer to reduce it to an appropriate size if the
+// new size is lower.
+// ChattyLogBuffer/SimpleLogBuffer don't consider the 'Overhead' of LogBufferElement or the
+// std::list<> overhead as part of the log size.  SerializedLogBuffer does by its very nature, so
+// the 'size' metric is not compatible between them.  Their actual memory usage is between 1.5x and
+// 2x of what they claim to use, so we conservatively set our internal size as size + size / 2.
+int SerializedLogBuffer::SetSize(log_id_t id, unsigned long size) {
+    // Reasonable limits ...
+    if (!__android_logger_valid_buffer_size(size)) {
+        return -1;
+    }
+
+    auto lock = std::lock_guard{lock_};
+    max_size_[id] = size + size / 2;  // 1.5x the requested size, per the comment above.
+
+    MaybePrune(id);
+
+    return 0;
+}
diff --git a/logd/SerializedLogBuffer.h b/logd/SerializedLogBuffer.h
new file mode 100644
index 0000000..27334ba
--- /dev/null
+++ b/logd/SerializedLogBuffer.h
@@ -0,0 +1,74 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <bitset>
+#include <list>
+#include <mutex>
+#include <queue>
+#include <vector>
+
+#include <android-base/thread_annotations.h>
+
+#include "LogBuffer.h"
+#include "LogReaderList.h"
+#include "LogStatistics.h"
+#include "LogTags.h"
+#include "SerializedLogChunk.h"
+#include "SerializedLogEntry.h"
+#include "rwlock.h"
+
+// A LogBuffer implementation that stores logs as serialized entries in per-log lists
+// of SerializedLogChunk objects, which are compressed once writing to them completes.
+class SerializedLogBuffer : public LogBuffer {
+  public:
+    SerializedLogBuffer(LogReaderList* reader_list, LogTags* tags, LogStatistics* stats);
+    ~SerializedLogBuffer();
+    void Init() override;
+
+    int Log(log_id_t log_id, log_time realtime, uid_t uid, pid_t pid, pid_t tid, const char* msg,
+            uint16_t len) override;
+    std::unique_ptr<FlushToState> CreateFlushToState(uint64_t start, LogMask log_mask) override;
+    bool FlushTo(LogWriter* writer, FlushToState& state,
+                 const std::function<FilterResult(log_id_t log_id, pid_t pid, uint64_t sequence,
+                                                  log_time realtime)>& filter) override;
+
+    bool Clear(log_id_t id, uid_t uid) override;
+    unsigned long GetSize(log_id_t id) override;
+    int SetSize(log_id_t id, unsigned long size) override;
+
+    uint64_t sequence() const override { return sequence_.load(std::memory_order_relaxed); }
+
+  private:
+    bool ShouldLog(log_id_t log_id, const char* msg, uint16_t len);
+    void MaybePrune(log_id_t log_id) REQUIRES(lock_);
+    bool Prune(log_id_t log_id, size_t bytes_to_free, uid_t uid) REQUIRES(lock_);
+    void KickReader(LogReaderThread* reader, log_id_t id, size_t bytes_to_free)
+            REQUIRES_SHARED(lock_);
+    void NotifyReadersOfPrune(log_id_t log_id, const std::list<SerializedLogChunk>::iterator& chunk)
+            REQUIRES(reader_list_->reader_threads_lock());
+    void DeleteLogChunks(std::list<SerializedLogChunk>&& chunks, log_id_t log_id);
+
+    LogReaderList* reader_list_;
+    LogTags* tags_;
+    LogStatistics* stats_;
+
+    unsigned long max_size_[LOG_ID_MAX] GUARDED_BY(lock_) = {};  // Internal budget per log (see SetSize()).
+    std::list<SerializedLogChunk> logs_[LOG_ID_MAX] GUARDED_BY(lock_);  // Chunks, oldest first.
+    RwLock lock_;
+
+    std::atomic<uint64_t> sequence_ = 1;  // Monotonic sequence number for ordering entries.
+};
diff --git a/logd/SerializedLogChunk.cpp b/logd/SerializedLogChunk.cpp
new file mode 100644
index 0000000..2516003
--- /dev/null
+++ b/logd/SerializedLogChunk.cpp
@@ -0,0 +1,114 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerializedLogChunk.h"
+
+#include <android-base/logging.h>
+
+#include "CompressionEngine.h"
+
+// A chunk must never be destroyed while readers still hold a reference to it.
+SerializedLogChunk::~SerializedLogChunk() {
+    CHECK_EQ(reader_ref_count_, 0U);
+}
+
+// Compresses contents_ into compressed_log_ (if not already compressed) and releases
+// the memory held by the decompressed buffer.
+void SerializedLogChunk::Compress() {
+    if (compressed_log_.empty()) {
+        CompressionEngine::GetInstance().Compress({contents_.data(), write_offset_},
+                                                  compressed_log_);
+        LOG(INFO) << "Compressed Log, buffer max size: " << contents_.size()
+                  << " size used: " << write_offset_
+                  << " compressed size: " << compressed_log_.size();
+    }
+    // Swap with an empty vector to actually free the buffer: resize(0) alone keeps the
+    // vector's capacity allocated, which would defeat the memory savings of compression.
+    std::vector<uint8_t>().swap(contents_);
+}
+
+// TODO: Develop a better reference counting strategy to guard against the case where the writer is
+// much faster than the reader, and we needlessly compress / decompress the logs.
+void SerializedLogChunk::IncReaderRefCount() {
+    // Only the first reader needs to decompress; if a writer is still active, the
+    // decompressed contents_ buffer is already present.
+    if (++reader_ref_count_ != 1 || writer_active_) {
+        return;
+    }
+    CompressionEngine::GetInstance().Decompress(compressed_log_, contents_, write_offset_);
+}
+
+void SerializedLogChunk::DecReaderRefCount(bool compress) {
+    CHECK_NE(reader_ref_count_, 0U);
+    if (--reader_ref_count_ != 0) {
+        return;
+    }
+    // Last reader is gone: recompress unless the caller is about to delete the chunk
+    // (compress == false) or a writer is still appending to it.
+    if (compress && !writer_active_) {
+        Compress();
+    }
+}
+
+bool SerializedLogChunk::ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats) {
+    CHECK_EQ(reader_ref_count_, 0U);
+    if (write_offset_ == 0) {
+        return true;
+    }
+
+    // Take a reader reference so the contents are decompressed and can be edited in place.
+    IncReaderRefCount();
+
+    // Walk the entries, dropping those from `uid` and compacting the survivors toward
+    // the front of the buffer.
+    int read_offset = 0;
+    int new_write_offset = 0;
+    while (read_offset < write_offset_) {
+        const auto* entry = log_entry(read_offset);
+        if (entry->uid() == uid) {
+            read_offset += entry->total_len();
+            if (stats != nullptr) {
+                stats->Subtract(entry->ToLogStatisticsElement(log_id));
+            }
+            continue;
+        }
+        size_t entry_total_len = entry->total_len();
+        if (read_offset != new_write_offset) {
+            memmove(contents_.data() + new_write_offset, contents_.data() + read_offset,
+                    entry_total_len);
+        }
+        read_offset += entry_total_len;
+        new_write_offset += entry_total_len;
+    }
+
+    // Every entry belonged to `uid`; the chunk is now empty and the caller may delete it.
+    if (new_write_offset == 0) {
+        DecReaderRefCount(false);
+        return true;
+    }
+
+    // Clear the old compressed logs and set write_offset_ appropriately for DecReaderRefCount()
+    // to compress the new partially cleared log.
+    if (new_write_offset != write_offset_) {
+        compressed_log_.clear();
+        write_offset_ = new_write_offset;
+    }
+
+    DecReaderRefCount(true);
+
+    return false;
+}
+
+// Returns true if `len` additional bytes (entry header included) fit in this chunk.
+bool SerializedLogChunk::CanLog(size_t len) {
+    return write_offset_ + len <= contents_.size();
+}
+
+// Appends an entry to the chunk via placement-new; callers must check CanLog() first.
+// Returns a pointer to the entry within the chunk's buffer.
+SerializedLogEntry* SerializedLogChunk::Log(uint64_t sequence, log_time realtime, uid_t uid,
+                                            pid_t pid, pid_t tid, const char* msg, uint16_t len) {
+    auto new_log_address = contents_.data() + write_offset_;
+    auto* entry = new (new_log_address) SerializedLogEntry(uid, pid, tid, sequence, realtime, len);
+    memcpy(entry->msg(), msg, len);
+    write_offset_ += entry->total_len();
+    highest_sequence_number_ = sequence;
+    return entry;
+}
diff --git a/logd/SerializedLogChunk.h b/logd/SerializedLogChunk.h
new file mode 100644
index 0000000..75cb936
--- /dev/null
+++ b/logd/SerializedLogChunk.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <sys/types.h>
+
+#include <vector>
+
+#include "LogWriter.h"
+#include "SerializedLogEntry.h"
+
+// A fixed-size buffer of serialized log entries.  While a writer is active, entries are
+// appended to the decompressed contents_ buffer; once writing finishes (or all readers
+// disconnect), the contents are compressed and the decompressed buffer is released.
+class SerializedLogChunk {
+  public:
+    SerializedLogChunk(size_t size) : contents_(size) {}
+    ~SerializedLogChunk();
+
+    // Compresses contents_ into compressed_log_ if needed, then releases contents_.
+    void Compress();
+    // Increase the reader ref count; the first reader decompresses the log unless a
+    // writer is still active.
+    void IncReaderRefCount();
+    // Decrease the reader ref count and compress the log if appropriate.  `compress` should only be
+    // set to false in the case that the log buffer will be deleted afterwards.
+    void DecReaderRefCount(bool compress);
+
+    // Must have no readers referencing this.  Return true if there are no logs left in this chunk.
+    bool ClearUidLogs(uid_t uid, log_id_t log_id, LogStatistics* stats);
+
+    bool CanLog(size_t len);
+    SerializedLogEntry* Log(uint64_t sequence, log_time realtime, uid_t uid, pid_t pid, pid_t tid,
+                            const char* msg, uint16_t len);
+
+    // If this buffer has been compressed, we only consider its compressed size when accounting for
+    // memory consumption for pruning.  This is since the uncompressed log is only by used by
+    // readers, and thus not a representation of how much these logs cost to keep in memory.
+    // Written without the GNU `?:` omitted-operand extension for portability.
+    size_t PruneSize() const {
+        return !compressed_log_.empty() ? compressed_log_.size() : contents_.size();
+    }
+
+    void FinishWriting() {
+        writer_active_ = false;
+        if (reader_ref_count_ == 0) {
+            Compress();
+        }
+    }
+
+    const SerializedLogEntry* log_entry(int offset) const {
+        return reinterpret_cast<const SerializedLogEntry*>(data() + offset);
+    }
+    const uint8_t* data() const { return contents_.data(); }
+    int write_offset() const { return write_offset_; }
+    uint64_t highest_sequence_number() const { return highest_sequence_number_; }
+
+  private:
+    // The decompressed contents of this log buffer.  Deallocated when the ref_count reaches 0 and
+    // writer_active_ is false.
+    std::vector<uint8_t> contents_;
+    int write_offset_ = 0;       // Next free byte within contents_.
+    uint32_t reader_ref_count_ = 0;
+    bool writer_active_ = true;  // Cleared by FinishWriting().
+    uint64_t highest_sequence_number_ = 1;
+    std::vector<uint8_t> compressed_log_;
+};
diff --git a/logd/SerializedLogChunkTest.cpp b/logd/SerializedLogChunkTest.cpp
new file mode 100644
index 0000000..6572503
--- /dev/null
+++ b/logd/SerializedLogChunkTest.cpp
@@ -0,0 +1,284 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SerializedLogChunk.h"
+
+#include <limits>
+
+#include <android-base/stringprintf.h>
+#include <android/log.h>
+#include <gtest/gtest.h>
+
+using android::base::StringPrintf;
+
+// End-to-end sanity check: log one message and verify that every field round-trips.
+TEST(SerializedLogChunk, smoke) {
+    size_t chunk_size = 10 * 4096;
+    auto chunk = SerializedLogChunk{chunk_size};
+    EXPECT_EQ(chunk_size, chunk.PruneSize());
+
+    static const char log_message[] = "log message";
+    size_t expected_total_len = sizeof(SerializedLogEntry) + sizeof(log_message);
+    ASSERT_TRUE(chunk.CanLog(expected_total_len));
+    EXPECT_TRUE(chunk.CanLog(chunk_size));
+    EXPECT_FALSE(chunk.CanLog(chunk_size + 1));
+
+    log_time time(CLOCK_REALTIME);
+    auto* entry = chunk.Log(1234, time, 0, 1, 2, log_message, sizeof(log_message));
+    ASSERT_NE(nullptr, entry);
+
+    EXPECT_EQ(1234U, entry->sequence());
+    EXPECT_EQ(time, entry->realtime());
+    EXPECT_EQ(0U, entry->uid());
+    EXPECT_EQ(1, entry->pid());
+    EXPECT_EQ(2, entry->tid());
+    EXPECT_EQ(sizeof(log_message), entry->msg_len());
+    EXPECT_STREQ(log_message, entry->msg());
+    EXPECT_EQ(expected_total_len, entry->total_len());
+
+    EXPECT_FALSE(chunk.CanLog(chunk_size));
+    EXPECT_EQ(static_cast<int>(expected_total_len), chunk.write_offset());
+    EXPECT_EQ(1234U, chunk.highest_sequence_number());
+}
+
+// Three messages sized to fill the chunk exactly; not even one more byte should fit.
+TEST(SerializedLogChunk, fill_log_exactly) {
+    static const char log_message[] = "this is a log message";
+    size_t individual_message_size = sizeof(SerializedLogEntry) + sizeof(log_message);
+    size_t chunk_size = individual_message_size * 3;
+    auto chunk = SerializedLogChunk{chunk_size};
+    EXPECT_EQ(chunk_size, chunk.PruneSize());
+
+    ASSERT_TRUE(chunk.CanLog(individual_message_size));
+    EXPECT_NE(nullptr, chunk.Log(1, log_time(), 1000, 1, 1, log_message, sizeof(log_message)));
+
+    ASSERT_TRUE(chunk.CanLog(individual_message_size));
+    EXPECT_NE(nullptr, chunk.Log(2, log_time(), 1000, 2, 1, log_message, sizeof(log_message)));
+
+    ASSERT_TRUE(chunk.CanLog(individual_message_size));
+    EXPECT_NE(nullptr, chunk.Log(3, log_time(), 1000, 3, 1, log_message, sizeof(log_message)));
+
+    EXPECT_FALSE(chunk.CanLog(1));
+}
+
+// Verify the exact serialized byte layout of three entries, including maximum field values.
+TEST(SerializedLogChunk, three_logs) {
+    size_t chunk_size = 10 * 4096;
+    auto chunk = SerializedLogChunk{chunk_size};
+
+    chunk.Log(2, log_time(0x1234, 0x5678), 0x111, 0x222, 0x333, "initial message",
+              strlen("initial message"));
+    chunk.Log(3, log_time(0x2345, 0x6789), 0x444, 0x555, 0x666, "second message",
+              strlen("second message"));
+    auto uint64_t_max = std::numeric_limits<uint64_t>::max();
+    auto uint32_t_max = std::numeric_limits<uint32_t>::max();
+    chunk.Log(uint64_t_max, log_time(uint32_t_max, uint32_t_max), uint32_t_max, uint32_t_max,
+              uint32_t_max, "last message", strlen("last message"));
+
+    // Expected little-endian encoding of the three SerializedLogEntry headers + payloads.
+    static const char expected_buffer_data[] =
+            "\x11\x01\x00\x00\x22\x02\x00\x00\x33\x03\x00\x00"  // UID PID TID
+            "\x02\x00\x00\x00\x00\x00\x00\x00"                  // Sequence
+            "\x34\x12\x00\x00\x78\x56\x00\x00"                  // Timestamp
+            "\x0F\x00initial message"                           // msg_len + message
+            "\x44\x04\x00\x00\x55\x05\x00\x00\x66\x06\x00\x00"  // UID PID TID
+            "\x03\x00\x00\x00\x00\x00\x00\x00"                  // Sequence
+            "\x45\x23\x00\x00\x89\x67\x00\x00"                  // Timestamp
+            "\x0E\x00second message"                            // msg_len + message
+            "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"  // UID PID TID
+            "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"                  // Sequence
+            "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"                  // Timestamp
+            "\x0C\x00last message";                             // msg_len + message
+
+    // The remainder of the chunk past the written entries must stay zeroed.
+    for (size_t i = 0; i < chunk_size; ++i) {
+        if (i < sizeof(expected_buffer_data)) {
+            EXPECT_EQ(static_cast<uint8_t>(expected_buffer_data[i]), chunk.data()[i])
+                    << "position: " << i;
+        } else {
+            EXPECT_EQ(0, chunk.data()[i]) << "position: " << i;
+        }
+    }
+}
+
+// Check that DecReaderRefCount() aborts via CHECK() when called with a zero ref count.
+TEST(SerializedLogChunk, catch_DecCompressedRef_CHECK) {
+    size_t chunk_size = 10 * 4096;
+    auto chunk = SerializedLogChunk{chunk_size};
+    EXPECT_DEATH({ chunk.DecReaderRefCount(true); }, "");
+    EXPECT_DEATH({ chunk.DecReaderRefCount(false); }, "");
+}
+
+// Check that ClearUidLogs() aborts via CHECK() when a reader still holds a reference.
+TEST(SerializedLogChunk, catch_ClearUidLogs_CHECK) {
+    size_t chunk_size = 10 * 4096;
+    auto chunk = SerializedLogChunk{chunk_size};
+    chunk.IncReaderRefCount();
+    EXPECT_DEATH({ chunk.ClearUidLogs(1000, LOG_ID_MAIN, nullptr); }, "");
+    chunk.DecReaderRefCount(false);
+}
+
+// Fixture for ClearUidLogs() tests, parameterized over whether FinishWriting() is
+// called (sealing the chunk) before the clear.
+class UidClearTest : public testing::TestWithParam<bool> {
+  protected:
+    template <typename Write, typename Check>
+    void Test(const Write& write, const Check& check, bool expected_result) {
+        write(chunk_);
+
+        bool finish_writing = GetParam();
+        if (finish_writing) {
+            chunk_.FinishWriting();
+        }
+        EXPECT_EQ(expected_result, chunk_.ClearUidLogs(kUidToClear, LOG_ID_MAIN, nullptr));
+        if (finish_writing) {
+            // Take a reader reference so check() can inspect the decompressed contents.
+            chunk_.IncReaderRefCount();
+        }
+
+        check(chunk_);
+
+        if (finish_writing) {
+            chunk_.DecReaderRefCount(false);
+        }
+    }
+
+    static constexpr size_t kChunkSize = 10 * 4096;
+    static constexpr uid_t kUidToClear = 1000;
+    static constexpr uid_t kOtherUid = 1234;
+
+    SerializedLogChunk chunk_{kChunkSize};
+};
+
+// Test that ClearUidLogs() returns true (chunk deletable) when the chunk is empty.
+TEST_P(UidClearTest, no_logs_in_chunk) {
+    auto write = [](SerializedLogChunk&) {};
+    auto check = [](SerializedLogChunk&) {};
+
+    Test(write, check, true);
+}
+
+// Test that ClearUidLogs() preserves the buffer when no logs match the given UID.
+TEST_P(UidClearTest, no_logs_from_uid) {
+    static const char msg[] = "this is a log message";
+    auto write = [](SerializedLogChunk& chunk) {
+        chunk.Log(1, log_time(), kOtherUid, 1, 2, msg, sizeof(msg));
+    };
+
+    auto check = [](SerializedLogChunk& chunk) {
+        auto* entry = chunk.log_entry(0);
+        EXPECT_STREQ(msg, entry->msg());
+    };
+
+    Test(write, check, false);
+}
+
+// Test that ClearUidLogs() returns true if all logs in a given buffer correspond to the given UID.
+TEST_P(UidClearTest, all_single) {
+    static const char msg[] = "this is a log message";
+    auto write = [](SerializedLogChunk& chunk) {
+        chunk.Log(1, log_time(), kUidToClear, 1, 2, msg, sizeof(msg));
+    };
+    auto check = [](SerializedLogChunk&) {};  // Nothing remains to verify.
+
+    Test(write, check, true);
+}
+
+// Test that ClearUidLogs() returns true if all logs in a given buffer correspond to the given UID.
+TEST_P(UidClearTest, all_multiple) {
+    static const char msg[] = "this is a log message";
+    auto write = [](SerializedLogChunk& chunk) {
+        chunk.Log(2, log_time(), kUidToClear, 1, 2, msg, sizeof(msg));
+        chunk.Log(3, log_time(), kUidToClear, 1, 2, msg, sizeof(msg));
+        chunk.Log(4, log_time(), kUidToClear, 1, 2, msg, sizeof(msg));
+    };
+    auto check = [](SerializedLogChunk&) {};  // Nothing remains to verify.
+
+    Test(write, check, true);
+}
+
+// Renders a byte buffer for failure messages, escaping non-printable bytes as "\hh" hex.
+static std::string MakePrintable(const uint8_t* in, size_t length) {
+    std::string result;
+    for (size_t i = 0; i < length; ++i) {
+        uint8_t c = in[i];
+        if (isprint(c)) {
+            result.push_back(c);
+        } else {
+            result.append(StringPrintf("\\%02x", static_cast<int>(c) & 0xFF));
+        }
+    }
+    return result;
+}
+
+// This test clears UID logs at the beginning and end of the buffer, as well as two back to back
+// logs in the interior.
+TEST_P(UidClearTest, clear_beginning_and_end) {
+    static const char msg1[] = "this is a log message";
+    static const char msg2[] = "non-cleared message";
+    static const char msg3[] = "back to back cleared messages";
+    static const char msg4[] = "second in a row gone";
+    static const char msg5[] = "but we save this one";
+    static const char msg6[] = "and this 1!";
+    static const char msg7[] = "the last one goes too";
+    auto write = [](SerializedLogChunk& chunk) {
+        ASSERT_NE(nullptr, chunk.Log(1, log_time(), kUidToClear, 1, 2, msg1, sizeof(msg1)));
+        ASSERT_NE(nullptr, chunk.Log(2, log_time(), kOtherUid, 1, 2, msg2, sizeof(msg2)));
+        ASSERT_NE(nullptr, chunk.Log(3, log_time(), kUidToClear, 1, 2, msg3, sizeof(msg3)));
+        ASSERT_NE(nullptr, chunk.Log(4, log_time(), kUidToClear, 1, 2, msg4, sizeof(msg4)));
+        ASSERT_NE(nullptr, chunk.Log(5, log_time(), kOtherUid, 1, 2, msg5, sizeof(msg5)));
+        ASSERT_NE(nullptr, chunk.Log(6, log_time(), kOtherUid, 1, 2, msg6, sizeof(msg6)));
+        ASSERT_NE(nullptr, chunk.Log(7, log_time(), kUidToClear, 1, 2, msg7, sizeof(msg7)));
+    };
+
+    // Only msg2, msg5, and msg6 should remain, compacted in their original order.
+    auto check = [](SerializedLogChunk& chunk) {
+        size_t read_offset = 0;
+        auto* entry = chunk.log_entry(read_offset);
+        EXPECT_STREQ(msg2, entry->msg());
+        read_offset += entry->total_len();
+
+        entry = chunk.log_entry(read_offset);
+        EXPECT_STREQ(msg5, entry->msg());
+        read_offset += entry->total_len();
+
+        entry = chunk.log_entry(read_offset);
+        EXPECT_STREQ(msg6, entry->msg()) << MakePrintable(chunk.data(), chunk.write_offset());
+        read_offset += entry->total_len();
+
+        EXPECT_EQ(static_cast<int>(read_offset), chunk.write_offset());
+    };
+    Test(write, check, false);
+}
+
+// This tests the opposite case of beginning_and_end, in which we don't clear the beginning or end
+// logs.  There is a single log pruned in the middle instead of back to back logs.
+TEST_P(UidClearTest, save_beginning_and_end) {
+    static const char msg1[] = "saved first message";
+    static const char msg2[] = "cleared interior message";
+    static const char msg3[] = "last message stays";
+    auto write = [](SerializedLogChunk& chunk) {
+        ASSERT_NE(nullptr, chunk.Log(1, log_time(), kOtherUid, 1, 2, msg1, sizeof(msg1)));
+        ASSERT_NE(nullptr, chunk.Log(2, log_time(), kUidToClear, 1, 2, msg2, sizeof(msg2)));
+        ASSERT_NE(nullptr, chunk.Log(3, log_time(), kOtherUid, 1, 2, msg3, sizeof(msg3)));
+    };
+
+    // msg1 and msg3 should remain, with msg2's space compacted away.
+    auto check = [](SerializedLogChunk& chunk) {
+        size_t read_offset = 0;
+        auto* entry = chunk.log_entry(read_offset);
+        EXPECT_STREQ(msg1, entry->msg());
+        read_offset += entry->total_len();
+
+        entry = chunk.log_entry(read_offset);
+        EXPECT_STREQ(msg3, entry->msg());
+        read_offset += entry->total_len();
+
+        EXPECT_EQ(static_cast<int>(read_offset), chunk.write_offset());
+    };
+    Test(write, check, false);
+}
+
+INSTANTIATE_TEST_CASE_P(UidClearTests, UidClearTest, testing::Values(true, false));
diff --git a/logd/SerializedLogEntry.h b/logd/SerializedLogEntry.h
new file mode 100644
index 0000000..f599abe
--- /dev/null
+++ b/logd/SerializedLogEntry.h
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/types.h>
+
+#include <log/log.h>
+#include <log/log_read.h>
+
+#include "LogStatistics.h"
+#include "LogWriter.h"
+
+// These structs are packed into a single chunk of memory for each log type within a
+// SerializedLogChunk object.  Each entry's message is stored in the msg_len_ bytes immediately
+// following the struct, so the next log in the buffer begins at this entry's address plus
+// sizeof(SerializedLogEntry) + msg_len_.  If that address would extend beyond the chunk of memory
+// associated with the SerializedLogChunk object, then a new SerializedLogChunk must be allocated
+// to contain the next SerializedLogEntry.
+class __attribute__((packed)) SerializedLogEntry {
+  public:
+    // Constructs the fixed-size header for a log message of |len| bytes.  The message text itself
+    // is not copied here; it occupies the bytes immediately following this struct and is written
+    // through msg() by the caller.
+    SerializedLogEntry(uid_t uid, pid_t pid, pid_t tid, uint64_t sequence, log_time realtime,
+                       uint16_t len)
+        : uid_(uid),
+          pid_(pid),
+          tid_(tid),
+          sequence_(sequence),
+          realtime_(realtime),
+          msg_len_(len) {}
+    // Entries live in place inside a shared buffer, so they must never be copied.
+    SerializedLogEntry(const SerializedLogEntry& elem) = delete;
+    SerializedLogEntry& operator=(const SerializedLogEntry& elem) = delete;
+    ~SerializedLogEntry() {
+        // Never place anything in this destructor.  This class is in place constructed and never
+        // destructed.
+    }
+
+    // Converts this entry into the element type consumed by LogStatistics.  Binary logs report
+    // their event tag; other logs report tag 0.  dropped_count is always 0 here: there is no
+    // dropped-message accounting for serialized entries.
+    LogStatisticsElement ToLogStatisticsElement(log_id_t log_id) const {
+        return LogStatisticsElement{
+                .uid = uid(),
+                .pid = pid(),
+                .tid = tid(),
+                .tag = IsBinary(log_id) ? MsgToTag(msg(), msg_len()) : 0,
+                .realtime = realtime(),
+                .msg = msg(),
+                .msg_len = msg_len(),
+                .dropped_count = 0,
+                .log_id = log_id,
+        };
+    }
+
+    // Sends this entry to |writer| as a logger_entry header followed by the message payload.
+    // Returns the result of LogWriter::Write().
+    bool Flush(LogWriter* writer, log_id_t log_id) const {
+        struct logger_entry entry = {};
+
+        entry.hdr_size = sizeof(struct logger_entry);
+        entry.lid = log_id;
+        entry.pid = pid();
+        entry.tid = tid();
+        entry.uid = uid();
+        entry.sec = realtime().tv_sec;
+        entry.nsec = realtime().tv_nsec;
+        entry.len = msg_len();
+
+        return writer->Write(entry, msg());
+    }
+
+    // Field accessors.  All members are const, so every value is fixed at construction.
+    uid_t uid() const { return uid_; }
+    pid_t pid() const { return pid_; }
+    pid_t tid() const { return tid_; }
+    uint16_t msg_len() const { return msg_len_; }
+    uint64_t sequence() const { return sequence_; }
+    log_time realtime() const { return realtime_; }
+
+    // The message text is not a member; it lives in the msg_len_ bytes directly after this struct.
+    char* msg() { return reinterpret_cast<char*>(this) + sizeof(*this); }
+    const char* msg() const { return reinterpret_cast<const char*>(this) + sizeof(*this); }
+    // Header size plus message payload, i.e. the offset from this entry to the next one.
+    // NOTE(review): assumes sizeof(*this) + msg_len_ always fits in uint16_t — confirm that
+    // logd's maximum message length keeps this sum from overflowing.
+    uint16_t total_len() const { return sizeof(*this) + msg_len_; }
+
+  private:
+    // Fixed-width types rather than uid_t/pid_t — presumably to pin down the packed, serialized
+    // layout; the accessors above convert back to the system types.
+    const uint32_t uid_;
+    const uint32_t pid_;
+    const uint32_t tid_;
+    const uint64_t sequence_;
+    const log_time realtime_;
+    const uint16_t msg_len_;
+};
diff --git a/logd/fuzz/Android.bp b/logd/fuzz/Android.bp
index f65fbdf..9834ff0 100644
--- a/logd/fuzz/Android.bp
+++ b/logd/fuzz/Android.bp
@@ -26,6 +26,8 @@
         "liblogd",
         "libcutils",
         "libsysutils",
+        "libz",
+        "libzstd",
     ],
     cflags: ["-Werror"],
 }
diff --git a/logd/main.cpp b/logd/main.cpp
index 773ffb8..46b6567 100644
--- a/logd/main.cpp
+++ b/logd/main.cpp
@@ -58,6 +58,7 @@
 #include "LogStatistics.h"
 #include "LogTags.h"
 #include "LogUtils.h"
+#include "SerializedLogBuffer.h"
 #include "SimpleLogBuffer.h"
 
 #define KMSG_PRIORITY(PRI)                                 \