Parallelize .ninja_deps loading

Split the deps log into independently-parseable chunks using a complex
heuristic involving 0x8000xxxx values, then parse the chunks in several
parallel passes.

The file will still be loaded correctly if the heuristic fails -- only
performance would be affected. The heuristic assumes the v3 deps log
format.
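
For example, the word 0x80000028 is almost certainly the header of a
deps record with a 0x28-byte body (an output ID, an mtime, and eight
input IDs), so a parallel parser can safely begin a new chunk there.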

Combined with the previous parallelization changes, ninja is much faster
on the AOSP tree. e.g. Assuming warm disk caches:
 * Building "nothing": 11.3s -> 0.5s
 * Building "droid:    12.9s -> 1.0s
(runtimes are from "ninja -f out/combined-hikey960.ninja target")

Bug: none
Test: ninja_test, build Android
Change-Id: I6b2f3ef1e0cade0046efa93f0408236c8c187cea
diff --git a/src/deps_log.cc b/src/deps_log.cc
index 080faac..4371c34 100644
--- a/src/deps_log.cc
+++ b/src/deps_log.cc
@@ -22,15 +22,21 @@
 #include <unistd.h>
 #endif
 
+#include <numeric>
+
 #include "disk_interface.h"
 #include "graph.h"
 #include "metrics.h"
+#include "parallel_map.h"
 #include "state.h"
 #include "util.h"
 
 // The version is stored as 4 bytes after the signature and also serves as a
 // byte order mark. Signature and version combined are 16 bytes long.
-const char kFileSignature[] = "# ninjadeps\n";
+static constexpr StringPiece kFileSignature { "# ninjadeps\n", 12 };
+static_assert(kFileSignature.size() % 4 == 0,
+              "file signature size is not a multiple of 4");
+static constexpr size_t kFileHeaderSize = kFileSignature.size() + 4;
 const int kCurrentVersion = 3;
 
 // Record size is currently limited to less than the full 32 bit, due to
@@ -62,7 +68,7 @@
   fseek(file_, 0, SEEK_END);
 
   if (ftell(file_) == 0) {
-    if (fwrite(kFileSignature, sizeof(kFileSignature) - 1, 1, file_) < 1) {
+    if (fwrite(kFileSignature.data(), kFileSignature.size(), 1, file_) < 1) {
       *err = strerror(errno);
       return false;
     }
@@ -162,121 +168,343 @@
   file_ = NULL;
 }
 
-bool DepsLog::Load(const string& path, State* state, string* err) {
-  METRIC_RECORD(".ninja_deps load");
-  char buf[kMaxRecordSize + 1];
-  FILE* f = fopen(path.c_str(), "rb");
-  if (!f) {
-    if (errno == ENOENT)
-      return true;
-    *err = strerror(errno);
+// Return the number of words in the record, including the header, or 0 if
+// the header is invalid.
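+// e.g. RecordSizeInWords(0x80000010) == 5: a 16-byte body plus the header.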
+static inline size_t RecordSizeInWords(size_t header) {
+  header &= 0x7FFFFFFF;
+  if (header % sizeof(uint32_t) != 0) return 0;
+  // Either (node ID and mtime) or (data and checksum)
+  if (header < sizeof(uint32_t) * 2) return 0;
+  if (header > kMaxRecordSize) return 0;
+  return header / sizeof(uint32_t) + 1;
+}
+
+static inline bool IsDepsRecordHeader(size_t header) {
+  return (header & 0x80000000) == 0x80000000;
+}
+
+static inline bool IsValidDepsRecordHeader(size_t header) {
+  return IsDepsRecordHeader(header) && RecordSizeInWords(header);
+}
+
+/// Split the v3 deps log into independently-parseable chunks using a heuristic.
+/// If the heuristic fails, we'll still load the file correctly, but it could be
+/// slower.
+///
+/// There are two kinds of records -- path and deps records. Their formats:
+///
+///    path:
+///     - uint32 size -- high bit is clear
+///     - String content. The string is padded to a multiple of 4 bytes with
+///       trailing NULs.
+///     - uint32 checksum (ones complement of the path's index / node ID)
+///
+///    deps:
+///     - uint32 size -- high bit is set
+///     - int32 output_path_id
+///     - uint32 output_path_mtime
+///     - int32 input_path_id[...] -- every remaining word is an input ID
+///
+/// To split the deps log into chunks, look for uint32 words with the value
+/// 0x8000xxxx, where xxxx is nonzero. Such a word is almost guaranteed to be
+/// the size field of a deps record (with fewer than ~16K dependencies):
+///  - It can't be part of a string, because paths can't have embedded NULs.
+///  - It (probably) can't be a node ID, because node IDs are represented using
+///    "int", and it would be unlikely to have more than 2 billion of them. An
+///    Android build typically has about 1 million nodes.
+///  - It's unlikely to be part of a path checksum, because that would also
+///    imply that we have at least 2 billion nodes.
+///  - It could be an mtime from 1901, which we rule out by looking for the
+///    mtime's deps size two words above the split candidate.
+///
+/// This heuristic can fail in a few ways:
+///  - We only find path records in the area we scan.
+///  - The deps records all have >16K of dependencies. (Almost all deps records
+///    I've seen in the Android build have a few hundred. Only a few have ~10K.)
+///  - The area contains only deps entries with an mtime from 1901 and one
+///    dependency.
+///
+/// Maybe we can add a delimiter to the log format and replace this code. I
+/// believe this heuristic can be adapted to work with the v4 format, which
+/// expands the mtime to 64-bits.
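+///
+/// For illustration, a deps record with two inputs occupies five words
+/// (the values here are made up):
+///
+///     0x80000010   size: high bit set, 16-byte body -- a scan candidate
+///     0x00000007   output node ID
+///     0x5d000000   output mtime (mid-2019)
+///     0x00000003   input node ID
+///     0x00000005   input node ID
+///
+/// The size word matches 0x8000xxxx whenever the body is under 64KB,
+/// i.e. whenever the record has fewer than ~16K input IDs.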
+static std::vector<std::pair<size_t, size_t>>
+SplitDepsLog(const uint32_t* table, size_t size, ThreadPool* thread_pool) {
+  if (size == 0) return {};
+
+  std::vector<std::pair<size_t, size_t>> blind_splits = SplitByThreads(size);
+  std::vector<std::pair<size_t, size_t>> chunks;
+  size_t chunk_start = 0;
+
+  auto split_candidates = ParallelMap(thread_pool, blind_splits,
+      [table](std::pair<size_t, size_t> chunk) {
+    // Skip the first two words to allow for the 1901 mtime check later on.
+    for (size_t index = chunk.first + 2; index < chunk.second; ++index) {
+      size_t this_header = table[index];
+      if (!IsValidDepsRecordHeader(this_header)) continue;
+      if ((this_header & 0xFFFF0000) != 0x80000000) continue;
+
+      // We've either found a deps record or a 1901-era mtime (unlikely). If
+      // it's an mtime, then the word two spaces back is the record's own
+      // header, which is itself a valid deps record header (0x800xxxxx).
+      if (IsValidDepsRecordHeader(table[index - 2])) continue;
+
+      // Success: In a valid deps log, this index must start a deps record.
+      return index;
+    }
+    return SIZE_MAX;
+  });
+  for (size_t candidate : split_candidates) {
+    if (candidate != SIZE_MAX) {
+      assert(chunk_start < candidate);
+      chunks.push_back({ chunk_start, candidate });
+      chunk_start = candidate;
+    }
+  }
+
+  assert(chunk_start < size);
+  chunks.push_back({ chunk_start, size });
+  return chunks;
+}
+
+struct DepsLogInput {
+  std::unique_ptr<LoadedFile> file;
+  const uint32_t* table = nullptr;
+  size_t table_size = 0;
+};
+
+static bool OpenDepsLogForReading(const std::string& path,
+                                  DepsLogInput* log,
+                                  std::string* err) {
+  *log = {};
+
+  RealDiskInterface file_reader;
+  std::string load_err;
+  switch (file_reader.LoadFile(path, &log->file, &load_err)) {
+  case FileReader::Okay:
+    break;
+  case FileReader::NotFound:
+    return true;
+  default:
+    *err = load_err;
     return false;
   }
 
-  bool valid_header = true;
+  bool valid_header = false;
   int version = 0;
-  if (!fgets(buf, sizeof(buf), f) || fread(&version, 4, 1, f) < 1)
-    valid_header = false;
+  if (log->file->content().size() >= kFileHeaderSize &&
+      log->file->content().substr(0, kFileSignature.size()) == kFileSignature) {
+    valid_header = true;
+    memcpy(&version,
+           log->file->content().data() + kFileSignature.size(),
+           sizeof(version));
+  }
+
   // Note: For version differences, this should migrate to the new format.
   // But the v1 format could sometimes (rarely) end up with invalid data, so
   // don't migrate v1 to v3 to force a rebuild. (v2 only existed for a few days,
   // and there was no release with it, so pretend that it never happened.)
-  if (!valid_header || strcmp(buf, kFileSignature) != 0 ||
-      version != kCurrentVersion) {
+  if (!valid_header || version != kCurrentVersion) {
     if (version == 1)
       *err = "deps log version change; rebuilding";
     else
       *err = "bad deps log signature or version; starting over";
-    fclose(f);
+    log->file.reset();
     unlink(path.c_str());
     // Don't report this as a failure.  An empty deps log will cause
     // us to rebuild the outputs anyway.
     return true;
   }
 
-  long offset;
-  bool read_failed = false;
-  int unique_dep_record_count = 0;
-  int total_dep_record_count = 0;
-  for (;;) {
-    offset = ftell(f);
+  log->table =
+      reinterpret_cast<const uint32_t*>(
+          log->file->content().data() + kFileHeaderSize);
+  log->table_size =
+      (log->file->content().size() - kFileHeaderSize) / sizeof(uint32_t);
 
-    unsigned size;
-    if (fread(&size, 4, 1, f) < 1) {
-      if (!feof(f))
-        read_failed = true;
-      break;
-    }
-    bool is_deps = (size >> 31) != 0;
-    size = size & 0x7FFFFFFF;
+  return true;
+}
 
-    if (fread(buf, size, 1, f) < 1 || size > kMaxRecordSize) {
-      read_failed = true;
-      break;
-    }
+bool DepsLog::Load(const string& path, State* state, string* err) {
+  METRIC_RECORD(".ninja_deps load");
 
-    if (is_deps) {
-      assert(size % 4 == 0);
-      int* deps_data = reinterpret_cast<int*>(buf);
-      int out_id = deps_data[0];
-      int mtime = deps_data[1];
-      deps_data += 2;
-      int deps_count = (size / 4) - 2;
+  assert(nodes_.empty());
+  DepsLogInput log;
 
-      Deps* deps = new Deps(mtime, deps_count);
-      for (int i = 0; i < deps_count; ++i) {
-        assert(deps_data[i] < (int)nodes_.size());
-        assert(nodes_[deps_data[i]]);
-        deps->nodes[i] = nodes_[deps_data[i]];
+  if (!OpenDepsLogForReading(path, &log, err)) return false;
+  if (log.file.get() == nullptr) return true;
+
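+  // Per-chunk parsing state. The cache-line alignment prevents false sharing
+  // when worker threads update adjacent chunks concurrently.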
+  struct NINJA_ALIGNAS_CACHE_LINE Chunk {
+    size_t start = 0;
+    size_t stop = 0;
+    int first_node_id = 0;
+    int initial_node_count = 0;
+    int final_node_count = 0;
+    size_t deps_count = 0;
+    bool parse_error = false;
+  };
+
+  std::unique_ptr<ThreadPool> thread_pool = CreateThreadPool();
+
+  std::vector<Chunk> chunks;
+  for (std::pair<size_t, size_t> span :
+      SplitDepsLog(log.table, log.table_size, thread_pool.get())) {
+    Chunk chunk {};
+    chunk.start = span.first;
+    chunk.stop = span.second;
+    chunks.push_back(chunk);
+  }
+
+  // Compute the starting node ID for each chunk. The result is correct as
+  // long as every preceding chunk parses successfully. If a chunk has a
+  // parse error, the chunks after it are discarded after the main parsing
+  // pass below.
+  ParallelMap(thread_pool.get(), chunks, [&log](Chunk& chunk) {
+    size_t index = chunk.start;
+    while (index < chunk.stop) {
+      size_t header = log.table[index];
+      size_t size = RecordSizeInWords(header);
+      // An invalid header isn't an error yet: the main parsing pass below
+      // will hit the same word, flag the error, and truncate the log there.
+      if (!size) return;
+      if (!IsDepsRecordHeader(header)) {
+        ++chunk.initial_node_count;
       }
+      index += size;
+    }
+  });
+  int initial_node_count = 0;
+  for (size_t i = 0; i < chunks.size(); ++i) {
+    Chunk& chunk = chunks[i];
+    chunk.first_node_id = initial_node_count;
+    initial_node_count += chunk.initial_node_count;
+  }
 
-      total_dep_record_count++;
-      if (!UpdateDeps(out_id, deps))
-        ++unique_dep_record_count;
-    } else {
-      int path_size = size - 4;
-      assert(path_size > 0);  // CanonicalizePath() rejects empty paths.
-      // There can be up to 3 bytes of padding.
-      if (buf[path_size - 1] == '\0') --path_size;
-      if (buf[path_size - 1] == '\0') --path_size;
-      if (buf[path_size - 1] == '\0') --path_size;
-      StringPiece subpath(buf, path_size);
-      // It is not necessary to pass in a correct slash_bits here. It will
-      // either be a Node that's in the manifest (in which case it will already
-      // have a correct slash_bits that GetNode will look up), or it is an
-      // implicit dependency from a .d which does not affect the build command
-      // (and so need not have its slashes maintained).
-      Node* node = state->GetNode(subpath, 0);
+  // A map from node ID to the final file table index of the dep record
+  // outputting the given node ID. The index is biased by 1 because 0 indicates
+  // that no dep record outputs this ID.
+  std::vector<std::atomic<size_t>> dep_index(initial_node_count);
+  // A map from node ID to file index of that node, with no bias.
+  std::vector<size_t> node_index(initial_node_count);
 
-      // Check that the expected index matches the actual index. This can only
-      // happen if two ninja processes write to the same deps log concurrently.
-      // (This uses unary complement to make the checksum look less like a
-      // dependency record entry.)
-      unsigned checksum = *reinterpret_cast<unsigned*>(buf + size - 4);
-      int expected_id = ~checksum;
-      int id = nodes_.size();
-      if (id != expected_id) {
-        read_failed = true;
-        break;
+  // The main parsing pass. Validate each chunk's records and, for each node
+  // ID, record the locations of its node and deps records. If there is a
+  // parse error, truncate the log just before the problem record.
+  ParallelMap(thread_pool.get(), chunks,
+      [&log, &dep_index, &node_index](Chunk& chunk) {
+    size_t index = chunk.start;
+    int next_node_id = chunk.first_node_id;
+    while (index < chunk.stop) {
+      size_t header = log.table[index];
+      size_t size = RecordSizeInWords(header);
+      if (!size || (index + size > chunk.stop)) break;
+      if (IsDepsRecordHeader(header)) {
+        // Verify that input/output node IDs are valid.
+        int output_id = log.table[index + 1];
+        if (output_id < 0 || output_id >= next_node_id) break;
+        size_t i = 3;
+        while (i < size) {
+          int input_id = log.table[index + i];
+          if (input_id < 0 || input_id >= next_node_id) break;
+          ++i;
+        }
+        if (i < size) break;  // an input ID was out of range; stop parsing
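+        // A newer deps record supersedes older ones for the same output, so
+        // keep the largest (i.e. latest) file index seen for each output ID.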
+        AtomicUpdateMaximum(&dep_index[output_id], index + 1);
+        ++chunk.deps_count;
+      } else {
+        // Validate the path's checksum.
+        int checksum = log.table[index + size - 1];
+        if (checksum != ~next_node_id) break;
+        node_index[next_node_id] = index;
+        ++next_node_id;
+        ++chunk.final_node_count;
       }
-
-      assert(node->id() < 0);
-      node->set_id(id);
-      nodes_.push_back(node);
+      index += size;
+    }
+    // We'll exit early on a parser error.
+    if (index < chunk.stop) {
+      chunk.stop = index;
+      chunk.parse_error = true;
+    }
+  });
+  int node_count = 0;
+  size_t total_dep_record_count = 0;
+  for (size_t i = 0; i < chunks.size(); ++i) {
+    Chunk& chunk = chunks[i];
+    assert(chunk.first_node_id == node_count);
+    total_dep_record_count += chunk.deps_count;
+    node_count += chunk.final_node_count;
+    if (chunk.parse_error) {
+      // Part of this chunk may have been parsed successfully, so keep it, but
+      // discard all later chunks.
+      chunks.resize(i + 1);
+      break;
     }
   }
 
-  if (read_failed) {
-    // An error occurred while loading; try to recover by truncating the
-    // file to the last fully-read record.
-    if (ferror(f)) {
-      *err = strerror(ferror(f));
-    } else {
-      *err = "premature end of file";
-    }
-    fclose(f);
+  // The final node count could be smaller than the initial count if there was a
+  // parser error.
+  assert(node_count <= initial_node_count);
 
-    if (!Truncate(path, offset, err))
+  // The log is valid. Commit the nodes into the state graph. First make sure
+  // that the hash table has at least one bucket for each node in this deps log.
+  state->paths_.reserve(node_count);
+  nodes_.resize(node_count);
+  ParallelMap(thread_pool.get(), IntegralRange<int>(0, node_count),
+      [this, state, &log, &node_index](int node_id) {
+    size_t index = node_index[node_id];
+    size_t header = log.table[index];
+    size_t size = RecordSizeInWords(header);
+    const char* path = reinterpret_cast<const char*>(&log.table[index + 1]);
+    size_t path_size = (size - 2) * sizeof(uint32_t);
+    if (path[path_size - 1] == '\0') --path_size;
+    if (path[path_size - 1] == '\0') --path_size;
+    if (path[path_size - 1] == '\0') --path_size;
+    // It is not necessary to pass in a correct slash_bits here. It will
+    // either be a Node that's in the manifest (in which case it will
+    // already have a correct slash_bits that GetNode will look up), or it
+    // is an implicit dependency from a .d which does not affect the build
+    // command (and so need not have its slashes maintained).
+    Node* node = state->GetNode(StringPiece(path, path_size), 0);
+    assert(node->id() < 0);
+    node->set_id(node_id);
+    nodes_[node_id] = node;
+  });
+
+  // Add the deps records.
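+  // Each output node has at most one surviving deps record (the one that
+  // dep_index points at), so the Deps objects built here correspond to the
+  // "unique" records counted for the compaction check below.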
+  deps_.resize(node_count);
+  std::vector<size_t> unique_counts = ParallelMap(thread_pool.get(),
+      SplitByThreads(node_count),
+      [this, &log, &dep_index](std::pair<int, int> node_chunk) {
+    size_t unique_count = 0;
+    for (int node_id = node_chunk.first; node_id < node_chunk.second; ++node_id) {
+      size_t index = dep_index[node_id];
+      if (index == 0) continue;
+      --index;
+      ++unique_count;
+      size_t header = log.table[index];
+      size_t size = RecordSizeInWords(header);
+      assert(size != 0 && IsDepsRecordHeader(header));
+      int output_id = log.table[index + 1];
+      int mtime = log.table[index + 2];
+      int deps_count = size - 3;
+      Deps* deps = new Deps(mtime, deps_count);
+      for (int i = 0; i < deps_count; ++i) {
+        int input_id = log.table[index + 3 + i];
+        Node* node = nodes_[input_id];
+        assert(node != nullptr);
+        deps->nodes[i] = node;
+      }
+      deps_[output_id] = deps;
+    }
+    return unique_count;
+  });
+  const size_t unique_dep_record_count = std::accumulate(
+      unique_counts.begin(), unique_counts.end(), size_t{0});
+
+  const size_t actual_file_size = log.file->content().size();
+  const size_t parsed_file_size = kFileHeaderSize +
+      (chunks.empty() ? 0 : chunks.back().stop) * sizeof(uint32_t);
+  assert(parsed_file_size <= actual_file_size);
+  if (parsed_file_size < actual_file_size) {
+    // An error occurred while loading; try to recover by truncating the file to
+    // the last fully-read record.
+    *err = "premature end of file";
+    log.file.reset();
+
+    if (!Truncate(path, parsed_file_size, err))
       return false;
 
     // The truncate succeeded; we'll just report the load error as a
@@ -285,11 +513,9 @@
     return true;
   }
 
-  fclose(f);
-
   // Rebuild the log if there are too many dead records.
-  int kMinCompactionEntryCount = 1000;
-  int kCompactionRatio = 3;
+  const unsigned kMinCompactionEntryCount = 1000;
+  const unsigned kCompactionRatio = 3;
   if (total_dep_record_count > kMinCompactionEntryCount &&
       total_dep_record_count > unique_dep_record_count * kCompactionRatio) {
     needs_recompaction_ = true;
diff --git a/src/deps_log_test.cc b/src/deps_log_test.cc
index 866e5ba..60bee0b 100644
--- a/src/deps_log_test.cc
+++ b/src/deps_log_test.cc
@@ -119,13 +119,6 @@
 
   State state2;
   DepsLog log2;
-
-  // The paths_ concurrent hash table doesn't automatically resize itself, and
-  // the deps log reader doesn't currently reserve space in the table, so
-  // reserve some space in advance. The parallel deps log parser avoids this
-  // problem.
-  state2.paths_.reserve(kNumDeps);
-
   EXPECT_TRUE(log2.Load(kTestFilename, &state2, &err));
   ASSERT_EQ("", err);