added group commit; drastically speeds up multi-threaded synchronous write workloads

git-svn-id: http://leveldb.googlecode.com/svn/trunk@60 62dab493-f737-651d-591e-8d6aee1b9529
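
Group commit in a nutshell: when several threads issue synchronous writes at the same time, the thread at the front of the new write queue combines the queued batches and pays for a single log append and fsync on behalf of all of them, instead of each thread syncing its own record. A hypothetical driver (not part of this change; the database path, key names, and thread count are made up) for the kind of workload that benefits:

    // Eight threads issuing sync writes concurrently; with group commit
    // the leader's single fsync can retire many of these writes at once.
    #include <assert.h>
    #include <pthread.h>
    #include "leveldb/db.h"

    static void* SyncWriter(void* arg) {
      leveldb::DB* db = reinterpret_cast<leveldb::DB*>(arg);
      leveldb::WriteOptions options;
      options.sync = true;  // every write requests an fsync of the log
      for (int i = 0; i < 1000; i++) {
        db->Put(options, "some-key", "some-value");
      }
      return NULL;
    }

    int main() {
      leveldb::DB* db;
      leveldb::Options options;
      options.create_if_missing = true;
      leveldb::Status s = leveldb::DB::Open(options, "/tmp/groupcommit-demo", &db);
      assert(s.ok());
      pthread_t threads[8];
      for (int i = 0; i < 8; i++) {
        pthread_create(&threads[i], NULL, SyncWriter, db);
      }
      for (int i = 0; i < 8; i++) {
        pthread_join(threads[i], NULL);
      }
      delete db;
      return 0;
    }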
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7b268ea..dde3711 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -35,6 +35,17 @@
 
 namespace leveldb {
 
+// Information kept for every waiting writer
+struct DBImpl::Writer {
+  Status status;
+  WriteBatch* batch;
+  bool sync;
+  bool done;
+  port::CondVar cv;
+
+  explicit Writer(port::Mutex* mu) : cv(mu) { }
+};
+
 struct DBImpl::CompactionState {
   Compaction* const compaction;
 
@@ -113,8 +124,7 @@
       logfile_(NULL),
       logfile_number_(0),
       log_(NULL),
-      logger_(NULL),
-      logger_cv_(&mutex_),
+      tmp_batch_(new WriteBatch),
       bg_compaction_scheduled_(false),
       manual_compaction_(NULL) {
   mem_->Ref();
@@ -144,6 +154,7 @@
   delete versions_;
   if (mem_ != NULL) mem_->Unref();
   if (imm_ != NULL) imm_->Unref();
+  delete tmp_batch_;
   delete log_;
   delete logfile_;
   delete table_cache_;
@@ -554,13 +565,11 @@
 }
 
 Status DBImpl::TEST_CompactMemTable() {
-  MutexLock l(&mutex_);
-  LoggerId self;
-  AcquireLoggingResponsibility(&self);
-  Status s = MakeRoomForWrite(true /* force compaction */);
-  ReleaseLoggingResponsibility(&self);
+  // NULL batch means just wait for earlier writes to be done
+  Status s = Write(WriteOptions(), NULL);
   if (s.ok()) {
     // Wait until the compaction completes
+    MutexLock l(&mutex_);
     while (imm_ != NULL && bg_error_.ok()) {
       bg_cv_.Wait();
     }
@@ -1094,38 +1103,35 @@
   return DB::Delete(options, key);
 }
 
-// There is at most one thread that is the current logger.  This call
-// waits until preceding logger(s) have finished and becomes the
-// current logger.
-void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
-  while (logger_ != NULL) {
-    logger_cv_.Wait();
-  }
-  logger_ = self;
-}
+Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
+  Writer w(&mutex_);
+  w.batch = my_batch;
+  w.sync = options.sync;
+  w.done = false;
 
-void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
-  assert(logger_ == self);
-  logger_ = NULL;
-  logger_cv_.SignalAll();
-}
-
-Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
-  Status status;
   MutexLock l(&mutex_);
-  LoggerId self;
-  AcquireLoggingResponsibility(&self);
-  status = MakeRoomForWrite(false);  // May temporarily release lock and wait
+  writers_.push_back(&w);
+  while (!w.done && &w != writers_.front()) {
+    w.cv.Wait();
+  }
+  if (w.done) {
+    return w.status;
+  }
+
+  // May temporarily unlock and wait.
+  Status status = MakeRoomForWrite(my_batch == NULL);
   uint64_t last_sequence = versions_->LastSequence();
-  if (status.ok()) {
+  Writer* last_writer = &w;
+  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
+    WriteBatch* updates = BuildBatchGroup(&last_writer);
     WriteBatchInternal::SetSequence(updates, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(updates);
 
-    // Add to log and apply to memtable.  We can release the lock during
-    // this phase since the "logger_" flag protects against concurrent
-    // loggers and concurrent writes into mem_.
+    // Add to log and apply to memtable.  We can release the lock
+    // during this phase since &w is currently responsible for logging
+    // and protects against concurrent loggers and concurrent writes
+    // into mem_.
     {
-      assert(logger_ == &self);
       mutex_.Unlock();
       status = log_->AddRecord(WriteBatchInternal::Contents(updates));
       if (status.ok() && options.sync) {
@@ -1135,20 +1141,85 @@
         status = WriteBatchInternal::InsertInto(updates, mem_);
       }
       mutex_.Lock();
-      assert(logger_ == &self);
     }
+    if (updates == tmp_batch_) tmp_batch_->Clear();
 
     versions_->SetLastSequence(last_sequence);
   }
-  ReleaseLoggingResponsibility(&self);
+
+  while (true) {
+    Writer* ready = writers_.front();
+    writers_.pop_front();
+    if (ready != &w) {
+      ready->status = status;
+      ready->done = true;
+      ready->cv.Signal();
+    }
+    if (ready == last_writer) break;
+  }
+
+  // Notify new head of write queue
+  if (!writers_.empty()) {
+    writers_.front()->cv.Signal();
+  }
+
   return status;
 }
 
+// REQUIRES: Writer list must be non-empty
+// REQUIRES: First writer must have a non-NULL batch
+WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
+  assert(!writers_.empty());
+  Writer* first = writers_.front();
+  WriteBatch* result = first->batch;
+  assert(result != NULL);
+
+  size_t size = WriteBatchInternal::ByteSize(first->batch);
+
+  // Allow the group to grow up to a maximum size, but if the
+  // original write is small, limit the growth so we do not slow
+  // down the small write too much.
+  size_t max_size = 1 << 20;
+  if (size <= (128<<10)) {
+    max_size = size + (128<<10);
+  }
+
+  *last_writer = first;
+  std::deque<Writer*>::iterator iter = writers_.begin();
+  ++iter;  // Advance past "first"
+  for (; iter != writers_.end(); ++iter) {
+    Writer* w = *iter;
+    if (w->sync && !first->sync) {
+      // Do not include a sync write into a batch handled by a non-sync write.
+      break;
+    }
+
+    if (w->batch != NULL) {
+      size += WriteBatchInternal::ByteSize(w->batch);
+      if (size > max_size) {
+        // Do not make batch too big
+        break;
+      }
+
+      // Append to *result
+      if (result == first->batch) {
+        // Switch to temporary batch instead of disturbing caller's batch
+        result = tmp_batch_;
+        assert(WriteBatchInternal::Count(result) == 0);
+        WriteBatchInternal::Append(result, first->batch);
+      }
+      WriteBatchInternal::Append(result, w->batch);
+    }
+    *last_writer = w;
+  }
+  return result;
+}
+
 // REQUIRES: mutex_ is held
-// REQUIRES: this thread is the current logger
+// REQUIRES: this thread is currently at the front of the writer queue
 Status DBImpl::MakeRoomForWrite(bool force) {
   mutex_.AssertHeld();
-  assert(logger_ != NULL);
+  assert(!writers_.empty());
   bool allow_delay = !force;
   Status s;
   while (true) {
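
The changes above replace the single logger_ token with an explicit FIFO of Writer objects: each caller enqueues itself and sleeps on its own condition variable, then either wakes with its work already done by an earlier group leader or finds itself at the front and becomes the leader. BuildBatchGroup caps the combined batch at 1 MB, and when the leading write is at most 128 KB the cap drops to that size plus 128 KB (for example, a 4 KB leading write caps its group at 132 KB), so a small synchronous write is never held hostage by a huge group. The queueing handoff, restated as a standalone sketch using plain pthreads in place of port::Mutex and port::CondVar (illustrative only, not the production code):

    #include <pthread.h>
    #include <deque>

    struct Waiter {
      bool done;
      pthread_cond_t cv;
      Waiter() : done(false) { pthread_cond_init(&cv, NULL); }
      ~Waiter() { pthread_cond_destroy(&cv); }
    };

    static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
    static std::deque<Waiter*> queue;

    void GroupedWork() {
      Waiter w;
      pthread_mutex_lock(&mu);
      queue.push_back(&w);
      // Sleep until an earlier leader has done our work, or until we
      // have reached the front of the queue and lead the next group.
      while (!w.done && &w != queue.front()) {
        pthread_cond_wait(&w.cv, &mu);
      }
      if (w.done) {
        pthread_mutex_unlock(&mu);
        return;  // an earlier leader covered this write
      }
      // Leader: claim the writers queued so far as one group (the real
      // code may stop earlier via BuildBatchGroup), then drop the lock
      // while doing the single expensive combined operation.
      Waiter* last = queue.back();
      pthread_mutex_unlock(&mu);
      // ... one log append + fsync covering the whole group goes here ...
      pthread_mutex_lock(&mu);
      while (true) {
        Waiter* ready = queue.front();
        queue.pop_front();
        if (ready != &w) {
          ready->done = true;
          pthread_cond_signal(&ready->cv);
        }
        if (ready == last) break;
      }
      // Wake the head of the next group, if writers arrived meanwhile.
      if (!queue.empty()) {
        pthread_cond_signal(&queue.front()->cv);
      }
      pthread_mutex_unlock(&mu);
    }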
diff --git a/db/db_impl.h b/db/db_impl.h
index fc40d1e..e665c0e 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -5,6 +5,7 @@
 #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
 #define STORAGE_LEVELDB_DB_DB_IMPL_H_
 
+#include <deque>
 #include <set>
 #include "db/dbformat.h"
 #include "db/log_writer.h"
@@ -59,6 +60,8 @@
 
  private:
   friend class DB;
+  struct CompactionState;
+  struct Writer;
 
   Iterator* NewInternalIterator(const ReadOptions&,
                                 SequenceNumber* latest_snapshot);
@@ -85,14 +88,8 @@
 
   Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
 
-  // Only thread is allowed to log at a time.
-  struct LoggerId { };          // Opaque identifier for logging thread
-  void AcquireLoggingResponsibility(LoggerId* self);
-  void ReleaseLoggingResponsibility(LoggerId* self);
-
   Status MakeRoomForWrite(bool force /* compact even if there is room? */);
-
-  struct CompactionState;
+  WriteBatch* BuildBatchGroup(Writer** last_writer);
 
   void MaybeScheduleCompaction();
   static void BGWork(void* db);
@@ -129,8 +126,11 @@
   WritableFile* logfile_;
   uint64_t logfile_number_;
   log::Writer* log_;
-  LoggerId* logger_;            // NULL, or the id of the current logging thread
-  port::CondVar logger_cv_;     // For threads waiting to log
+
+  // Queue of writers.
+  std::deque<Writer*> writers_;
+  WriteBatch* tmp_batch_;
+
   SnapshotList snapshots_;
 
   // Set of table files to protect from deletion because they are
diff --git a/db/write_batch.cc b/db/write_batch.cc
index a0e812f..33f4a42 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -23,6 +23,9 @@
 
 namespace leveldb {
 
+// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
+static const size_t kHeader = 12;
+
 WriteBatch::WriteBatch() {
   Clear();
 }
@@ -33,16 +36,16 @@
 
 void WriteBatch::Clear() {
   rep_.clear();
-  rep_.resize(12);
+  rep_.resize(kHeader);
 }
 
 Status WriteBatch::Iterate(Handler* handler) const {
   Slice input(rep_);
-  if (input.size() < 12) {
+  if (input.size() < kHeader) {
     return Status::Corruption("malformed WriteBatch (too small)");
   }
 
-  input.remove_prefix(12);
+  input.remove_prefix(kHeader);
   Slice key, value;
   int found = 0;
   while (!input.empty()) {
@@ -131,8 +134,14 @@
 }
 
 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
-  assert(contents.size() >= 12);
+  assert(contents.size() >= kHeader);
   b->rep_.assign(contents.data(), contents.size());
 }
 
+void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
+  SetCount(dst, Count(dst) + Count(src));
+  assert(src->rep_.size() >= kHeader);
+  dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
+}
+
 }  // namespace leveldb
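
Append can be a straight concatenation because of the rep_ layout: a 12-byte header (fixed64 sequence number, then fixed32 count, matching kHeader above) followed by self-delimiting records, each a one-byte type tag plus length-prefixed strings. Summing the counts and appending src's payload after dst's therefore yields a well-formed batch that keeps dst's sequence number. A short usage sketch of the new internal helper (mirroring the Append test below):

    #include <assert.h>
    #include "db/write_batch_internal.h"
    #include "leveldb/write_batch.h"

    void AppendDemo() {
      leveldb::WriteBatch b1, b2;
      b1.Put("a", "va");
      b2.Put("b", "vb");
      b2.Delete("c");
      leveldb::WriteBatchInternal::Append(&b1, &b2);
      // b1 now holds Put(a), Put(b), Delete(c): the counts were summed
      // and b2's records were copied in past its 12-byte header, while
      // b1's sequence number is left untouched.
      assert(leveldb::WriteBatchInternal::Count(&b1) == 3);
    }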
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 49aeb84..4423a7f 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -39,6 +39,8 @@
   static void SetContents(WriteBatch* batch, const Slice& contents);
 
   static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
+
+  static void Append(WriteBatch* dst, const WriteBatch* src);
 };
 
 }  // namespace leveldb
diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc
index 1ee6d7b..9064e3d 100644
--- a/db/write_batch_test.cc
+++ b/db/write_batch_test.cc
@@ -18,6 +18,7 @@
   mem->Ref();
   std::string state;
   Status s = WriteBatchInternal::InsertInto(b, mem);
+  int count = 0;
   Iterator* iter = mem->NewIterator();
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     ParsedInternalKey ikey;
@@ -29,11 +30,13 @@
         state.append(", ");
         state.append(iter->value().ToString());
         state.append(")");
+        count++;
         break;
       case kTypeDeletion:
         state.append("Delete(");
         state.append(ikey.user_key.ToString());
         state.append(")");
+        count++;
         break;
     }
     state.append("@");
@@ -42,6 +45,8 @@
   delete iter;
   if (!s.ok()) {
     state.append("ParseError()");
+  } else if (count != WriteBatchInternal::Count(b)) {
+    state.append("CountMismatch()");
   }
   mem->Unref();
   return state;
@@ -82,6 +87,32 @@
             PrintContents(&batch));
 }
 
+TEST(WriteBatchTest, Append) {
+  WriteBatch b1, b2;
+  WriteBatchInternal::SetSequence(&b1, 200);
+  WriteBatchInternal::SetSequence(&b2, 300);
+  WriteBatchInternal::Append(&b1, &b2);
+  ASSERT_EQ("",
+            PrintContents(&b1));
+  b2.Put("a", "va");
+  WriteBatchInternal::Append(&b1, &b2);
+  ASSERT_EQ("Put(a, va)@200",
+            PrintContents(&b1));
+  b2.Clear();
+  b2.Put("b", "vb");
+  WriteBatchInternal::Append(&b1, &b2);
+  ASSERT_EQ("Put(a, va)@200"
+            "Put(b, vb)@201",
+            PrintContents(&b1));
+  b2.Delete("foo");
+  WriteBatchInternal::Append(&b1, &b2);
+  ASSERT_EQ("Put(a, va)@200"
+            "Put(b, vb)@202"
+            "Put(b, vb)@201"
+            "Delete(foo)@203",
+            PrintContents(&b1));
+}
+
 }  // namespace leveldb
 
 int main(int argc, char** argv) {