| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/db_iter.h" |
| |
| #include "db/filename.h" |
| #include "db/dbformat.h" |
| #include "include/env.h" |
| #include "include/iterator.h" |
| #include "port/port.h" |
| #include "util/logging.h" |
| #include "util/mutexlock.h" |
| |
| namespace leveldb { |
| |
| #if 0 |
| static void DumpInternalIter(Iterator* iter) { |
| for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
| ParsedInternalKey k; |
| if (!ParseInternalKey(iter->key(), &k)) { |
| fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str()); |
| } else { |
| fprintf(stderr, "@ '%s'\n", k.DebugString().c_str()); |
| } |
| } |
| } |
| #endif |
| |
| namespace { |
| |
| // Memtables and sstables that make the DB representation contain |
| // (userkey,seq,type) => uservalue entries. DBIter |
| // combines multiple entries for the same userkey found in the DB |
| // representation into a single entry while accounting for sequence |
| // numbers, deletion markers, overwrites, etc. |
| class DBIter: public Iterator { |
| public: |
| DBIter(const std::string* dbname, Env* env, |
| const Comparator* cmp, Iterator* iter, SequenceNumber s) |
| : dbname_(dbname), |
| env_(env), |
| user_comparator_(cmp), |
| iter_(iter), |
| sequence_(s), |
| large_(NULL), |
| valid_(false) { |
| } |
| virtual ~DBIter() { |
| delete iter_; |
| delete large_; |
| } |
| virtual bool Valid() const { return valid_; } |
| virtual Slice key() const { |
| assert(valid_); |
| return key_; |
| } |
| virtual Slice value() const { |
| assert(valid_); |
| if (large_ == NULL) { |
| return value_; |
| } else { |
| MutexLock l(&large_->mutex); |
| if (!large_->produced) { |
| ReadIndirectValue(); |
| } |
| return large_->value; |
| } |
| } |
| |
| virtual void Next() { |
| assert(valid_); |
| // iter_ is already positioned past DBIter::key() |
| FindNextUserEntry(); |
| } |
| |
| virtual void Prev() { |
| assert(valid_); |
| bool ignored; |
| ScanUntilBeforeCurrentKey(&ignored); |
| FindPrevUserEntry(); |
| } |
| |
| virtual void Seek(const Slice& target) { |
| ParsedInternalKey ikey(target, sequence_, kValueTypeForSeek); |
| std::string tmp; |
| AppendInternalKey(&tmp, ikey); |
| iter_->Seek(tmp); |
| FindNextUserEntry(); |
| } |
| virtual void SeekToFirst() { |
| iter_->SeekToFirst(); |
| FindNextUserEntry(); |
| } |
| |
| virtual void SeekToLast(); |
| |
| virtual Status status() const { |
| if (status_.ok()) { |
| if (large_ != NULL && !large_->status.ok()) return large_->status; |
| return iter_->status(); |
| } else { |
| return status_; |
| } |
| } |
| |
| private: |
| void FindNextUserEntry(); |
| void FindPrevUserEntry(); |
| void SaveKey(const Slice& k) { key_.assign(k.data(), k.size()); } |
| void SaveValue(const Slice& v) { |
| if (value_.capacity() > v.size() + 1048576) { |
| std::string empty; |
| swap(empty, value_); |
| } |
| value_.assign(v.data(), v.size()); |
| } |
| bool ParseKey(ParsedInternalKey* key); |
| void SkipPast(const Slice& k); |
| void ScanUntilBeforeCurrentKey(bool* found_live); |
| |
| void ReadIndirectValue() const; |
| |
| struct Large { |
| port::Mutex mutex; |
| std::string value; |
| bool produced; |
| Status status; |
| }; |
| |
| const std::string* const dbname_; |
| Env* const env_; |
| |
| const Comparator* const user_comparator_; |
| |
| // iter_ is positioned just past current entry for DBIter if valid_ |
| Iterator* const iter_; |
| |
| SequenceNumber const sequence_; |
| Status status_; |
| std::string key_; // Always a user key |
| std::string value_; |
| Large* large_; // Non-NULL if value is an indirect reference |
| bool valid_; |
| |
| // No copying allowed |
| DBIter(const DBIter&); |
| void operator=(const DBIter&); |
| }; |
| |
| inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { |
| if (!ParseInternalKey(iter_->key(), ikey)) { |
| status_ = Status::Corruption("corrupted internal key in DBIter"); |
| return false; |
| } else { |
| return true; |
| } |
| } |
| |
| void DBIter::FindNextUserEntry() { |
| if (large_ != NULL) { |
| if (status_.ok() && !large_->status.ok()) { |
| status_ = large_->status; |
| } |
| delete large_; |
| large_ = NULL; |
| } |
| while (iter_->Valid()) { |
| ParsedInternalKey ikey; |
| if (!ParseKey(&ikey)) { |
| // Skip past corrupted entry |
| iter_->Next(); |
| continue; |
| } |
| if (ikey.sequence > sequence_) { |
| // Ignore entries newer than the snapshot |
| iter_->Next(); |
| continue; |
| } |
| |
| switch (ikey.type) { |
| case kTypeDeletion: |
| SaveKey(ikey.user_key); // Make local copy for use by SkipPast() |
| iter_->Next(); |
| SkipPast(key_); |
| // Do not return deleted entries. Instead keep looping. |
| break; |
| |
| case kTypeValue: |
| SaveKey(ikey.user_key); |
| SaveValue(iter_->value()); |
| iter_->Next(); |
| SkipPast(key_); |
| // Yield the value we just found. |
| valid_ = true; |
| return; |
| |
| case kTypeLargeValueRef: |
| SaveKey(ikey.user_key); |
| // Save the large value ref as value_, and read it lazily on a call |
| // to value() |
| SaveValue(iter_->value()); |
| large_ = new Large; |
| large_->produced = false; |
| iter_->Next(); |
| SkipPast(key_); |
| // Yield the value we just found. |
| valid_ = true; |
| return; |
| } |
| } |
| valid_ = false; |
| key_.clear(); |
| value_.clear(); |
| assert(large_ == NULL); |
| } |
| |
| void DBIter::SkipPast(const Slice& k) { |
| while (iter_->Valid()) { |
| ParsedInternalKey ikey; |
| // Note that if we cannot parse an internal key, we keep looping |
| // so that if we have a run like the following: |
| // <x,100,v> => value100 |
| // <corrupted entry for user key x> |
| // <x,50,v> => value50 |
| // we will skip over the corrupted entry as well as value50. |
| if (ParseKey(&ikey) && user_comparator_->Compare(ikey.user_key, k) != 0) { |
| break; |
| } |
| iter_->Next(); |
| } |
| } |
| |
| void DBIter::SeekToLast() { |
| // Position iter_ at the last uncorrupted user key and then |
| // let FindPrevUserEntry() do the heavy lifting to find |
| // a user key that is live. |
| iter_->SeekToLast(); |
| ParsedInternalKey current; |
| while (iter_->Valid() && !ParseKey(¤t)) { |
| iter_->Prev(); |
| } |
| if (iter_->Valid()) { |
| SaveKey(current.user_key); |
| } |
| FindPrevUserEntry(); |
| } |
| |
| // Let X be the user key at which iter_ is currently positioned. |
| // Adjust DBIter to point at the last entry with a key <= X that |
| // has a live value. |
| void DBIter::FindPrevUserEntry() { |
| // Consider the following example: |
| // |
| // A@540 |
| // A@400 |
| // |
| // B@300 |
| // B@200 |
| // B@100 <- iter_ |
| // |
| // C@301 |
| // C@201 |
| // |
| // The comments marked "(first iteration)" below relate what happens |
| // for the preceding example in the first iteration of the while loop |
| // below. There may be more than one iteration either if there are |
| // no live values for B, or if there is a corruption. |
| while (iter_->Valid()) { |
| std::string saved = key_; |
| bool found_live; |
| ScanUntilBeforeCurrentKey(&found_live); |
| // (first iteration) iter_ at A@400 |
| if (found_live) { |
| // Step forward into range of entries with user key >= saved |
| if (!iter_->Valid()) { |
| iter_->SeekToFirst(); |
| } else { |
| iter_->Next(); |
| } |
| // (first iteration) iter_ at B@300 |
| |
| FindNextUserEntry(); // Sets key_ to the key of the next value it found |
| if (valid_ && user_comparator_->Compare(key_, saved) == 0) { |
| // (first iteration) iter_ at C@301 |
| return; |
| } |
| |
| // FindNextUserEntry() could not find any entries under the |
| // user key "saved". This is probably a corruption since |
| // ScanUntilBefore(saved) found a live value. So we skip |
| // backwards to an earlier key and ignore the corrupted |
| // entries for "saved". |
| // |
| // (first iteration) iter_ at C@301 and saved == "B" |
| key_ = saved; |
| bool ignored; |
| ScanUntilBeforeCurrentKey(&ignored); |
| // (first iteration) iter_ at A@400 |
| } |
| } |
| valid_ = false; |
| key_.clear(); |
| value_.clear(); |
| } |
| |
| void DBIter::ScanUntilBeforeCurrentKey(bool* found_live) { |
| *found_live = false; |
| if (!iter_->Valid()) { |
| iter_->SeekToLast(); |
| } |
| |
| while (iter_->Valid()) { |
| ParsedInternalKey current; |
| if (!ParseKey(¤t)) { |
| iter_->Prev(); |
| continue; |
| } |
| |
| if (current.sequence > sequence_) { |
| // Ignore entries that are serialized after this read |
| iter_->Prev(); |
| continue; |
| } |
| |
| const int cmp = user_comparator_->Compare(current.user_key, key_); |
| if (cmp < 0) { |
| SaveKey(current.user_key); |
| return; |
| } else if (cmp == 0) { |
| switch (current.type) { |
| case kTypeDeletion: |
| *found_live = false; |
| break; |
| |
| case kTypeValue: |
| case kTypeLargeValueRef: |
| *found_live = true; |
| break; |
| } |
| } else { // cmp > 0 |
| *found_live = false; |
| } |
| |
| iter_->Prev(); |
| } |
| } |
| |
| void DBIter::ReadIndirectValue() const { |
| assert(!large_->produced); |
| large_->produced = true; |
| LargeValueRef large_ref; |
| if (value_.size() != LargeValueRef::ByteSize()) { |
| large_->status = Status::Corruption("malformed large value reference"); |
| return; |
| } |
| memcpy(large_ref.data, value_.data(), LargeValueRef::ByteSize()); |
| std::string fname = LargeValueFileName(*dbname_, large_ref); |
| RandomAccessFile* file; |
| Status s = env_->NewRandomAccessFile(fname, &file); |
| if (s.ok()) { |
| uint64_t file_size = file->Size(); |
| uint64_t value_size = large_ref.ValueSize(); |
| large_->value.resize(value_size); |
| Slice result; |
| s = file->Read(0, file_size, &result, |
| const_cast<char*>(large_->value.data())); |
| if (s.ok()) { |
| if (result.size() == file_size) { |
| switch (large_ref.compression_type()) { |
| case kNoCompression: { |
| if (result.data() != large_->value.data()) { |
| large_->value.assign(result.data(), result.size()); |
| } |
| break; |
| } |
| case kLightweightCompression: { |
| std::string uncompressed; |
| if (port::Lightweight_Uncompress(result.data(), result.size(), |
| &uncompressed) && |
| uncompressed.size() == large_ref.ValueSize()) { |
| swap(uncompressed, large_->value); |
| } else { |
| s = Status::Corruption( |
| "Unable to read entire compressed large value file"); |
| } |
| } |
| } |
| } else { |
| s = Status::Corruption("Unable to read entire large value file"); |
| } |
| } |
| delete file; // Ignore errors on closing |
| } |
| if (!s.ok()) { |
| large_->value.clear(); |
| large_->status = s; |
| } |
| } |
| |
| } // anonymous namespace |
| |
| Iterator* NewDBIterator( |
| const std::string* dbname, |
| Env* env, |
| const Comparator* user_key_comparator, |
| Iterator* internal_iter, |
| const SequenceNumber& sequence) { |
| return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); |
| } |
| |
| } |