Add multi thread support for A/B generator

Use threadpool to calculate file deltas in parallel. This decreases the
time consumption from 40 mins to 13 mins for the incremental payload generation
of two taimen system images.

Bug: 62470452
Test: Unit tests pass && generate an identical payload for incremental
taimen as before

Change-Id: Ieec135fe3f5e93b79f4f4b133442f6808d9040f2
(cherry picked from commit c4ad1ebc33abc088aca2909ba5cbaf7ae5e5659f)
diff --git a/payload_generator/delta_diff_utils.cc b/payload_generator/delta_diff_utils.cc
index 44aff7a..045d52f 100644
--- a/payload_generator/delta_diff_utils.cc
+++ b/payload_generator/delta_diff_utils.cc
@@ -22,7 +22,7 @@
 #pragma clang diagnostic ignored "-Wmacro-redefined"
 #include <ext2fs/ext2fs.h>
 #pragma clang diagnostic pop
-
+#include <unistd.h>
 
 #include <algorithm>
 #include <map>
@@ -30,6 +30,7 @@
 #include <base/files/file_util.h>
 #include <base/format_macros.h>
 #include <base/strings/stringprintf.h>
+#include <base/threading/simple_thread.h>
 
 #include "update_engine/common/hash_calculator.h"
 #include "update_engine/common/subprocess.h"
@@ -171,6 +172,81 @@
 
 namespace diff_utils {
 
+// This class encapsulates a file delta processing thread work. The
+// processor computes the delta between the source and target files;
+// and write the compressed delta to the blob.
+class FileDeltaProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+  FileDeltaProcessor(const string& old_part,
+                     const string& new_part,
+                     const PayloadVersion& version,
+                     const vector<Extent>& old_extents,
+                     const vector<Extent>& new_extents,
+                     const string& name,
+                     ssize_t chunk_blocks,
+                     BlobFileWriter* blob_file)
+      : old_part_(old_part),
+        new_part_(new_part),
+        version_(version),
+        old_extents_(old_extents),
+        new_extents_(new_extents),
+        name_(name),
+        chunk_blocks_(chunk_blocks),
+        blob_file_(blob_file) {}
+
+  FileDeltaProcessor(FileDeltaProcessor&& processor) = default;
+
+  ~FileDeltaProcessor() override = default;
+
+  // Overrides DelegateSimpleThread::Delegate.
+  // Calculate the list of operations and write their corresponding deltas to
+  // the blob_file.
+  void Run() override;
+
+  // Merge each file processor's ops list to aops.
+  void MergeOperation(vector<AnnotatedOperation>* aops);
+
+ private:
+  const string& old_part_;
+  const string& new_part_;
+  const PayloadVersion& version_;
+
+  // The block ranges of the old/new file within the src/tgt image
+  const vector<Extent> old_extents_;
+  const vector<Extent> new_extents_;
+  const string name_;
+  // Block limit of one aop.
+  ssize_t chunk_blocks_;
+  BlobFileWriter* blob_file_;
+
+  // The list of ops to reach the new file from the old file.
+  vector<AnnotatedOperation> file_aops_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileDeltaProcessor);
+};
+
+void FileDeltaProcessor::Run() {
+  TEST_AND_RETURN(blob_file_ != nullptr);
+
+  if (!DeltaReadFile(&file_aops_,
+                     old_part_,
+                     new_part_,
+                     old_extents_,
+                     new_extents_,
+                     name_,
+                     chunk_blocks_,
+                     version_,
+                     blob_file_)) {
+    LOG(ERROR) << "Failed to generate delta for " << name_ << " ("
+               << BlocksInExtents(new_extents_) << " blocks)";
+  }
+}
+
+void FileDeltaProcessor::MergeOperation(vector<AnnotatedOperation>* aops) {
+  aops->reserve(aops->size() + file_aops_.size());
+  std::move(file_aops_.begin(), file_aops_.end(), std::back_inserter(*aops));
+}
+
 bool DeltaReadPartition(vector<AnnotatedOperation>* aops,
                         const PartitionConfig& old_part,
                         const PartitionConfig& new_part,
@@ -205,6 +281,8 @@
   vector<FilesystemInterface::File> new_files;
   new_part.fs_interface->GetFiles(&new_files);
 
+  vector<FileDeltaProcessor> file_delta_processors;
+
   // The processing is very straightforward here, we generate operations for
   // every file (and pseudo-file such as the metadata) in the new filesystem
   // based on the file with the same name in the old filesystem, if any.
@@ -239,16 +317,29 @@
         old_files_map[new_file.name], old_visited_blocks);
     old_visited_blocks.AddExtents(old_file_extents);
 
-    TEST_AND_RETURN_FALSE(DeltaReadFile(aops,
-                                        old_part.path,
-                                        new_part.path,
-                                        old_file_extents,
-                                        new_file_extents,
-                                        new_file.name,  // operation name
-                                        hard_chunk_blocks,
-                                        version,
-                                        blob_file));
+    file_delta_processors.emplace_back(old_part.path,
+                                       new_part.path,
+                                       version,
+                                       std::move(old_file_extents),
+                                       std::move(new_file_extents),
+                                       new_file.name,  // operation name
+                                       hard_chunk_blocks,
+                                       blob_file);
   }
+
+  size_t max_threads = GetMaxThreads();
+  base::DelegateSimpleThreadPool thread_pool("incremental-update-generator",
+                                             max_threads);
+  thread_pool.Start();
+  for (auto& processor : file_delta_processors) {
+    thread_pool.AddWork(&processor);
+  }
+  thread_pool.JoinAll();
+
+  for (auto& processor : file_delta_processors) {
+    processor.MergeOperation(aops);
+  }
+
   // Process all the blocks not included in any file. We provided all the unused
   // blocks in the old partition as available data.
   vector<Extent> new_unvisited = {
@@ -807,6 +898,11 @@
   return true;
 }
 
+// Return the number of CPUs on the machine, and 4 threads in minimum.
+size_t GetMaxThreads() {
+  return std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+}
+
 }  // namespace diff_utils
 
 }  // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_utils.h b/payload_generator/delta_diff_utils.h
index 7254bca..c9fef17 100644
--- a/payload_generator/delta_diff_utils.h
+++ b/payload_generator/delta_diff_utils.h
@@ -143,6 +143,9 @@
 // false.
 bool IsExtFilesystem(const std::string& device);
 
+// Returns the max number of threads to process the files(chunks) in parallel.
+size_t GetMaxThreads();
+
 }  // namespace diff_utils
 
 }  // namespace chromeos_update_engine
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index 8fdb6ec..482a789 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -139,7 +139,7 @@
   TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
 
   size_t chunk_blocks = full_chunk_size / config.block_size;
-  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+  size_t max_threads = diff_utils::GetMaxThreads();
   LOG(INFO) << "Compressing partition " << new_part.name
             << " from " << new_part.path << " splitting in chunks of "
             << chunk_blocks << " blocks (" << config.block_size