snapuserd: Create a ReadWorker class.

This splits the dm-user specific parts of Worker into a derived class.

Bug: 288273605
Test: snapuserd_test
Change-Id: Ic0ed1a8dff30018fa8466e7dc6e92469f1c87579
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 9fe567a..38ec23f 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -63,8 +63,8 @@
         "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_buffer.cpp",
         "user-space-merge/handler_manager.cpp",
+        "user-space-merge/read_worker.cpp",
         "user-space-merge/snapuserd_core.cpp",
-        "user-space-merge/snapuserd_dm_user.cpp",
         "user-space-merge/snapuserd_merge.cpp",
         "user-space-merge/snapuserd_readahead.cpp",
         "user-space-merge/snapuserd_transitions.cpp",
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index 734e84f..4105b4b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -18,6 +18,7 @@
 
 #include <android-base/logging.h>
 
+#include "read_worker.h"
 #include "snapuserd_core.h"
 #include "snapuserd_merge.h"
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
similarity index 94%
rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index 2b9d14e..49a8360 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+#include "read_worker.h"
+
 #include "snapuserd_core.h"
 
 namespace android {
@@ -34,6 +36,12 @@
     snapuserd_ = snapuserd;
 }
 
+ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
+                       const std::string& control_device, const std::string& misc_name,
+                       const std::string& base_path_merge,
+                       std::shared_ptr<SnapshotHandler> snapuserd)
+    : Worker(cow_device, backing_device, control_device, misc_name, base_path_merge, snapuserd) {}
+
 bool Worker::InitializeFds() {
     backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
     if (backing_store_fd_ < 0) {
@@ -118,7 +126,7 @@
 
 // Start the copy operation. This will read the backing
 // block device which is represented by cow_op->source.
-bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
     if (!ReadFromSourceDevice(cow_op)) {
         return false;
     }
@@ -126,7 +134,7 @@
     return true;
 }
 
-bool Worker::ProcessXorOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
     if (!ReadFromSourceDevice(cow_op)) {
         return false;
     }
@@ -165,7 +173,7 @@
     return true;
 }
 
-bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     if (buffer == nullptr) {
         SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@@ -218,7 +226,7 @@
     return false;
 }
 
-bool Worker::ProcessCowOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
     if (cow_op == nullptr) {
         SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
         return false;
@@ -257,7 +265,6 @@
 
 bool Worker::Init() {
     InitializeBufsink();
-    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
 
     if (!InitializeFds()) {
         return false;
@@ -270,7 +277,15 @@
     return true;
 }
 
-bool Worker::RunThread() {
+bool ReadWorker::Init() {
+    if (!Worker::Init()) {
+        return false;
+    }
+    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
+    return true;
+}
+
+bool ReadWorker::Run() {
     SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
 
     if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@@ -291,7 +306,7 @@
 }
 
 // Send the payload/data back to dm-user misc device.
-bool Worker::WriteDmUserPayload(size_t size) {
+bool ReadWorker::WriteDmUserPayload(size_t size) {
     size_t payload_size = size;
     void* buf = bufsink_.GetPayloadBufPtr();
     if (header_response_) {
@@ -329,7 +344,7 @@
     return true;
 }
 
-bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
+bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
     size_t remaining_size = sz;
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
     int ret = 0;
@@ -389,7 +404,7 @@
     return true;
 }
 
-int Worker::ReadUnalignedSector(
+int ReadWorker::ReadUnalignedSector(
         sector_t sector, size_t size,
         std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
     size_t skip_sector_size = 0;
@@ -424,7 +439,7 @@
     return std::min(size, (BLOCK_SZ - skip_sector_size));
 }
 
-bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
+bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
     bufsink_.ResetBufferOffset();
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
 
@@ -563,7 +578,7 @@
     return true;
 }
 
-void Worker::RespondIOError() {
+void ReadWorker::RespondIOError() {
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
     header->type = DM_USER_RESP_ERROR;
     // This is an issue with the dm-user interface. There
@@ -580,7 +595,7 @@
     WriteDmUserPayload(0);
 }
 
-bool Worker::DmuserReadRequest() {
+bool ReadWorker::DmuserReadRequest() {
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
 
     // Unaligned I/O request
@@ -591,7 +606,7 @@
     return ReadAlignedSector(header->sector, header->len);
 }
 
-bool Worker::ProcessIORequest() {
+bool ReadWorker::ProcessIORequest() {
     // Read Header from dm-user misc device. This gives
     // us the sector number for which IO is issued by dm-snapshot device
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
new file mode 100644
index 0000000..262f8ad
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -0,0 +1,53 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+class ReadWorker : public Worker {
+  public:
+    ReadWorker(const std::string& cow_device, const std::string& backing_device,
+               const std::string& control_device, const std::string& misc_name,
+               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+
+    bool Run();
+    bool Init() override;
+
+  private:
+    // Functions interacting with dm-user
+    bool ProcessIORequest();
+    bool WriteDmUserPayload(size_t size);
+    bool DmuserReadRequest();
+    void RespondIOError();
+
+    bool ProcessCowOp(const CowOperation* cow_op);
+    bool ProcessXorOp(const CowOperation* cow_op);
+    bool ProcessOrderedOp(const CowOperation* cow_op);
+    bool ProcessCopyOp(const CowOperation* cow_op);
+
+    bool ReadAlignedSector(sector_t sector, size_t sz);
+    bool ReadUnalignedSector(sector_t sector, size_t size);
+    int ReadUnalignedSector(sector_t sector, size_t size,
+                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
+
+    XorSink xorsink_;
+    bool header_response_ = false;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 4f7495c..baf06b3 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -23,6 +23,7 @@
 #include <android-base/scopeguard.h>
 #include <android-base/strings.h>
 
+#include "read_worker.h"
 #include "snapuserd_merge.h"
 
 namespace android {
@@ -48,9 +49,8 @@
 
 bool SnapshotHandler::InitializeWorkers() {
     for (int i = 0; i < num_worker_threads_; i++) {
-        std::unique_ptr<Worker> wt =
-                std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
-                                         misc_name_, base_path_merge_, GetSharedPtr());
+        auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
+                                               misc_name_, base_path_merge_, GetSharedPtr());
         if (!wt->Init()) {
             SNAP_LOG(ERROR) << "Thread initialization failed";
             return false;
@@ -315,7 +315,7 @@
     // Launch worker threads
     for (int i = 0; i < worker_threads_.size(); i++) {
         threads.emplace_back(
-                std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
+                std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
     }
 
     std::future<bool> merge_thread =
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index fe10edf..0c30eac 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -76,6 +76,7 @@
 };
 
 class MergeWorker;
+class ReadWorker;
 class SnapshotHandler;
 
 enum class MERGE_GROUP_STATE {
@@ -104,8 +105,9 @@
     Worker(const std::string& cow_device, const std::string& backing_device,
            const std::string& control_device, const std::string& misc_name,
            const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
-    bool RunThread();
-    bool Init();
+    virtual ~Worker() = default;
+
+    virtual bool Init();
 
   protected:
     // Initialization
@@ -118,39 +120,21 @@
         base_path_merge_fd_ = {};
     }
 
-    // Functions interacting with dm-user
-    bool WriteDmUserPayload(size_t size);
-    bool DmuserReadRequest();
-
     // IO Path
-    bool ProcessIORequest();
     bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
 
     bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
     bool ReadFromSourceDevice(const CowOperation* cow_op);
 
-    bool ReadAlignedSector(sector_t sector, size_t sz);
-    bool ReadUnalignedSector(sector_t sector, size_t size);
-    int ReadUnalignedSector(sector_t sector, size_t size,
-                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
-    void RespondIOError();
-
     // Processing COW operations
-    bool ProcessCowOp(const CowOperation* cow_op);
     bool ProcessReplaceOp(const CowOperation* cow_op);
     bool ProcessZeroOp();
 
-    // Handles Copy and Xor
-    bool ProcessCopyOp(const CowOperation* cow_op);
-    bool ProcessXorOp(const CowOperation* cow_op);
-    bool ProcessOrderedOp(const CowOperation* cow_op);
-
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
 
     std::unique_ptr<CowReader> reader_;
     BufferSink bufsink_;
-    XorSink xorsink_;
 
     std::string cow_device_;
     std::string backing_store_device_;
@@ -162,7 +146,6 @@
     unique_fd backing_store_fd_;
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
-    bool header_response_ = false;
 
     std::unique_ptr<ICowOpIter> cowop_iter_;
 
@@ -286,7 +269,7 @@
     void* mapped_addr_;
     size_t total_mapped_addr_length_;
 
-    std::vector<std::unique_ptr<Worker>> worker_threads_;
+    std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
     // Read-ahead related
     bool populate_data_from_cow_ = false;
     bool ra_thread_ = false;