#include "caffe2/core/db.h"

#include <limits>
#include <mutex>

#include "caffe2/core/blob_serialization.h"
#include "caffe2/core/logging.h"
|  |  | 
|  | namespace caffe2 { | 
|  |  | 
// Register DBReader and Cursor as known types so caffe2's TypeMeta can
// identify them (TypeMeta::Id<DBReader>() and typeMeta.Match<DBReader>() are
// used later in this file).
CAFFE_KNOWN_TYPE(db::DBReader);
CAFFE_KNOWN_TYPE(db::Cursor);
|  |  | 
|  | namespace db { | 
|  |  | 
// Registry of DB implementations. Entries are added with REGISTER_CAFFE2_DB
// under a name (e.g. "minidb" below), and each implementation is constructed
// from (const string& source, Mode mode).
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
C10_DEFINE_REGISTRY(Caffe2DBRegistry, DB, const string&, Mode);
|  |  | 
|  | // Below, we provide a bare minimum database "minidb" as a reference | 
|  | // implementation as well as a portable choice to store data. | 
|  | // Note that the MiniDB classes are not exposed via a header file - they should | 
|  | // be created directly via the db interface. See MiniDB for details. | 
|  |  | 
|  | class MiniDBCursor : public Cursor { | 
|  | public: | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init) | 
|  | explicit MiniDBCursor(FILE* f, std::mutex* mutex) | 
|  | : file_(f), lock_(*mutex), valid_(true) { | 
|  | // We call Next() to read in the first entry. | 
|  | // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) | 
|  | Next(); | 
|  | } | 
|  | // NOLINTNEXTLINE(modernize-use-equals-default) | 
|  | ~MiniDBCursor() override {} | 
|  |  | 
|  | void Seek(const string& /*key*/) override { | 
|  | LOG(FATAL) << "MiniDB does not support seeking to a specific key."; | 
|  | } | 
|  |  | 
|  | void SeekToFirst() override { | 
|  | fseek(file_, 0, SEEK_SET); | 
|  | CAFFE_ENFORCE(!feof(file_), "Hmm, empty file?"); | 
|  | // Read the first item. | 
|  | valid_ = true; | 
|  | Next(); | 
|  | } | 
|  |  | 
|  | void Next() override { | 
|  | // First, read in the key and value length. | 
|  | if (fread(&key_len_, sizeof(int), 1, file_) == 0) { | 
|  | // Reaching EOF. | 
|  | VLOG(1) << "EOF reached, setting valid to false"; | 
|  | valid_ = false; | 
|  | return; | 
|  | } | 
|  | CAFFE_ENFORCE_EQ(fread(&value_len_, sizeof(int), 1, file_), 1); | 
|  | CAFFE_ENFORCE_GT(key_len_, 0); | 
|  | CAFFE_ENFORCE_GT(value_len_, 0); | 
|  | // Resize if the key and value len is larger than the current one. | 
|  | if (key_len_ > (int)key_.size()) { | 
|  | key_.resize(key_len_); | 
|  | } | 
|  | if (value_len_ > (int)value_.size()) { | 
|  | value_.resize(value_len_); | 
|  | } | 
|  | // Actually read in the contents. | 
|  | CAFFE_ENFORCE_EQ( | 
|  | fread(key_.data(), sizeof(char), key_len_, file_), key_len_); | 
|  | CAFFE_ENFORCE_EQ( | 
|  | fread(value_.data(), sizeof(char), value_len_, file_), value_len_); | 
|  | // Note(Yangqing): as we read the file, the cursor naturally moves to the | 
|  | // beginning of the next entry. | 
|  | } | 
|  |  | 
|  | string key() override { | 
|  | CAFFE_ENFORCE(valid_, "Cursor is at invalid location!"); | 
|  | return string(key_.data(), key_len_); | 
|  | } | 
|  |  | 
|  | string value() override { | 
|  | CAFFE_ENFORCE(valid_, "Cursor is at invalid location!"); | 
|  | return string(value_.data(), value_len_); | 
|  | } | 
|  |  | 
|  | bool Valid() override { | 
|  | return valid_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | FILE* file_; | 
|  | std::lock_guard<std::mutex> lock_; | 
|  | bool valid_; | 
|  | int key_len_; | 
|  | vector<char> key_; | 
|  | int value_len_; | 
|  | vector<char> value_; | 
|  | }; | 
|  |  | 
|  | class MiniDBTransaction : public Transaction { | 
|  | public: | 
|  | explicit MiniDBTransaction(FILE* f, std::mutex* mutex) | 
|  | : file_(f), lock_(*mutex) {} | 
|  | ~MiniDBTransaction() override { | 
|  | // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) | 
|  | Commit(); | 
|  | } | 
|  |  | 
|  | void Put(const string& key, const string& value) override { | 
|  | int key_len = key.size(); | 
|  | int value_len = value.size(); | 
|  | CAFFE_ENFORCE_EQ(fwrite(&key_len, sizeof(int), 1, file_), 1); | 
|  | CAFFE_ENFORCE_EQ(fwrite(&value_len, sizeof(int), 1, file_), 1); | 
|  | CAFFE_ENFORCE_EQ( | 
|  | fwrite(key.c_str(), sizeof(char), key_len, file_), key_len); | 
|  | CAFFE_ENFORCE_EQ( | 
|  | fwrite(value.c_str(), sizeof(char), value_len, file_), value_len); | 
|  | } | 
|  |  | 
|  | void Commit() override { | 
|  | if (file_ != nullptr) { | 
|  | CAFFE_ENFORCE_EQ(fflush(file_), 0); | 
|  | file_ = nullptr; | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | FILE* file_; | 
|  | std::lock_guard<std::mutex> lock_; | 
|  |  | 
|  | C10_DISABLE_COPY_AND_ASSIGN(MiniDBTransaction); | 
|  | }; | 
|  |  | 
|  | class MiniDB : public DB { | 
|  | public: | 
|  | MiniDB(const string& source, Mode mode) : DB(source, mode), file_(nullptr) { | 
|  | switch (mode) { | 
|  | case NEW: | 
|  | file_ = fopen(source.c_str(), "wb"); | 
|  | break; | 
|  | case WRITE: | 
|  | file_ = fopen(source.c_str(), "ab"); | 
|  | fseek(file_, 0, SEEK_END); | 
|  | break; | 
|  | case READ: | 
|  | file_ = fopen(source.c_str(), "rb"); | 
|  | break; | 
|  | } | 
|  | CAFFE_ENFORCE(file_, "Cannot open file: " + source); | 
|  | VLOG(1) << "Opened MiniDB " << source; | 
|  | } | 
|  | ~MiniDB() override { | 
|  | // NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall) | 
|  | Close(); | 
|  | } | 
|  |  | 
|  | void Close() override { | 
|  | if (file_) { | 
|  | fclose(file_); | 
|  | } | 
|  | file_ = nullptr; | 
|  | } | 
|  |  | 
|  | unique_ptr<Cursor> NewCursor() override { | 
|  | CAFFE_ENFORCE_EQ(this->mode_, READ); | 
|  | return make_unique<MiniDBCursor>(file_, &file_access_mutex_); | 
|  | } | 
|  |  | 
|  | unique_ptr<Transaction> NewTransaction() override { | 
|  | CAFFE_ENFORCE(this->mode_ == NEW || this->mode_ == WRITE); | 
|  | return make_unique<MiniDBTransaction>(file_, &file_access_mutex_); | 
|  | } | 
|  |  | 
|  | private: | 
|  | FILE* file_; | 
|  | // access mutex makes sure we don't have multiple cursors/transactions | 
|  | // reading the same file. | 
|  | std::mutex file_access_mutex_; | 
|  | }; | 
|  |  | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) | 
|  | REGISTER_CAFFE2_DB(MiniDB, MiniDB); | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) | 
|  | REGISTER_CAFFE2_DB(minidb, MiniDB); | 
|  |  | 
|  | void DBReaderSerializer::Serialize( | 
|  | const void* pointer, | 
|  | TypeMeta typeMeta, | 
|  | const string& name, | 
|  | BlobSerializerBase::SerializationAcceptor acceptor) { | 
|  | CAFFE_ENFORCE(typeMeta.Match<DBReader>()); | 
|  | const auto& reader = *static_cast<const DBReader*>(pointer); | 
|  | DBReaderProto proto; | 
|  | proto.set_name(name); | 
|  | proto.set_source(reader.source_); | 
|  | proto.set_db_type(reader.db_type_); | 
|  | if (reader.cursor() && reader.cursor()->SupportsSeek()) { | 
|  | proto.set_key(reader.cursor()->key()); | 
|  | } | 
|  | BlobProto blob_proto; | 
|  | blob_proto.set_name(name); | 
|  | blob_proto.set_type("DBReader"); | 
|  | blob_proto.set_content(SerializeAsString_EnforceCheck(proto)); | 
|  | acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto)); | 
|  | } | 
|  |  | 
|  | void DBReaderDeserializer::Deserialize(const BlobProto& proto, Blob* blob) { | 
|  | DBReaderProto reader_proto; | 
|  | CAFFE_ENFORCE( | 
|  | reader_proto.ParseFromString(proto.content()), | 
|  | "Cannot parse content into a DBReaderProto."); | 
|  | blob->Reset(new DBReader(reader_proto)); | 
|  | } | 
|  |  | 
|  | namespace { | 
|  | // Serialize TensorCPU. | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) | 
|  | REGISTER_BLOB_SERIALIZER((TypeMeta::Id<DBReader>()), DBReaderSerializer); | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) | 
|  | REGISTER_BLOB_DESERIALIZER(DBReader, DBReaderDeserializer); | 
|  | } // namespace | 
|  |  | 
|  | } // namespace db | 
|  | } // namespace caffe2 |