release-request-9336bb6f-90e7-478e-9d85-062ca3fdf6a3-for-git_oc-dr1-release-4152361 snap-temp-L91700000079439679

Change-Id: I733b9c53abb38ffd9f76adf3ca069a27870457a1
diff --git a/payload_generator/ab_generator.cc b/payload_generator/ab_generator.cc
index efb8ccf..3b0d012 100644
--- a/payload_generator/ab_generator.cc
+++ b/payload_generator/ab_generator.cc
@@ -88,14 +88,19 @@
                                      BlobFileWriter* blob_file) {
   vector<AnnotatedOperation> fragmented_aops;
   for (const AnnotatedOperation& aop : *aops) {
-    if (aop.op.type() == InstallOperation::SOURCE_COPY) {
-      TEST_AND_RETURN_FALSE(SplitSourceCopy(aop, &fragmented_aops));
-    } else if (IsAReplaceOperation(aop.op.type())) {
-      TEST_AND_RETURN_FALSE(SplitAReplaceOp(
-          version, aop, target_part_path, &fragmented_aops, blob_file));
-    } else {
-      fragmented_aops.push_back(aop);
+    // Only split the operation if it has more than one dst extent.
+    if (aop.op.dst_extents_size() > 1) {
+      if (aop.op.type() == InstallOperation::SOURCE_COPY) {
+        TEST_AND_RETURN_FALSE(SplitSourceCopy(aop, &fragmented_aops));
+        continue;
+      }
+      if (IsAReplaceOperation(aop.op.type())) {
+        TEST_AND_RETURN_FALSE(SplitAReplaceOp(
+            version, aop, target_part_path, &fragmented_aops, blob_file));
+        continue;
+      }
     }
+    fragmented_aops.push_back(aop);
   }
   *aops = std::move(fragmented_aops);
   return true;
@@ -139,8 +144,6 @@
     // Fix up our new operation and add it to the results.
     new_op.set_type(InstallOperation::SOURCE_COPY);
     *(new_op.add_dst_extents()) = dst_ext;
-    new_op.set_src_length(dst_ext.num_blocks() * kBlockSize);
-    new_op.set_dst_length(dst_ext.num_blocks() * kBlockSize);
 
     AnnotatedOperation new_aop;
     new_aop.op = new_op;
diff --git a/payload_generator/ab_generator_unittest.cc b/payload_generator/ab_generator_unittest.cc
index 3fd2323..ab4b164 100644
--- a/payload_generator/ab_generator_unittest.cc
+++ b/payload_generator/ab_generator_unittest.cc
@@ -354,11 +354,11 @@
   EXPECT_EQ("SplitSourceCopyTestOp:0", result_ops[0].name);
   InstallOperation first_op = result_ops[0].op;
   EXPECT_EQ(InstallOperation::SOURCE_COPY, first_op.type());
-  EXPECT_EQ(kBlockSize * 2, first_op.src_length());
+  EXPECT_FALSE(first_op.has_src_length());
   EXPECT_EQ(1, first_op.src_extents().size());
   EXPECT_EQ(2U, first_op.src_extents(0).start_block());
   EXPECT_EQ(2U, first_op.src_extents(0).num_blocks());
-  EXPECT_EQ(kBlockSize * 2, first_op.dst_length());
+  EXPECT_FALSE(first_op.has_dst_length());
   EXPECT_EQ(1, first_op.dst_extents().size());
   EXPECT_EQ(10U, first_op.dst_extents(0).start_block());
   EXPECT_EQ(2U, first_op.dst_extents(0).num_blocks());
@@ -366,7 +366,7 @@
   EXPECT_EQ("SplitSourceCopyTestOp:1", result_ops[1].name);
   InstallOperation second_op = result_ops[1].op;
   EXPECT_EQ(InstallOperation::SOURCE_COPY, second_op.type());
-  EXPECT_EQ(kBlockSize * 3, second_op.src_length());
+  EXPECT_FALSE(second_op.has_src_length());
   EXPECT_EQ(3, second_op.src_extents().size());
   EXPECT_EQ(4U, second_op.src_extents(0).start_block());
   EXPECT_EQ(1U, second_op.src_extents(0).num_blocks());
@@ -374,7 +374,7 @@
   EXPECT_EQ(1U, second_op.src_extents(1).num_blocks());
   EXPECT_EQ(8U, second_op.src_extents(2).start_block());
   EXPECT_EQ(1U, second_op.src_extents(2).num_blocks());
-  EXPECT_EQ(kBlockSize * 3, second_op.dst_length());
+  EXPECT_FALSE(second_op.has_dst_length());
   EXPECT_EQ(1, second_op.dst_extents().size());
   EXPECT_EQ(14U, second_op.dst_extents(0).start_block());
   EXPECT_EQ(3U, second_op.dst_extents(0).num_blocks());
@@ -382,11 +382,11 @@
   EXPECT_EQ("SplitSourceCopyTestOp:2", result_ops[2].name);
   InstallOperation third_op = result_ops[2].op;
   EXPECT_EQ(InstallOperation::SOURCE_COPY, third_op.type());
-  EXPECT_EQ(kBlockSize * 3, third_op.src_length());
+  EXPECT_FALSE(third_op.has_src_length());
   EXPECT_EQ(1, third_op.src_extents().size());
   EXPECT_EQ(9U, third_op.src_extents(0).start_block());
   EXPECT_EQ(3U, third_op.src_extents(0).num_blocks());
-  EXPECT_EQ(kBlockSize * 3, third_op.dst_length());
+  EXPECT_FALSE(third_op.has_dst_length());
   EXPECT_EQ(1, third_op.dst_extents().size());
   EXPECT_EQ(18U, third_op.dst_extents(0).start_block());
   EXPECT_EQ(3U, third_op.dst_extents(0).num_blocks());
@@ -445,8 +445,6 @@
   vector<AnnotatedOperation> aops;
   InstallOperation first_op;
   first_op.set_type(InstallOperation::SOURCE_COPY);
-  first_op.set_src_length(kBlockSize);
-  first_op.set_dst_length(kBlockSize);
   *(first_op.add_src_extents()) = ExtentForRange(1, 1);
   *(first_op.add_dst_extents()) = ExtentForRange(6, 1);
   AnnotatedOperation first_aop;
@@ -456,8 +454,6 @@
 
   InstallOperation second_op;
   second_op.set_type(InstallOperation::SOURCE_COPY);
-  second_op.set_src_length(3 * kBlockSize);
-  second_op.set_dst_length(3 * kBlockSize);
   *(second_op.add_src_extents()) = ExtentForRange(2, 2);
   *(second_op.add_src_extents()) = ExtentForRange(8, 2);
   *(second_op.add_dst_extents()) = ExtentForRange(7, 3);
@@ -469,8 +465,6 @@
 
   InstallOperation third_op;
   third_op.set_type(InstallOperation::SOURCE_COPY);
-  third_op.set_src_length(kBlockSize);
-  third_op.set_dst_length(kBlockSize);
   *(third_op.add_src_extents()) = ExtentForRange(11, 1);
   *(third_op.add_dst_extents()) = ExtentForRange(12, 1);
   AnnotatedOperation third_aop;
@@ -486,12 +480,12 @@
   EXPECT_EQ(1U, aops.size());
   InstallOperation first_result_op = aops[0].op;
   EXPECT_EQ(InstallOperation::SOURCE_COPY, first_result_op.type());
-  EXPECT_EQ(kBlockSize * 5, first_result_op.src_length());
+  EXPECT_FALSE(first_result_op.has_src_length());
   EXPECT_EQ(3, first_result_op.src_extents().size());
   EXPECT_TRUE(ExtentEquals(first_result_op.src_extents(0), 1, 3));
   EXPECT_TRUE(ExtentEquals(first_result_op.src_extents(1), 8, 2));
   EXPECT_TRUE(ExtentEquals(first_result_op.src_extents(2), 11, 1));
-  EXPECT_EQ(kBlockSize * 5, first_result_op.dst_length());
+  EXPECT_FALSE(first_result_op.has_dst_length());
   EXPECT_EQ(2, first_result_op.dst_extents().size());
   EXPECT_TRUE(ExtentEquals(first_result_op.dst_extents(0), 6, 4));
   EXPECT_TRUE(ExtentEquals(first_result_op.dst_extents(1), 11, 2));
diff --git a/payload_generator/delta_diff_utils.cc b/payload_generator/delta_diff_utils.cc
index 44aff7a..045d52f 100644
--- a/payload_generator/delta_diff_utils.cc
+++ b/payload_generator/delta_diff_utils.cc
@@ -22,7 +22,7 @@
 #pragma clang diagnostic ignored "-Wmacro-redefined"
 #include <ext2fs/ext2fs.h>
 #pragma clang diagnostic pop
-
+#include <unistd.h>
 
 #include <algorithm>
 #include <map>
@@ -30,6 +30,7 @@
 #include <base/files/file_util.h>
 #include <base/format_macros.h>
 #include <base/strings/stringprintf.h>
+#include <base/threading/simple_thread.h>
 
 #include "update_engine/common/hash_calculator.h"
 #include "update_engine/common/subprocess.h"
@@ -171,6 +172,81 @@
 
 namespace diff_utils {
 
+// This class encapsulates the work of a file delta processing thread. The
+// processor computes the delta between the source and target files and
+// writes the compressed delta to the blob.
+class FileDeltaProcessor : public base::DelegateSimpleThread::Delegate {
+ public:
+  FileDeltaProcessor(const string& old_part,
+                     const string& new_part,
+                     const PayloadVersion& version,
+                     const vector<Extent>& old_extents,
+                     const vector<Extent>& new_extents,
+                     const string& name,
+                     ssize_t chunk_blocks,
+                     BlobFileWriter* blob_file)
+      : old_part_(old_part),
+        new_part_(new_part),
+        version_(version),
+        old_extents_(old_extents),
+        new_extents_(new_extents),
+        name_(name),
+        chunk_blocks_(chunk_blocks),
+        blob_file_(blob_file) {}
+
+  FileDeltaProcessor(FileDeltaProcessor&& processor) = default;
+
+  ~FileDeltaProcessor() override = default;
+
+  // Overrides DelegateSimpleThread::Delegate.
+  // Calculates the list of operations and writes their deltas to blob_file.
+  // A DeltaReadFile failure is logged but not propagated to the caller.
+  void Run() override;
+
+  // Moves this processor's list of operations to the end of aops.
+  void MergeOperation(vector<AnnotatedOperation>* aops);
+
+ private:
+  const string& old_part_;
+  const string& new_part_;
+  const PayloadVersion& version_;
+
+  // The block ranges of the old/new file within the src/tgt image.
+  const vector<Extent> old_extents_;
+  const vector<Extent> new_extents_;
+  const string name_;
+  // Block limit of one aop.
+  ssize_t chunk_blocks_;
+  BlobFileWriter* blob_file_;
+
+  // The list of ops to reach the new file from the old file.
+  vector<AnnotatedOperation> file_aops_;
+
+  DISALLOW_COPY_AND_ASSIGN(FileDeltaProcessor);
+};
+
+void FileDeltaProcessor::Run() {
+  TEST_AND_RETURN(blob_file_ != nullptr);
+
+  if (!DeltaReadFile(&file_aops_,
+                     old_part_,
+                     new_part_,
+                     old_extents_,
+                     new_extents_,
+                     name_,
+                     chunk_blocks_,
+                     version_,
+                     blob_file_)) {
+    LOG(ERROR) << "Failed to generate delta for " << name_ << " ("
+               << BlocksInExtents(new_extents_) << " blocks)";
+  }
+}
+
+void FileDeltaProcessor::MergeOperation(vector<AnnotatedOperation>* aops) {
+  aops->reserve(aops->size() + file_aops_.size());
+  std::move(file_aops_.begin(), file_aops_.end(), std::back_inserter(*aops));
+}
+
 bool DeltaReadPartition(vector<AnnotatedOperation>* aops,
                         const PartitionConfig& old_part,
                         const PartitionConfig& new_part,
@@ -205,6 +281,8 @@
   vector<FilesystemInterface::File> new_files;
   new_part.fs_interface->GetFiles(&new_files);
 
+  vector<FileDeltaProcessor> file_delta_processors;
+
   // The processing is very straightforward here, we generate operations for
   // every file (and pseudo-file such as the metadata) in the new filesystem
   // based on the file with the same name in the old filesystem, if any.
@@ -239,16 +317,29 @@
         old_files_map[new_file.name], old_visited_blocks);
     old_visited_blocks.AddExtents(old_file_extents);
 
-    TEST_AND_RETURN_FALSE(DeltaReadFile(aops,
-                                        old_part.path,
-                                        new_part.path,
-                                        old_file_extents,
-                                        new_file_extents,
-                                        new_file.name,  // operation name
-                                        hard_chunk_blocks,
-                                        version,
-                                        blob_file));
+    file_delta_processors.emplace_back(old_part.path,
+                                       new_part.path,
+                                       version,
+                                       std::move(old_file_extents),
+                                       std::move(new_file_extents),
+                                       new_file.name,  // operation name
+                                       hard_chunk_blocks,
+                                       blob_file);
   }
+
+  size_t max_threads = GetMaxThreads();
+  base::DelegateSimpleThreadPool thread_pool("incremental-update-generator",
+                                             max_threads);
+  thread_pool.Start();
+  for (auto& processor : file_delta_processors) {
+    thread_pool.AddWork(&processor);
+  }
+  thread_pool.JoinAll();
+
+  for (auto& processor : file_delta_processors) {
+    processor.MergeOperation(aops);
+  }
+
   // Process all the blocks not included in any file. We provided all the unused
   // blocks in the old partition as available data.
   vector<Extent> new_unvisited = {
@@ -807,6 +898,11 @@
   return true;
 }
 
+// Returns the number of online CPUs on the machine, but no fewer than 4.
+size_t GetMaxThreads() {
+  return std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+}
+
 }  // namespace diff_utils
 
 }  // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_utils.h b/payload_generator/delta_diff_utils.h
index 7254bca..c9fef17 100644
--- a/payload_generator/delta_diff_utils.h
+++ b/payload_generator/delta_diff_utils.h
@@ -143,6 +143,9 @@
 // false.
 bool IsExtFilesystem(const std::string& device);
 
+// Returns the max number of threads to process the files (chunks) in parallel.
+size_t GetMaxThreads();
+
 }  // namespace diff_utils
 
 }  // namespace chromeos_update_engine
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index 8fdb6ec..482a789 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -139,7 +139,7 @@
   TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
 
   size_t chunk_blocks = full_chunk_size / config.block_size;
-  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
+  size_t max_threads = diff_utils::GetMaxThreads();
   LOG(INFO) << "Compressing partition " << new_part.name
             << " from " << new_part.path << " splitting in chunks of "
             << chunk_blocks << " blocks (" << config.block_size