Small tweaks and bugfixes for Issue 18 and 19.

Slight tweak to the no-overlap optimization: only push to
level 2 to reduce the amount of wasted space when the same
small key range is being repeatedly overwritten.

Fix for Issue 18: Avoid failure on Windows by avoiding
deletion of lock file until the end of DestroyDB().

Fix for Issue 19: Disregard sequence numbers when checking for 
overlap in sstable ranges. This fixes issue 19: when writing 
the same key over and over again, we would generate a sequence 
of sstables that were never merged together since their sequence
numbers were disjoint.

Don't ignore map/unmap error checks.

Miscellaneous fixes for small problems Sanjay found while diagnosing
issue/9 and issue/16 (corruption_testr failures).
- log::Reader reports the record type when it finds an unexpected type.
- log::Reader no longer reports an error when it encounters an expected
  zero record regardless of the setting of the "checksum" flag.
- Added a missing forward declaration.
- Documented a side-effects of larger write buffer sizes
  (longer recovery time).



git-svn-id: http://leveldb.googlecode.com/svn/trunk@37 62dab493-f737-651d-591e-8d6aee1b9529
diff --git a/db/corruption_test.cc b/db/corruption_test.cc
index 8015101..69fa03a 100644
--- a/db/corruption_test.cc
+++ b/db/corruption_test.cc
@@ -295,7 +295,7 @@
   Build(10);
   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
   dbi->TEST_CompactMemTable();
-  const int last = config::kNumLevels - 1;
+  const int last = config::kMaxMemCompactLevel;
   ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
 
   Corrupt(kTableFile, 100, 1);
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7556d5a..48056da 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -456,10 +456,13 @@
   // should not be added to the manifest.
   int level = 0;
   if (s.ok() && meta.file_size > 0) {
-    if (base != NULL && !base->OverlapInLevel(0, meta.smallest, meta.largest)) {
-      // Push to largest level we can without causing overlaps
-      while (level + 1 < config::kNumLevels &&
-             !base->OverlapInLevel(level + 1, meta.smallest, meta.largest)) {
+    const Slice min_user_key = meta.smallest.user_key();
+    const Slice max_user_key = meta.largest.user_key();
+    if (base != NULL && !base->OverlapInLevel(0, min_user_key, max_user_key)) {
+      // Push the new sstable to a higher level if possible to reduce
+      // expensive manifest file ops.
+      while (level < config::kMaxMemCompactLevel &&
+             !base->OverlapInLevel(level + 1, min_user_key, max_user_key)) {
         level++;
       }
     }
@@ -1276,12 +1279,14 @@
   }
 
   FileLock* lock;
-  Status result = env->LockFile(LockFileName(dbname), &lock);
+  const std::string lockname = LockFileName(dbname);
+  Status result = env->LockFile(lockname, &lock);
   if (result.ok()) {
     uint64_t number;
     FileType type;
     for (size_t i = 0; i < filenames.size(); i++) {
-      if (ParseFileName(filenames[i], &number, &type)) {
+      if (ParseFileName(filenames[i], &number, &type) &&
+          filenames[i] != lockname) {  // Lock file will be deleted at end
         Status del = env->DeleteFile(dbname + "/" + filenames[i]);
         if (result.ok() && !del.ok()) {
           result = del;
@@ -1289,7 +1294,7 @@
       }
     }
     env->UnlockFile(lock);  // Ignore error since state is already gone
-    env->DeleteFile(LockFileName(dbname));
+    env->DeleteFile(lockname);
     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
   }
   return result;
diff --git a/db/db_test.cc b/db/db_test.cc
index d5d60cd..0ac29e6 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -650,6 +650,25 @@
   }
 }
 
+TEST(DBTest, RepeatedWritesToSameKey) {
+  Options options;
+  options.env = env_;
+  options.write_buffer_size = 100000;  // Small write buffer
+  Reopen(&options);
+
+  // We must have at most one file per level except for level-0,
+  // which may have up to kL0_StopWritesTrigger files.
+  const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
+
+  Random rnd(301);
+  std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
+  for (int i = 0; i < 5 * kMaxFiles; i++) {
+    Put("key", value);
+    ASSERT_LE(TotalTableFiles(), kMaxFiles);
+    fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
+  }
+}
+
 TEST(DBTest, SparseMerge) {
   Options options;
   options.compression = kNoCompression;
@@ -863,7 +882,7 @@
 TEST(DBTest, DeletionMarkers1) {
   Put("foo", "v1");
   ASSERT_OK(dbfull()->TEST_CompactMemTable());
-  const int last = config::kNumLevels - 1;
+  const int last = config::kMaxMemCompactLevel;
   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
 
   // Place a table at level last-1 to prevent merging with preceding mutation
@@ -891,7 +910,7 @@
 TEST(DBTest, DeletionMarkers2) {
   Put("foo", "v1");
   ASSERT_OK(dbfull()->TEST_CompactMemTable());
-  const int last = config::kNumLevels - 1;
+  const int last = config::kMaxMemCompactLevel;
   ASSERT_EQ(NumTableFilesAtLevel(last), 1);   // foo => v1 is now in last level
 
   // Place a table at level last-1 to prevent merging with preceding mutation
diff --git a/db/dbformat.h b/db/dbformat.h
index 97491bc..ec1d193 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -29,6 +29,14 @@
 // Maximum number of level-0 files.  We stop writes at this point.
 static const int kL0_StopWritesTrigger = 12;
 
+// Maximum level to which a new compacted memtable is pushed if it
+// does not create overlap.  We try to push to level 2 to avoid the
+// relatively expensive level 0=>1 compactions and to avoid some
+// expensive manifest file operations.  We do not push all the way to
+// the largest level since that can generate a lot of wasted disk
+// space if the same key space is being repeatedly overwritten.
+static const int kMaxMemCompactLevel = 2;
+
 }
 
 class InternalKey;
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 8721071..fcb3aa7 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -4,6 +4,7 @@
 
 #include "db/log_reader.h"
 
+#include <stdio.h>
 #include "leveldb/env.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
@@ -72,7 +73,8 @@
   Slice fragment;
   while (true) {
     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
-    switch (ReadPhysicalRecord(&fragment)) {
+    const unsigned int record_type = ReadPhysicalRecord(&fragment);
+    switch (record_type) {
       case kFullType:
         if (in_fragmented_record) {
           // Handle bug in earlier versions of log::Writer where
@@ -144,13 +146,16 @@
         }
         break;
 
-      default:
+      default: {
+        char buf[40];
+        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
         ReportCorruption(
             (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
-            "unknown record type");
+            buf);
         in_fragmented_record = false;
         scratch->clear();
         break;
+      }
     }
   }
   return false;
@@ -212,16 +217,16 @@
       return kBadRecord;
     }
 
+    if (type == kZeroType && length == 0) {
+      // Skip zero length record without reporting any drops since
+      // such records are produced by the mmap based writing code in
+      // env_posix.cc that preallocates file regions.
+      buffer_.clear();
+      return kBadRecord;
+    }
+
     // Check crc
     if (checksum_) {
-      if (type == kZeroType && length == 0) {
-        // Skip zero length record without reporting any drops since
-        // such records are produced by the mmap based writing code in
-        // env_posix.cc that preallocates file regions.
-        buffer_.clear();
-        return kBadRecord;
-      }
-
       uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
       uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
       if (actual_crc != expected_crc) {
diff --git a/db/version_set.cc b/db/version_set.cc
index 54342e4..816f189 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -99,11 +99,14 @@
 bool SomeFileOverlapsRange(
     const InternalKeyComparator& icmp,
     const std::vector<FileMetaData*>& files,
-    const InternalKey& smallest,
-    const InternalKey& largest) {
-  const int index = FindFile(icmp, files, smallest.Encode());
+    const Slice& smallest_user_key,
+    const Slice& largest_user_key) {
+  // Find the earliest possible internal key for smallest_user_key
+  InternalKey small(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
+  const int index = FindFile(icmp, files, small.Encode());
   return ((index < files.size()) &&
-          icmp.Compare(largest, files[index]->smallest) >= 0);
+          icmp.user_comparator()->Compare(
+              largest_user_key, files[index]->smallest.user_key()) >= 0);
 }
 
 // An internal iterator.  For a given version/level pair, yields
@@ -353,9 +356,11 @@
 }
 
 bool Version::OverlapInLevel(int level,
-                             const InternalKey& smallest,
-                             const InternalKey& largest) {
-  return SomeFileOverlapsRange(vset_->icmp_, files_[level], smallest, largest);
+                             const Slice& smallest_user_key,
+                             const Slice& largest_user_key) {
+  return SomeFileOverlapsRange(vset_->icmp_, files_[level],
+                               smallest_user_key,
+                               largest_user_key);
 }
 
 std::string Version::DebugString() const {
diff --git a/db/version_set.h b/db/version_set.h
index f00c35a..693fc6f 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -42,13 +42,13 @@
                     const std::vector<FileMetaData*>& files,
                     const Slice& key);
 
-// Returns true iff some file in "files" overlaps some part of
+// Returns true iff some file in "files" overlaps the user key range
 // [smallest,largest].
 extern bool SomeFileOverlapsRange(
     const InternalKeyComparator& icmp,
     const std::vector<FileMetaData*>& files,
-    const InternalKey& smallest,
-    const InternalKey& largest);
+    const Slice& smallest_user_key,
+    const Slice& largest_user_key);
 
 class Version {
  public:
@@ -78,10 +78,10 @@
   void Unref();
 
   // Returns true iff some file in the specified level overlaps
-  // some part of [smallest,largest].
+  // some part of [smallest_user_key,largest_user_key].
   bool OverlapInLevel(int level,
-                      const InternalKey& smallest,
-                      const InternalKey& largest);
+                      const Slice& smallest_user_key,
+                      const Slice& largest_user_key);
 
   int NumFiles(int level) const { return files_[level].size(); }
 
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index eae2a80..ecfd62b 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -19,11 +19,13 @@
     }
   }
 
-  void Add(const char* smallest, const char* largest) {
+  void Add(const char* smallest, const char* largest,
+           SequenceNumber smallest_seq = 100,
+           SequenceNumber largest_seq = 100) {
     FileMetaData* f = new FileMetaData;
     f->number = files_.size() + 1;
-    f->smallest = InternalKey(smallest, 100, kTypeValue);
-    f->largest = InternalKey(largest, 100, kTypeValue);
+    f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
+    f->largest = InternalKey(largest, largest_seq, kTypeValue);
     files_.push_back(f);
   }
 
@@ -34,10 +36,8 @@
   }
 
   bool Overlaps(const char* smallest, const char* largest) {
-    InternalKey s(smallest, 100, kTypeValue);
-    InternalKey l(largest, 100, kTypeValue);
     InternalKeyComparator cmp(BytewiseComparator());
-    return SomeFileOverlapsRange(cmp, files_, s, l);
+    return SomeFileOverlapsRange(cmp, files_, smallest, largest);
   }
 };
 
@@ -108,6 +108,15 @@
   ASSERT_TRUE(Overlaps("450", "500"));
 }
 
+TEST(FindFileTest, OverlapSequenceChecks) {
+  Add("200", "200", 5000, 3000);
+  ASSERT_TRUE(! Overlaps("199", "199"));
+  ASSERT_TRUE(! Overlaps("201", "300"));
+  ASSERT_TRUE(Overlaps("200", "200"));
+  ASSERT_TRUE(Overlaps("190", "200"));
+  ASSERT_TRUE(Overlaps("200", "210"));
+}
+
 }
 
 int main(int argc, char** argv) {
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index d975444..6d65eed 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -9,6 +9,8 @@
 
 namespace leveldb {
 
+class MemTable;
+
 // WriteBatchInternal provides static methods for manipulating a
 // WriteBatch that we don't want in the public WriteBatch interface.
 class WriteBatchInternal {
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index a94651f..a0afbf2 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -75,6 +75,8 @@
   // Larger values increase performance, especially during bulk loads.
   // Up to two write buffers may be held in memory at the same time,
   // so you may wish to adjust this parameter to control memory usage.
+  // Also, a larger write buffer will result in a longer recovery time
+  // the next time the database is opened.
   //
   // Default: 4MB
   size_t write_buffer_size;
diff --git a/util/env_posix.cc b/util/env_posix.cc
index fec1599..46723e2 100644
--- a/util/env_posix.cc
+++ b/util/env_posix.cc
@@ -28,6 +28,10 @@
 
 namespace {
 
+static Status IOError(const std::string& context, int err_number) {
+  return Status::IOError(context, strerror(err_number));
+}
+
 class PosixSequentialFile: public SequentialFile {
  private:
   std::string filename_;
@@ -47,7 +51,7 @@
         // We leave status as ok if we hit the end of the file
       } else {
         // A partial read with an error: return a non-ok status
-        s = Status::IOError(filename_, strerror(errno));
+        s = IOError(filename_, errno);
       }
     }
     return s;
@@ -55,7 +59,7 @@
 
   virtual Status Skip(uint64_t n) {
     if (fseek(file_, n, SEEK_CUR)) {
-      return Status::IOError(filename_, strerror(errno));
+      return IOError(filename_, errno);
     }
     return Status::OK();
   }
@@ -78,7 +82,7 @@
     *result = Slice(scratch, (r < 0) ? 0 : r);
     if (r < 0) {
       // An error: return a non-ok status
-      s = Status::IOError(filename_, strerror(errno));
+      s = IOError(filename_, errno);
     }
     return s;
   }
@@ -114,13 +118,16 @@
     return s;
   }
 
-  void UnmapCurrentRegion() {
+  bool UnmapCurrentRegion() {
+    bool result = true;
     if (base_ != NULL) {
       if (last_sync_ < limit_) {
         // Defer syncing this data until next Sync() call, if any
         pending_sync_ = true;
       }
-      munmap(base_, limit_ - base_);
+      if (munmap(base_, limit_ - base_) != 0) {
+        result = false;
+      }
       file_offset_ += limit_ - base_;
       base_ = NULL;
       limit_ = NULL;
@@ -132,6 +139,7 @@
         map_size_ *= 2;
       }
     }
+    return result;
   }
 
   bool MapNewRegion() {
@@ -181,8 +189,10 @@
       assert(dst_ <= limit_);
       size_t avail = limit_ - dst_;
       if (avail == 0) {
-        UnmapCurrentRegion();
-        MapNewRegion();
+        if (!UnmapCurrentRegion() ||
+            !MapNewRegion()) {
+          return IOError(filename_, errno);
+        }
       }
 
       size_t n = (left <= avail) ? left : avail;
@@ -197,17 +207,18 @@
   virtual Status Close() {
     Status s;
     size_t unused = limit_ - dst_;
-    UnmapCurrentRegion();
-    if (unused > 0) {
+    if (!UnmapCurrentRegion()) {
+      s = IOError(filename_, errno);
+    } else if (unused > 0) {
       // Trim the extra space at the end of the file
       if (ftruncate(fd_, file_offset_ - unused) < 0) {
-        s = Status::IOError(filename_, strerror(errno));
+        s = IOError(filename_, errno);
       }
     }
 
     if (close(fd_) < 0) {
       if (s.ok()) {
-        s = Status::IOError(filename_, strerror(errno));
+        s = IOError(filename_, errno);
       }
     }
 
@@ -228,7 +239,7 @@
       // Some unmapped data was not synced
       pending_sync_ = false;
       if (fdatasync(fd_) < 0) {
-        s = Status::IOError(filename_, strerror(errno));
+        s = IOError(filename_, errno);
       }
     }
 
@@ -239,7 +250,7 @@
       size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
       last_sync_ = dst_;
       if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
-        s = Status::IOError(filename_, strerror(errno));
+        s = IOError(filename_, errno);
       }
     }
 
@@ -276,7 +287,7 @@
     FILE* f = fopen(fname.c_str(), "r");
     if (f == NULL) {
       *result = NULL;
-      return Status::IOError(fname, strerror(errno));
+      return IOError(fname, errno);
     } else {
       *result = new PosixSequentialFile(fname, f);
       return Status::OK();
@@ -288,7 +299,7 @@
     int fd = open(fname.c_str(), O_RDONLY);
     if (fd < 0) {
       *result = NULL;
-      return Status::IOError(fname, strerror(errno));
+      return IOError(fname, errno);
     }
     *result = new PosixRandomAccessFile(fname, fd);
     return Status::OK();
@@ -300,7 +311,7 @@
     const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
     if (fd < 0) {
       *result = NULL;
-      s = Status::IOError(fname, strerror(errno));
+      s = IOError(fname, errno);
     } else {
       *result = new PosixMmapFile(fname, fd, page_size_);
     }
@@ -316,7 +327,7 @@
     result->clear();
     DIR* d = opendir(dir.c_str());
     if (d == NULL) {
-      return Status::IOError(dir, strerror(errno));
+      return IOError(dir, errno);
     }
     struct dirent* entry;
     while ((entry = readdir(d)) != NULL) {
@@ -329,7 +340,7 @@
   virtual Status DeleteFile(const std::string& fname) {
     Status result;
     if (unlink(fname.c_str()) != 0) {
-      result = Status::IOError(fname, strerror(errno));
+      result = IOError(fname, errno);
     }
     return result;
   };
@@ -337,7 +348,7 @@
   virtual Status CreateDir(const std::string& name) {
     Status result;
     if (mkdir(name.c_str(), 0755) != 0) {
-      result = Status::IOError(name, strerror(errno));
+      result = IOError(name, errno);
     }
     return result;
   };
@@ -345,7 +356,7 @@
   virtual Status DeleteDir(const std::string& name) {
     Status result;
     if (rmdir(name.c_str()) != 0) {
-      result = Status::IOError(name, strerror(errno));
+      result = IOError(name, errno);
     }
     return result;
   };
@@ -355,7 +366,7 @@
     struct stat sbuf;
     if (stat(fname.c_str(), &sbuf) != 0) {
       *size = 0;
-      s = Status::IOError(fname, strerror(errno));
+      s = IOError(fname, errno);
     } else {
       *size = sbuf.st_size;
     }
@@ -365,7 +376,7 @@
   virtual Status RenameFile(const std::string& src, const std::string& target) {
     Status result;
     if (rename(src.c_str(), target.c_str()) != 0) {
-      result = Status::IOError(src, strerror(errno));
+      result = IOError(src, errno);
     }
     return result;
   }
@@ -375,9 +386,9 @@
     Status result;
     int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
     if (fd < 0) {
-      result = Status::IOError(fname, strerror(errno));
+      result = IOError(fname, errno);
     } else if (LockOrUnlock(fd, true) == -1) {
-      result = Status::IOError("lock " + fname, strerror(errno));
+      result = IOError("lock " + fname, errno);
       close(fd);
     } else {
       PosixFileLock* my_lock = new PosixFileLock;
@@ -391,7 +402,7 @@
     PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
     Status result;
     if (LockOrUnlock(my_lock->fd_, false) == -1) {
-      result = Status::IOError(strerror(errno));
+      result = IOError("unlock", errno);
     }
     close(my_lock->fd_);
     delete my_lock;