|  | #include "caffe2/operators/dataset_ops.h" | 
|  |  | 
|  | #include <memory> | 
|  | #include <mutex> | 
|  | #include <string> | 
|  | #include <vector> | 
|  | #include "caffe2/core/blob_serialization.h" | 
|  | #include "caffe2/core/operator.h" | 
|  | #include "caffe2/core/tensor.h" | 
|  | #include "caffe2/utils/string_utils.h" | 
|  |  | 
|  | namespace caffe2 { | 
|  |  | 
// Declare the dataset payload types used by the operators in this file via
// CAFFE_KNOWN_TYPE. NOTE(review): presumably this is what allows them to be
// stored in blobs/tensors -- confirm against the macro definition.
|  | CAFFE_KNOWN_TYPE(std::unique_ptr<dataset_ops::TreeCursor>); | 
|  | CAFFE_KNOWN_TYPE(dataset_ops::TensorVectorPtr); | 
|  | CAFFE_KNOWN_TYPE(dataset_ops::SharedTensorVectorPtr); | 
|  | CAFFE_KNOWN_TYPE(dataset_ops::Shared2DTensorVectorPtr); | 
|  |  | 
|  | namespace dataset_ops { | 
namespace {

// Character that separates the components of a dataset field name.
constexpr char kDatasetFieldSeparator = ':';
// Final name component that marks a field as a lengths field.
constexpr const char* kDatasetLengthField = "lengths";

// how much percent to grow the dataset when needed
constexpr int kDatasetGrowthPct = 40;

} // namespace
|  |  | 
|  | TreeIterator::TreeIterator(const std::vector<std::string>& fields) { | 
|  | // populate field vector and split field names | 
|  | fields_.resize(fields.size()); | 
|  | std::vector<std::vector<std::string>> nameParts(fields_.size()); | 
|  | for (size_t i = 0; i < fields.size(); ++i) { | 
|  | auto& field = fields_.at(i); | 
|  | field.name = fields[i]; | 
|  | field.id = i; | 
|  | field.lengthFieldId = -1; | 
|  | nameParts.at(i) = split(kDatasetFieldSeparator, field.name); | 
|  | } | 
|  |  | 
|  | // populate lengthFields | 
|  | for (const auto& field : fields_) { | 
|  | const auto& parts = nameParts.at(field.id); | 
|  | if (!parts.empty() && parts.back() == kDatasetLengthField) { | 
|  | lengthFieldIds_.push_back(field.id); | 
|  | } | 
|  | } | 
|  |  | 
|  | // find length-field with maximum prefix matching for each field | 
|  | for (auto& field : fields_) { | 
|  | // by default, we are matching against the root domain | 
|  | size_t maxMatchLevel = 1; | 
|  | int maxMatchLengthFieldId = -1; | 
|  | for (int j = 0; j < numLengthFields(); ++j) { | 
|  | const auto& lenField = lengthField(j); | 
|  | // a length field can't have itself as its length field | 
|  | if (field.id == lenField.id) { | 
|  | continue; | 
|  | } | 
|  | auto lf = nameParts.at(lenField.id); | 
|  | auto lfEnd = lf.end() - 1; | 
|  | // check whether this lengthField is a prefix for this field name | 
|  | if (std::mismatch(lf.begin(), lfEnd, nameParts.at(field.id).begin()) | 
|  | .first != lfEnd) { | 
|  | continue; | 
|  | } | 
|  | if (lf.size() > maxMatchLevel) { | 
|  | maxMatchLevel = lf.size(); | 
|  | maxMatchLengthFieldId = j; | 
|  | } | 
|  | } | 
|  | field.lengthFieldId = maxMatchLengthFieldId; | 
|  | } | 
|  |  | 
|  | // check that fields are topologically sorted | 
|  | // (no length field depends on a length defined afterwards) | 
|  | for (const auto& field : fields_) { | 
|  | const auto* lengthField = lengthFieldFor(field); | 
|  | CAFFE_ENFORCE( | 
|  | (lengthField == nullptr) || (lengthField->id < field.id), | 
|  | "Error: Field ", | 
|  | field.id, | 
|  | " (", | 
|  | field.name, | 
|  | ") ", | 
|  | "depends on a field defined afterwards: ", | 
|  | lengthField->id, | 
|  | " (", | 
|  | lengthField->name, | 
|  | ")."); | 
|  | } | 
|  | } | 
|  |  | 
|  | void TreeIterator::advance( | 
|  | const std::vector<const TLength*>& lengths, | 
|  | std::vector<TOffset>& offsets, | 
|  | std::vector<TOffset>& sizes, | 
|  | std::vector<TOffset>& limits, | 
|  | TOffset num) { | 
|  | std::vector<TOffset> newOffsets; | 
|  | CAFFE_ENFORCE_EQ(lengths.size(), numLengthFields()); | 
|  | CAFFE_ENFORCE_EQ(offsets.size(), numOffsetFields()); | 
|  | sizes.resize(offsets.size()); | 
|  | newOffsets.resize(offsets.size()); | 
|  | // first index, top level | 
|  | { | 
|  | auto limit = limits[0]; | 
|  | auto offset = offsets[0]; | 
|  | CAFFE_ENFORCE(limit >= offset, "Tried to advance past end of cursor."); | 
|  | TOffset total = std::min(limit - offset, num); | 
|  | sizes[0] = total; | 
|  | newOffsets[0] = offset + total; | 
|  | } | 
|  | // child indices | 
|  | for (int j = 1; j < numOffsetFields(); ++j) { | 
|  | TOffset total = 0; | 
|  | int parentOffsetId = offsetFieldIdFor(lengthField(j - 1)); | 
|  | const TLength* length = lengths[j - 1] + offsets[parentOffsetId]; | 
|  | for (int k = 0; k < sizes[parentOffsetId]; ++k) { | 
|  | total += *(length++); | 
|  | } | 
|  | auto offset = offsets[j]; | 
|  | CAFFE_ENFORCE( | 
|  | offset + total <= limits[j], | 
|  | "Inconsistent field length: ", | 
|  | "tried to advance past the end of field ", | 
|  | j); | 
|  | sizes[j] = total; | 
|  | newOffsets[j] = offset + total; | 
|  | } | 
|  | offsets = newOffsets; | 
|  | } | 
|  |  | 
|  | TreeWalker::TreeWalker(const vector<const Blob*>& inputs, TreeCursor& cursor) | 
|  | : inputs_(inputs), cursor_(cursor), sizes_(cursor.it.numOffsetFields()) { | 
|  | CAFFE_ENFORCE_EQ(inputs.size(), cursor.it.fields().size()); | 
|  | if (cursor.offsets.empty()) { | 
|  | cursor.offsets.assign(cursor.it.numOffsetFields(), 0); | 
|  | } | 
|  |  | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) { | 
|  | fields_.emplace_back(*this, fieldId); | 
|  | } | 
|  |  | 
|  | gatherLengthData(); | 
|  |  | 
|  | gatherSizeLimits(); | 
|  |  | 
|  | // The invariant we hold is that we are always one step ahead | 
|  | advance(); | 
|  | } | 
|  |  | 
|  | void TreeWalker::advance() { | 
|  | prevOffsets_ = cursor_.offsets; | 
|  | cursor_.it.advance(lengths_, cursor_.offsets, sizes_, limits_, 1); | 
|  | } | 
|  |  | 
|  | std::vector<int64_t> TreeWalker::fieldDim(int fieldId) const { | 
|  | auto tensorDim = input(fieldId).sizes().vec(); | 
|  | tensorDim[0] = sizes_[lengthIdx(fieldId)]; | 
|  | return tensorDim; | 
|  | } | 
|  |  | 
|  | void* TreeWalker::fieldPtr(int fieldId) const { | 
|  | auto& in = input(fieldId); | 
|  | return (char*)in.raw_data() + | 
|  | offset(fieldId) * in.size_from_dim(1) * in.dtype().itemsize(); | 
|  | } | 
|  |  | 
|  | void TreeWalker::gatherLengthData() { | 
|  | static const TLength lenZero = 0; | 
|  | lengths_.resize(cursor_.it.numLengthFields()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < lengths_.size(); ++i) { | 
|  | auto& in = input(cursor_.it.lengthField(i).id); | 
|  | if (in.numel() > 0) { | 
|  | lengths_[i] = in.data<int>(); | 
|  | } else { | 
|  | lengths_[i] = &lenZero; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void TreeWalker::gatherSizeLimits() { | 
|  | limits_.assign(sizes_.size(), std::numeric_limits<TOffset>::max()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (auto fieldId = 0; fieldId < cursor_.it.fields().size(); ++fieldId) { | 
|  | auto lengthFieldIdx = lengthIdx(fieldId); | 
|  | limits_[lengthFieldIdx] = | 
|  | std::min(limits_[lengthFieldIdx], (TOffset)input(fieldId).sizes()[0]); | 
|  | } | 
|  | } | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | class CreateTreeCursorOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit CreateTreeCursorOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | *OperatorBase::Output<std::unique_ptr<TreeCursor>>(0) = | 
|  | // NOLINTNEXTLINE(modernize-make-unique) | 
|  | std::unique_ptr<TreeCursor>(new TreeCursor(TreeIterator(fields_))); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | std::vector<std::string> fields_; | 
|  | }; | 
|  |  | 
|  | class GetCursorOffsetOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit GetCursorOffsetOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | Output(0)->Resize(cursor->offsets.size()); | 
|  | auto* output = Output(0)->template mutable_data<int>(); | 
|  | for (size_t i = 0; i < cursor->offsets.size(); ++i) { | 
|  | output[i] = cursor->offsets[i]; | 
|  | } | 
|  | return true; | 
|  | } | 
|  | }; | 
|  |  | 
|  | class ResetCursorOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit ResetCursorOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | std::lock_guard<std::mutex> lock(cursor->mutex_); | 
|  | cursor->offsets.clear(); | 
|  | return true; | 
|  | } | 
|  | }; | 
|  |  | 
|  | class CheckDatasetConsistencyOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit CheckDatasetConsistencyOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | std::vector<const TLength*> lengths; | 
|  | std::vector<TOffset> limits; | 
|  | std::vector<TOffset> sizes; | 
|  | std::vector<TOffset> offsets; | 
|  | CAFFE_ENFORCE( | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | InputSize() == iterator_.fields().size(), | 
|  | "Invalid number of fields. Expected ", | 
|  | iterator_.fields().size(), | 
|  | ", got ", | 
|  | InputSize()); | 
|  | sizes.resize(iterator_.numOffsetFields()); | 
|  | // gather length data | 
|  | lengths.resize(iterator_.numLengthFields()); | 
|  | for (size_t i = 0; i < lengths.size(); ++i) { | 
|  | lengths[i] = Input(iterator_.lengthField(i).id).data<TLength>(); | 
|  | } | 
|  | // gather size limits | 
|  | limits.assign(sizes.size(), std::numeric_limits<TOffset>::max()); | 
|  | for (size_t i = 0; i < iterator_.fields().size(); ++i) { | 
|  | int lengthIdx = iterator_.fields()[i].lengthFieldId + 1; | 
|  | CAFFE_ENFORCE_GT(Input(i).dim(), 0); | 
|  | TOffset size = (TOffset)Input(i).sizes()[0]; | 
|  | if (limits[lengthIdx] == std::numeric_limits<TOffset>::max()) { | 
|  | limits[lengthIdx] = size; | 
|  | } else { | 
|  | CAFFE_ENFORCE( | 
|  | limits[lengthIdx] == size, | 
|  | "Inconsistent sizes for fields belonging to same domain.", | 
|  | " Field: ", | 
|  | i, | 
|  | " (", | 
|  | iterator_.fields()[i].name, | 
|  | "); Length field index: ", | 
|  | lengthIdx, | 
|  | "); Previous size: ", | 
|  | limits[lengthIdx], | 
|  | "; New size: ", | 
|  | size); | 
|  | } | 
|  | } | 
|  | // advance to the end | 
|  | offsets.assign(sizes.size(), 0); | 
|  | iterator_.advance(lengths, offsets, sizes, limits, limits[0]); | 
|  | for (size_t i = 0; i < limits.size(); ++i) { | 
|  | CAFFE_ENFORCE(limits[i] == offsets[i]); | 
|  | } | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | TreeIterator iterator_; | 
|  | }; | 
|  |  | 
|  | class PackRecordsOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit PackRecordsOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")), | 
|  | packToSingleSharedPtr_(OperatorBase::GetSingleArgument<int>( | 
|  | "pack_to_single_shared_ptr", | 
|  | 0)) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | // There should be one input per field | 
|  | CAFFE_ENFORCE_EQ(InputSize(), fields_.size()); | 
|  | CAFFE_ENFORCE_EQ(OutputSize(), 1); | 
|  |  | 
|  | TreeCursor cursor((TreeIterator(fields_))); | 
|  |  | 
|  | TreeWalker walker(Inputs(), cursor); | 
|  |  | 
|  | if (packToSingleSharedPtr_) { | 
|  | Output(0)->Resize(1); | 
|  | auto* dst = Output(0)->template mutable_data<Shared2DTensorVectorPtr>(); | 
|  | dst[0] = std::make_shared<Tensor2DVector>(); | 
|  | dst[0]->resize(walker.size()); | 
|  |  | 
|  | for (int batchId = 0; batchId < walker.size(); ++batchId) { | 
|  | std::vector<TensorCPU>& tensors = dst[0]->at(batchId); | 
|  | tensors.reserve(walker.fields().size()); | 
|  | for (const auto& field : walker.fields()) { | 
|  | tensors.emplace_back(field.dim(), CPU); | 
|  | auto& tensor = tensors.back(); | 
|  | context_.CopyItemsSameDevice( | 
|  | field.meta(), | 
|  | tensor.numel(), | 
|  | field.ptr() /* src */, | 
|  | tensor.raw_mutable_data(field.meta()) /* dst */); | 
|  | } | 
|  | walker.advance(); | 
|  | } | 
|  | } else { | 
|  | Output(0)->Resize(walker.size()); | 
|  | auto* dst = Output(0)->template mutable_data<SharedTensorVectorPtr>(); | 
|  |  | 
|  | for (int batchId = 0; batchId < walker.size(); ++batchId) { | 
|  | dst[batchId] = std::make_shared<std::vector<TensorCPU>>(); | 
|  | dst[batchId]->reserve(walker.fields().size()); | 
|  | for (const auto& field : walker.fields()) { | 
|  | dst[batchId]->emplace_back(field.dim(), CPU); | 
|  | auto& tensor = dst[batchId]->back(); | 
|  | context_.CopyItemsSameDevice( | 
|  | field.meta(), | 
|  | tensor.numel(), | 
|  | field.ptr() /* src */, | 
|  | tensor.raw_mutable_data(field.meta()) /* dst */); | 
|  | } | 
|  | walker.advance(); | 
|  | } | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | std::vector<std::string> fields_; | 
|  | const bool packToSingleSharedPtr_; | 
|  | }; | 
|  |  | 
|  | class UnPackRecordsOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit UnPackRecordsOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | fields_(OperatorBase::GetRepeatedArgument<std::string>("fields")) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | size_t numRows = 0; | 
|  | Shared2DTensorVectorPtr data_ptr = nullptr; | 
|  | if (Input(0).IsType<SharedTensorVectorPtr>()) { | 
|  | numRows = Input(0).numel(); | 
|  | CAFFE_ENFORCE_GE(numRows, 0); | 
|  | data_ptr = std::make_shared<Tensor2DVector>(); | 
|  | data_ptr->reserve(numRows); | 
|  |  | 
|  | const auto* inputs = Input(0).template data<SharedTensorVectorPtr>(); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < numRows; i++) { | 
|  | data_ptr->emplace_back(*inputs[i]); | 
|  | } | 
|  | } else if (Input(0).IsType<Shared2DTensorVectorPtr>()) { | 
|  | CAFFE_ENFORCE_EQ(Input(0).numel(), 1); | 
|  | const auto* inputs = Input(0).template data<Shared2DTensorVectorPtr>(); | 
|  | CAFFE_ENFORCE(inputs[0] != nullptr); | 
|  | data_ptr = inputs[0]; | 
|  | numRows = inputs[0]->size(); | 
|  | CAFFE_ENFORCE_GE(numRows, 0); | 
|  | } else { | 
|  | // input contains a single tensor | 
|  | CAFFE_ENFORCE_EQ(InputSize(), 1); | 
|  | CAFFE_ENFORCE_EQ(OutputSize(), 1); | 
|  | Output(0)->CopyFrom(Input(0)); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | auto numTensors = OutputSize(); | 
|  |  | 
|  | // Precomputer the output sizes to avoid resizing | 
|  | std::vector<std::vector<int64_t>> outputDims(numTensors); | 
|  | std::vector<TypeMeta> metas(numTensors); | 
|  |  | 
|  | CAFFE_ENFORCE( | 
|  | numRows > 0 || InputSize() > 1, | 
|  | "Unpacking empty record without shape will leave output blobs in " | 
|  | "undefined state."); | 
|  |  | 
|  | if (InputSize() == 1) { | 
|  | getShapeAndMetaFromInput(data_ptr, outputDims, metas); | 
|  | } else { | 
|  | getShapeAndMetaFromPrototypeBlobs(outputDims, metas); | 
|  | } | 
|  |  | 
|  | // inputs contains a single shared_ptr of vector<vector<caffe2::TensorCPU>> | 
|  | auto& tensors = *data_ptr; | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < numRows; ++i) { | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int j = 0; j < tensors[i].size(); ++j) { | 
|  | const auto& input = tensors[i][j]; | 
|  |  | 
|  | // Checks to ensure that dimensions/sizes match | 
|  | CAFFE_ENFORCE_EQ(outputDims[j].size(), input.dim()); | 
|  | CAFFE_ENFORCE(metas[j] == input.dtype()); | 
|  | // We look from first dimension, because we concat on the first. | 
|  | for (int k = 1; k < input.dim(); ++k) { | 
|  | CAFFE_ENFORCE_EQ(input.sizes()[k], outputDims[j][k]); | 
|  | } | 
|  |  | 
|  | outputDims[j][0] += input.size(0); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Resize to the final output size | 
|  | std::vector<void*> destinations(numTensors); | 
|  | for (int i = 0; i < numTensors; ++i) { | 
|  | Output(i)->Resize(outputDims[i]); | 
|  | destinations[i] = Output(i)->raw_mutable_data(metas[i]); | 
|  | } | 
|  |  | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < numRows; ++i) { | 
|  | for (int j = 0; j < numTensors; ++j) { | 
|  | const auto& input = tensors[i][j]; | 
|  |  | 
|  | context_.CopyItemsSameDevice( | 
|  | metas[j], | 
|  | input.numel(), | 
|  | input.raw_data() /* src */, | 
|  | destinations[j] /* dst */ | 
|  | ); | 
|  |  | 
|  | destinations[j] = | 
|  | (char*)destinations[j] + input.numel() * input.itemsize(); | 
|  | } | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | void getShapeAndMetaFromInput( | 
|  | const Shared2DTensorVectorPtr& inputs, | 
|  | std::vector<std::vector<int64_t>>& outputDims, | 
|  | std::vector<TypeMeta>& metas) { | 
|  | const auto& inputZero = inputs->at(0); | 
|  |  | 
|  | const auto numTensors = inputZero.size(); | 
|  |  | 
|  | CAFFE_ENFORCE_EQ(numTensors, fields_.size()); | 
|  | CAFFE_ENFORCE_EQ(numTensors, OutputSize()); | 
|  |  | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < numTensors; ++i) { | 
|  | outputDims[i] = inputZero[i].sizes().vec(); | 
|  | outputDims[i][0] = 0; | 
|  | metas[i] = inputZero[i].dtype(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void getShapeAndMetaFromPrototypeBlobs( | 
|  | std::vector<std::vector<int64_t>>& outputDims, | 
|  | std::vector<TypeMeta>& metas) { | 
|  | const auto numTensors = fields_.size(); | 
|  | CAFFE_ENFORCE_EQ(numTensors, InputSize() - 1); | 
|  | CAFFE_ENFORCE_EQ(numTensors, OutputSize()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < numTensors; ++i) { | 
|  | const auto& input = Input(i + 1); | 
|  | outputDims[i] = input.sizes().vec(); | 
|  | outputDims[i][0] = 0; | 
|  | metas[i] = input.dtype(); | 
|  | } | 
|  | } | 
|  |  | 
|  | std::vector<std::string> fields_; | 
|  | }; | 
|  |  | 
|  | class ReadNextBatchOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit ReadNextBatchOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)), | 
|  | enforceBatchSize_(OperatorBase::GetSingleArgument<bool>( | 
|  | "enforce_batch_size", | 
|  | false)) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1); | 
|  | std::vector<const TLength*> lengths; | 
|  | std::vector<TOffset> limits; | 
|  | std::vector<TOffset> sizes; | 
|  | std::vector<TOffset> offsets; | 
|  | TLength lenZero = 0; | 
|  | sizes.resize(cursor->it.numOffsetFields()); | 
|  | // gather length data | 
|  | lengths.resize(cursor->it.numLengthFields()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < lengths.size(); ++i) { | 
|  | auto& a = Input(cursor->it.lengthField(i).id + 1); | 
|  | if (a.numel() > 0) { | 
|  | lengths[i] = a.data<int>(); | 
|  | } else { | 
|  | lengths[i] = &lenZero; | 
|  | } | 
|  | } | 
|  | // gather size limits | 
|  | limits.assign(sizes.size(), std::numeric_limits<TOffset>::max()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < cursor->it.fields().size(); ++i) { | 
|  | int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1; | 
|  | limits[lengthFieldIdx] = | 
|  | std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]); | 
|  | } | 
|  | // advance cursor | 
|  | { | 
|  | std::lock_guard<std::mutex> lock(cursor->mutex_); | 
|  | if (cursor->offsets.empty()) { | 
|  | cursor->offsets.assign(sizes.size(), 0); | 
|  | } | 
|  | offsets = cursor->offsets; | 
|  | cursor->it.advance(lengths, cursor->offsets, sizes, limits, batchSize_); | 
|  | if (enforceBatchSize_ && sizes[0] < batchSize_) { | 
|  | // if we enforce batch_size but don't have enough rows left to | 
|  | // complete a full batch, return empty for all columns. | 
|  | // This signals end of dataset to the caller. | 
|  | sizes.assign(sizes.size(), 0); | 
|  | } | 
|  | } | 
|  | // gather data | 
|  | std::vector<int64_t> outDim; | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < cursor->it.fields().size(); ++i) { | 
|  | auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1; | 
|  | auto size = sizes[lengthIdx]; | 
|  | auto offset = offsets[lengthIdx]; | 
|  | auto& in = Input(i + 1); | 
|  | auto innerSize = in.size_from_dim(1); | 
|  | outDim = in.sizes().vec(); | 
|  | outDim[0] = size; | 
|  | auto* out = Output(i); | 
|  | out->Resize(outDim); | 
|  | void* src = | 
|  | (char*)in.raw_data() + offset * innerSize * in.dtype().itemsize(); | 
|  | void* dst = out->raw_mutable_data(in.dtype()); // create the tensor | 
|  | if (out->numel() == 0) { | 
|  | continue; | 
|  | } | 
|  | context_.CopyItemsSameDevice(in.dtype(), out->numel(), src, dst); | 
|  | } | 
|  | return true; | 
|  | } | 
|  | int batchSize_; | 
|  | bool enforceBatchSize_; | 
|  | }; | 
|  |  | 
|  | class ComputeOffsetOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit ComputeOffsetOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1); | 
|  | auto* out = Output(0); | 
|  | std::vector<const TLength*> lengths; | 
|  | std::vector<TOffset> limits; | 
|  | std::vector<TOffset> sizes; | 
|  | std::vector<TOffset> offsets; | 
|  | TLength lenZero = 0; | 
|  | sizes.resize(cursor->it.numOffsetFields()); | 
|  | // gather length data | 
|  | lengths.resize(cursor->it.numLengthFields()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < lengths.size(); ++i) { | 
|  | auto& a = Input(cursor->it.lengthField(i).id + 1); | 
|  | if (a.numel() > 0) { | 
|  | lengths[i] = a.data<int>(); | 
|  | } else { | 
|  | lengths[i] = &lenZero; | 
|  | } | 
|  | } | 
|  | // gather size limits | 
|  | limits.assign(sizes.size(), std::numeric_limits<TOffset>::max()); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < cursor->it.fields().size(); ++i) { | 
|  | int lengthFieldIdx = cursor->it.fields()[i].lengthFieldId + 1; | 
|  | limits[lengthFieldIdx] = | 
|  | std::min(limits[lengthFieldIdx], (TOffset)Input(i + 1).sizes()[0]); | 
|  | } | 
|  | out->Resize(limits.at(0) + 1, sizes.size()); | 
|  | auto* out_data = out->template mutable_data<int64_t>(); | 
|  | for (int k = 0; k <= limits.at(0); k++) { | 
|  | // advance cursor | 
|  | if (cursor->offsets.empty()) { | 
|  | cursor->offsets.assign(sizes.size(), 0); | 
|  | } | 
|  | // write output | 
|  | std::copy(cursor->offsets.begin(), cursor->offsets.end(), out_data); | 
|  | out_data += sizes.size(); | 
|  | cursor->it.advance(lengths, cursor->offsets, sizes, limits, 1); | 
|  | } | 
|  | cursor->offsets.assign(sizes.size(), 0); // reSet after getting meta info | 
|  | return true; | 
|  | } | 
|  | }; | 
|  |  | 
|  | class SortAndShuffleOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit SortAndShuffleOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | sort_by_field_idx_( | 
|  | OperatorBase::GetSingleArgument<int>("sort_by_field_idx", 1)), | 
|  | batch_size_(OperatorBase::GetSingleArgument<int>("batch_size", 1)), | 
|  | shuffle_size_(OperatorBase::GetSingleArgument<int>("shuffle_size", 1)) { | 
|  | } | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 1); | 
|  | CAFFE_ENFORCE(-1 <= sort_by_field_idx_); | 
|  | CAFFE_ENFORCE(cursor->it.fields().size() - sort_by_field_idx_ > 0); | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-init-variables) | 
|  | int size; | 
|  | if (sort_by_field_idx_ != -1) { | 
|  | size = Input(sort_by_field_idx_ + 1).sizes()[0]; | 
|  | } else { | 
|  | size = Input(1).sizes()[0]; | 
|  | } | 
|  |  | 
|  | CAFFE_ENFORCE( | 
|  | batch_size_ > 0 && shuffle_size_ > 0 && | 
|  | 0 < batch_size_ * shuffle_size_); | 
|  | // adjust shuffle_size_ if it is too large | 
|  | if (batch_size_ * shuffle_size_ > size) { | 
|  | shuffle_size_ = size / batch_size_; | 
|  | } | 
|  |  | 
|  | int num_batch = size / batch_size_; | 
|  | auto* out = Output(0); | 
|  | out->Resize(size); | 
|  | auto* out_data = out->template mutable_data<int64_t>(); | 
|  |  | 
|  | vector<int> shuffle_idx(size); | 
|  | iota(shuffle_idx.begin(), shuffle_idx.end(), 0); | 
|  |  | 
|  | if (sort_by_field_idx_ != -1) { | 
|  | auto& sortblob = Input(sort_by_field_idx_ + 1); | 
|  | auto* sortdata = sortblob.data<int>(); | 
|  | // must sort by a field at the root level | 
|  | CAFFE_ENFORCE( | 
|  | cursor->it.fields()[sort_by_field_idx_].lengthFieldId == -1); | 
|  | sort(shuffle_idx.begin(), shuffle_idx.end(), [&sortdata](int i1, int i2) { | 
|  | return sortdata[i1] < sortdata[i2]; | 
|  | }); | 
|  | } | 
|  |  | 
|  | if (batch_size_ * shuffle_size_ > 1) { | 
|  | int offset = 0; | 
|  | while (offset + batch_size_ * shuffle_size_ < size) { | 
|  | std::shuffle( | 
|  | shuffle_idx.begin() + offset, | 
|  | shuffle_idx.begin() + offset + batch_size_ * shuffle_size_, | 
|  | std::default_random_engine()); | 
|  | offset += batch_size_ * shuffle_size_; | 
|  | } | 
|  | } | 
|  |  | 
|  | vector<int> batch_idx(num_batch); | 
|  | iota(batch_idx.begin(), batch_idx.end(), 0); | 
|  | std::shuffle( | 
|  | batch_idx.begin(), batch_idx.end(), std::default_random_engine()); | 
|  |  | 
|  | for (int i = 0; i < num_batch; i++) { | 
|  | std::copy( | 
|  | shuffle_idx.begin() + batch_idx[i] * batch_size_, | 
|  | shuffle_idx.begin() + (batch_idx[i] + 1) * batch_size_, | 
|  | out_data); | 
|  | out_data += batch_size_; | 
|  | } | 
|  | std::copy( | 
|  | shuffle_idx.begin() + num_batch * batch_size_, | 
|  | shuffle_idx.end(), | 
|  | out_data); | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | int sort_by_field_idx_; | 
|  | int batch_size_; | 
|  | int shuffle_size_; | 
|  | }; | 
|  |  | 
|  | class ReadRandomBatchOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit ReadRandomBatchOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | batchSize_(OperatorBase::GetSingleArgument<int>("batch_size", 1)), | 
|  | enforceBatchSize_( | 
|  | OperatorBase::GetSingleArgument<bool>("enforce_batch_size", false)), | 
|  | loopOver_(OperatorBase::GetSingleArgument<bool>("loop_over", false)) {} | 
|  | bool RunOnDevice() override { | 
|  | auto& cursor = OperatorBase::Input<std::unique_ptr<TreeCursor>>(0); | 
|  | auto& idxblob = Input(1); | 
|  | auto& offsetsmat = Input(2); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | CAFFE_ENFORCE(InputSize() == cursor->it.fields().size() + 3); | 
|  | auto idxvec = idxblob.template data<int64_t>(); | 
|  | auto offsetdim = offsetsmat.sizes(); | 
|  | // gather data | 
|  | std::vector<int64_t> outDim; | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-init-variables) | 
|  | int64_t idx; | 
|  | { | 
|  | std::lock_guard<std::mutex> lock(cursor->mutex_); | 
|  | cursor->offsets.resize(1); | 
|  | idx = cursor->offsets.at(0); | 
|  | // if we want to enforce batch size but we dont have a complete | 
|  | // batch, skip the last rows. | 
|  | if (enforceBatchSize_ && idx + batchSize_ > idxblob.numel()) { | 
|  | idx = idxblob.numel(); | 
|  | } | 
|  | if (loopOver_ && idx >= idxblob.numel()) { | 
|  | cursor->offsets.at(0) = 0; | 
|  | idx = 0; | 
|  | } | 
|  | cursor->offsets.at(0) += batchSize_; | 
|  | } | 
|  |  | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 0; i < cursor->it.fields().size(); ++i) { | 
|  | auto lengthIdx = cursor->it.fields()[i].lengthFieldId + 1; | 
|  | auto& in = Input(i + 3); | 
|  | outDim = in.sizes().vec(); | 
|  | outDim.at(0) = 0; | 
|  | auto idxbegin = idx; | 
|  | for (int j = 0; j < batchSize_; ++j) { | 
|  | if (idx >= idxblob.numel()) { | 
|  | break; | 
|  | } | 
|  | CAFFE_ENFORCE( | 
|  | (idxvec[idx] + 1) * offsetdim[1] + lengthIdx < offsetsmat.numel(), | 
|  | "Out of bound when trying to get elem from offsetsmat"); | 
|  | auto offsetptr = offsetsmat.template data<TOffset>() + | 
|  | idxvec[idx] * offsetdim[1] + lengthIdx; | 
|  | auto offset = *offsetptr; | 
|  | auto size = *(offsetptr + offsetdim[1]) - offset; | 
|  | outDim.at(0) += size; // accumulate over the batch | 
|  | idx++; | 
|  | } | 
|  | idx = idxbegin; // reSet | 
|  | auto* out = Output(i); | 
|  | out->Resize(outDim); | 
|  | if (out->numel() == 0) { | 
|  | continue; | 
|  | } | 
|  | auto dst = static_cast<char*>(out->raw_mutable_data(in.dtype())); | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions,bugprone-narrowing-conversions) | 
|  | int block_size = in.numel() / in.size(0); | 
|  | auto block_bytesize = in.size_from_dim(1) * in.dtype().itemsize(); | 
|  | CAFFE_ENFORCE( | 
|  | block_bytesize == in.nbytes() / in.size(0), | 
|  | "block_bytesize should be consistent with data dim"); | 
|  | auto src_base = static_cast<const char*>(in.raw_data()); | 
|  | int start = 0; | 
|  | for (int j = 0; j < batchSize_; ++j) { | 
|  | if (idx >= idxblob.numel()) { | 
|  | break; | 
|  | } | 
|  | auto offsetptr = offsetsmat.template data<TOffset>() + | 
|  | idxvec[idx] * offsetdim[1] + lengthIdx; | 
|  | auto offset = *offsetptr; | 
|  | auto size = *(offsetptr + offsetdim[1]) - offset; | 
|  | // copy data | 
|  | auto src = src_base + offset * block_bytesize; | 
|  | context_.CopyItemsSameDevice( | 
|  | in.dtype(), size * block_size, src, dst + start * block_bytesize); | 
|  | start += size; | 
|  | idx++; | 
|  | } | 
|  | idx = idxbegin; // reSet | 
|  | } | 
|  | return true; | 
|  | } | 
|  | int batchSize_; | 
|  | bool enforceBatchSize_; | 
|  | bool loopOver_; | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | class AppendOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | template <class... Args> | 
|  | explicit AppendOp(Args&&... args) | 
|  | : Operator<Context>(std::forward<Args>(args)...) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& a = Input(0); | 
|  | auto& b = Input(1); | 
|  | auto* c = Output(0); | 
|  | CAFFE_ENFORCE(b.dim() >= 1); | 
|  | if (a.numel() == 0 && a.size(0) == 0) { | 
|  | c->CopyFrom(b); | 
|  | return true; | 
|  | } | 
|  | CAFFE_ENFORCE(&a == c, "First argument must be in-place."); | 
|  | CAFFE_ENFORCE(c->dim() == b.dim()); | 
|  | CAFFE_ENFORCE(b.dim() == c->dim()); | 
|  | CAFFE_ENFORCE(a.dtype() == b.dtype()); | 
|  | for (int i = 1; i < a.dim(); ++i) { | 
|  | CAFFE_ENFORCE(a.sizes()[i] == b.sizes()[i]); | 
|  | } | 
|  | auto oldSize = c->numel(); | 
|  | c->Extend(b.sizes()[0], kDatasetGrowthPct); | 
|  | auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize(); | 
|  | context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst); | 
|  | return true; | 
|  | } | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | class AtomicAppendOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | template <class... Args> | 
|  | explicit AtomicAppendOp(Args&&... args) | 
|  | : Operator<Context>(std::forward<Args>(args)...) {} | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& mutex = OperatorBase::Input<std::unique_ptr<std::mutex>>(0); | 
|  | const auto numFields = (InputSize() - 1) / 2; | 
|  | CAFFE_ENFORCE(OutputSize() == numFields); | 
|  |  | 
|  | std::lock_guard<std::mutex> guard(*mutex); | 
|  |  | 
|  | // 1: checks | 
|  | for (int i = 0; i < numFields; ++i) { | 
|  | auto& a = Input(1 + i); | 
|  | auto& b = Input(1 + i + numFields); | 
|  | auto* c = Output(i); | 
|  | CAFFE_ENFORCE(b.dim() >= 1); | 
|  | if (a.numel() == 0) { | 
|  | continue; | 
|  | } | 
|  | CAFFE_ENFORCE( | 
|  | (void*)&a == (void*)c, "Appended-to arguments must be in-place."); | 
|  | CAFFE_ENFORCE(c->dim() == b.dim()); | 
|  | CAFFE_ENFORCE(b.dim() == c->dim()); | 
|  | CAFFE_ENFORCE(a.dtype() == b.dtype()); | 
|  | for (int j = 1; j < a.dim(); ++j) { | 
|  | CAFFE_ENFORCE(a.sizes()[j] == b.sizes()[j]); | 
|  | } | 
|  | } | 
|  |  | 
|  | // 2: copies | 
|  | for (int i = 0; i < numFields; ++i) { | 
|  | auto& a = Input(1 + i); | 
|  | auto& b = Input(1 + i + numFields); | 
|  | auto* c = Output(i); | 
|  | if (a.numel() == 0 && a.size(0) == 0) { | 
|  | c->CopyFrom(b); | 
|  | continue; | 
|  | } | 
|  | auto oldSize = c->numel(); | 
|  | c->Extend(b.sizes()[0], kDatasetGrowthPct); | 
|  | auto* dst = (char*)c->raw_mutable_data() + oldSize * b.dtype().itemsize(); | 
|  | context_.CopyItemsSameDevice(b.dtype(), b.numel(), b.raw_data(), dst); | 
|  | } | 
|  | return true; | 
|  | } | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | class CreateTensorVectorOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | using Operator<Context>::Operator; | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto ptr = make_unique<std::vector<Tensor>>(); | 
|  | *OperatorBase::Output<TensorVectorPtr>(TENSOR_VECTOR) = std::move(ptr); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | OUTPUT_TAGS(TENSOR_VECTOR); | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | class TensorVectorSizeOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | USE_SIMPLE_CTOR_DTOR(TensorVectorSizeOp); | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | auto& vector_ptr = OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR); | 
|  | auto* size = Output(SIZE); | 
|  | size->Resize(); | 
|  | // 32-bit should be enough here | 
|  | *size->template mutable_data<int32_t>() = vector_ptr->size(); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | INPUT_TAGS(TENSOR_VECTOR); | 
|  | OUTPUT_TAGS(SIZE); | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | class ConcatTensorVectorOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | using Operator<Context>::Operator; | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | const TensorVectorPtr& tensorVector = | 
|  | OperatorBase::Input<TensorVectorPtr>(TENSOR_VECTOR); | 
|  |  | 
|  | auto* tensor = Output(TENSOR); | 
|  | CAFFE_ENFORCE(!tensorVector->empty()); | 
|  |  | 
|  | vector<int64_t> outputDims(tensorVector->at(0).sizes().vec()); | 
|  | CAFFE_ENFORCE(outputDims.size() > 0); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int i = 1; i < tensorVector->size(); i++) { | 
|  | // the tensor shapes are the same except for the first dimension | 
|  | for (int j = 1; j < tensorVector->at(i).dim(); j++) { | 
|  | CAFFE_ENFORCE(outputDims[j] == tensorVector->at(i).sizes()[j]); | 
|  | } | 
|  | CAFFE_ENFORCE(tensorVector->at(0).dtype() == tensorVector->at(i).dtype()); | 
|  | outputDims[0] += tensorVector->at(i).sizes()[0]; | 
|  | } | 
|  |  | 
|  | tensor->Resize(outputDims); | 
|  | int64_t offset = 0; | 
|  | auto* dst = (char*)tensor->raw_mutable_data(tensorVector->at(0).dtype()); | 
|  |  | 
|  | for (const auto& t : *tensorVector) { | 
|  | context_.CopyItemsSameDevice( | 
|  | t.dtype(), t.numel(), t.raw_data(), dst + offset); | 
|  | offset += t.nbytes(); | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | INPUT_TAGS(TENSOR_VECTOR); | 
|  | OUTPUT_TAGS(TENSOR); | 
|  | }; | 
|  |  | 
|  | template <class Context> | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-pro-type-member-init) | 
|  | class CollectTensorOp final : public Operator<Context> { | 
|  | public: | 
|  | USE_OPERATOR_CONTEXT_FUNCTIONS; | 
|  | template <class... Args> | 
|  | explicit CollectTensorOp(Args&&... args) | 
|  | : Operator<Context>(std::forward<Args>(args)...), | 
|  | numToCollect_( | 
|  | OperatorBase::GetSingleArgument<int>("num_to_collect", -1)), | 
|  | numVisited_(0) { | 
|  | CAFFE_ENFORCE(numToCollect_ > 0); | 
|  | } | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | int pos = -1; | 
|  | if (numVisited_ < numToCollect_) { | 
|  | // append | 
|  | pos = numVisited_; | 
|  | } else { | 
|  | // uniform between [0, numVisited_] | 
|  | at::uniform_int_from_to_distribution<int> uniformDist(numVisited_+1, 0); | 
|  | pos = uniformDist(context_.RandGenerator()); | 
|  | if (pos >= numToCollect_) { | 
|  | // discard | 
|  | pos = -1; | 
|  | } | 
|  | } | 
|  |  | 
|  | for (int i = 0; i < OutputSize(); ++i) { | 
|  | // TENSOR_VECTOR_IN is enforced inplace with TENSOR_VECTOR_OUT | 
|  | TensorVectorPtr& tensorVector = *OperatorBase::Output<TensorVectorPtr>(i); | 
|  |  | 
|  | if (numVisited_ >= numToCollect_) { | 
|  | CAFFE_ENFORCE( | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | tensorVector->size() == numToCollect_, | 
|  | "TensorVector size = ", | 
|  | tensorVector->size(), | 
|  | " is different from numToCollect = ", | 
|  | numToCollect_); | 
|  | } | 
|  |  | 
|  | const auto& tensor = Input(OutputSize() + i); | 
|  |  | 
|  | if (pos < 0) { | 
|  | // discard | 
|  | CAFFE_ENFORCE(numVisited_ >= numToCollect_); | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | } else if (pos >= tensorVector->size()) { | 
|  | // append | 
|  | tensorVector->emplace_back(); | 
|  | ReinitializeAndCopyFrom( | 
|  | &tensorVector->back(), | 
|  | Context::GetDeviceType(), | 
|  | tensor); // sync copy | 
|  | } else { | 
|  | // replace | 
|  | tensorVector->at(pos).CopyFrom(tensor); // sync copy | 
|  | } | 
|  | } | 
|  |  | 
|  | numVisited_++; | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | // number of tensors to collect | 
|  | int numToCollect_; | 
|  | // number of tensors visited | 
|  | int numVisited_; | 
|  | }; | 
|  |  | 
|  | class TrimDatasetOp : public Operator<CPUContext> { | 
|  | public: | 
|  | template <class... Args> | 
|  | explicit TrimDatasetOp(Args&&... args) | 
|  | : Operator(std::forward<Args>(args)...), | 
|  | iterator_(OperatorBase::GetRepeatedArgument<std::string>("fields")), | 
|  | multiple_of_(OperatorBase::GetSingleArgument<int>("multiple_of", 1)) { | 
|  | CAFFE_ENFORCE_GE(multiple_of_, 1); | 
|  | } | 
|  |  | 
|  | bool RunOnDevice() override { | 
|  | TreeCursor cursor(iterator_); | 
|  | TreeWalker walker(Inputs(), cursor); | 
|  |  | 
|  | // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions,bugprone-narrowing-conversions) | 
|  | int trimmedSize = (walker.size() / multiple_of_) * multiple_of_; | 
|  | if (trimmedSize == walker.size()) { | 
|  | // we already satisfy the condition | 
|  | return true; | 
|  | } | 
|  | // advance desired number of records | 
|  | for (int i = 0; i < trimmedSize; ++i) { | 
|  | walker.advance(); | 
|  | } | 
|  | // trim each column to the offset | 
|  | // NOLINTNEXTLINE(clang-diagnostic-sign-compare) | 
|  | for (int col = 0; col < walker.fields().size(); ++col) { | 
|  | auto newOuterSize = walker.fields().at(col).offset(); | 
|  | Output(col)->ShrinkTo(newOuterSize); | 
|  | } | 
|  | return true; | 
|  | } | 
|  |  | 
|  | private: | 
|  | TreeIterator iterator_; | 
|  | int multiple_of_; | 
|  | }; | 
|  |  | 
// Register the CPU implementation class for each dataset operator name.
// Templated operators are registered with their CPUContext instantiation.
REGISTER_CPU_OPERATOR(CreateTreeCursor, CreateTreeCursorOp);
REGISTER_CPU_OPERATOR(ResetCursor, ResetCursorOp);
REGISTER_CPU_OPERATOR(ReadNextBatch, ReadNextBatchOp);
REGISTER_CPU_OPERATOR(GetCursorOffset, GetCursorOffsetOp);
REGISTER_CPU_OPERATOR(ComputeOffset, ComputeOffsetOp);
REGISTER_CPU_OPERATOR(SortAndShuffle, SortAndShuffleOp);
REGISTER_CPU_OPERATOR(ReadRandomBatch, ReadRandomBatchOp);
REGISTER_CPU_OPERATOR(CheckDatasetConsistency, CheckDatasetConsistencyOp);
REGISTER_CPU_OPERATOR(Append, AppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(AtomicAppend, AtomicAppendOp<CPUContext>);
REGISTER_CPU_OPERATOR(CreateTensorVector, CreateTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(TensorVectorSize, TensorVectorSizeOp<CPUContext>);
REGISTER_CPU_OPERATOR(ConcatTensorVector, ConcatTensorVectorOp<CPUContext>);
REGISTER_CPU_OPERATOR(CollectTensor, CollectTensorOp<CPUContext>);
REGISTER_CPU_OPERATOR(PackRecords, PackRecordsOp);
REGISTER_CPU_OPERATOR(UnPackRecords, UnPackRecordsOp);
REGISTER_CPU_OPERATOR(TrimDataset, TrimDatasetOp);
|  |  | 
|  | OPERATOR_SCHEMA(CreateTreeCursor) | 
|  | .NumInputs(0) | 
|  | .NumOutputs(1) | 
|  | .SetDoc(R"DOC( | 
|  | Creates a cursor to iterate through a list of tensors, where some of those | 
|  | tensors contain the lengths in a nested schema. The schema is determined by | 
|  | the `fields` arguments. | 
|  |  | 
|  | For example, to represent the following schema: | 
|  |  | 
|  | Struct( | 
|  | a=Int(), | 
|  | b=List(List(Int)), | 
|  | c=List( | 
|  | Struct( | 
|  | c1=String, | 
|  | c2=List(Int), | 
|  | ), | 
|  | ), | 
|  | ) | 
|  |  | 
|  | the field list will be: | 
|  | [ | 
|  | "a", | 
|  | "b:lengths", | 
|  | "b:values:lengths", | 
|  | "b:values:values", | 
|  | "c:lengths", | 
|  | "c:c1", | 
|  | "c:c2:lengths", | 
|  | "c:c2:values", | 
|  | ] | 
|  |  | 
|  | And for the following instance of the struct: | 
|  |  | 
|  | Struct( | 
|  | a=3, | 
|  | b=[[4, 5], [6, 7, 8], [], [9]], | 
|  | c=[ | 
|  | Struct(c1='alex', c2=[10, 11]), | 
|  | Struct(c1='bob', c2=[12]), | 
|  | ], | 
|  | ) | 
|  |  | 
|  | The values of the fields will be: | 
|  | { | 
|  | "a": [3], | 
|  | "b:lengths": [4], | 
|  | "b:values:lengths": [2, 3, 0, 1], | 
|  | "b:values:values": [4, 5, 6, 7, 8, 9], | 
|  | "c:lengths": [2], | 
|  | "c:c1": ["alex", "bob"], | 
|  | "c:c2:lengths": [2, 1], | 
|  | "c:c2:values", [10, 11, 12], | 
|  | } | 
|  |  | 
|  | In general, every field name in the format "{prefix}:lengths" defines a domain | 
|  | "{prefix}", and every subsequent field in the format "{prefix}:{field}" will | 
|  | be in that domain, and the length of the domain is provided for each entry of | 
|  | the parent domain. In the example, "b:lengths" defines a domain of length 4, so | 
|  | every field under domain "b" will have 4 entries. | 
|  | The "lengths" field for a given domain must appear before any reference to | 
|  | that domain. | 
|  |  | 
|  | Returns a pointer to an instance of the Cursor, which keeps the current offset | 
|  | on each of the domains defined by `fields`. Cursor also ensures thread-safety | 
|  | such that ReadNextBatch and ResetCursor can be used safely in parallel. | 
|  |  | 
|  | A cursor does not contain data per se, so calls to ReadNextBatch actually need | 
|  | to pass a list of blobs containing the data to read for each one of the fields. | 
|  | )DOC") | 
|  | .Output(0, "cursor", "A blob pointing to an instance of a new TreeCursor.") | 
|  | .Arg( | 
|  | "fields", | 
|  | "A list of strings each one representing a field of the dataset."); | 
|  |  | 
// Schema for ResetCursor: exactly one input (the cursor blob), no outputs.
OPERATOR_SCHEMA(ResetCursor)
    .NumInputs(1)
    .NumOutputs(0)
    .SetDoc(R"DOC(
Resets the offsets for the given TreeCursor. This operation is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.");
|  |  | 
// Schema for ReadNextBatch: one or more inputs (cursor first, then the
// dataset field tensors) and one or more outputs (one batch per field).
OPERATOR_SCHEMA(ReadNextBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor and data blobs.

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.

ReadNextBatch is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.");
|  |  | 
// Schema for GetCursorOffset: one input (the cursor blob), one output (the
// offsets tensor).
OPERATOR_SCHEMA(GetCursorOffset)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc("Get the current offset in the cursor.")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Output(0, "offsets", "Tensor containing the offsets for the cursor.");
|  |  | 
|  | OPERATOR_SCHEMA(ComputeOffset) | 
|  | .NumInputs(1, INT_MAX) | 
|  | .NumOutputs(1) | 
|  | .SetDoc(R"DOC( | 
|  | Compute the offsets matrix given cursor and data blobs. Need to be ran at | 
|  | beginning or after reseting cursor | 
|  |  | 
|  | Input(0) is a blob pointing to a TreeCursor, and | 
|  | [Input(1),... Input(num_fields)] a list of tensors containing the data for | 
|  | each field of the dataset. | 
|  |  | 
|  | ComputeOffset is thread safe. | 
|  | )DOC") | 
|  | .Input(0, "cursor", "A blob containing a pointer to the cursor.") | 
|  | .Input(1, "dataset_field_0", "First dataset field") | 
|  | .Output(0, "field_0", "Tensor containing offset info for this chunk."); | 
|  |  | 
// Schema for SortAndShuffle: one or more inputs (cursor first, then the
// dataset field tensors) and a single output holding the resulting indices.
OPERATOR_SCHEMA(SortAndShuffle)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Compute the sorted indices given a field index to sort by and break the sorted
indices into chunks of shuffle_size * batch_size and shuffle each chunk,
finally we shuffle between batches. If sort_by_field_idx is -1 we skip sort.

For example, we have data sorted as
1,2,3,4,5,6,7,8,9,10,11,12

and batchSize = 2 and shuffleSize = 3, when we shuffle we get:
[3,1,4,6,5,2] [12,10,11,8,9,7]

After this we will shuffle among different batches with size 2
[3,1],[4,6],[5,2],[12,10],[11,8],[9,7]

We may end up with something like
[9,7],[5,2],[12,10],[4,6],[3,1],[11,8]

Input(0) is a blob pointing to a TreeCursor, and
[Input(1),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.

SortAndShuffle is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "dataset_field_0", "First dataset field")
    .Output(0, "indices", "Tensor containing sorted indices.");
|  |  | 
// Schema for ReadRandomBatch: inputs are the cursor, the shuffled index blob,
// the offset matrix and then the dataset field tensors; outputs are one batch
// tensor per field.
OPERATOR_SCHEMA(ReadRandomBatch)
    .NumInputs(1, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .SetDoc(R"DOC(
Read the next batch of examples out of the given cursor,
idx blob, offset matrix and data blobs.

Input(0) is a blob pointing to a TreeCursor,
Input(1) is a blob pointing to the shuffled idx
Input(2) is a blob pointing to the offset matrix and
[Input(3),... Input(num_fields)] a list of tensors containing the data for
each field of the dataset.

ReadRandomBatch is thread safe.
)DOC")
    .Input(0, "cursor", "A blob containing a pointer to the cursor.")
    .Input(1, "idx", "idx with a shuffled order.")
    .Input(2, "offsetsmat", "offset matrix containing length offset info.")
    .Input(3, "dataset_field_0", "First dataset field")
    .Output(0, "field_0", "Tensor containing the next batch for field 0.")
    .Arg("batch_size", "Number of top-level entries to read.")
    .Arg("loop_over", "(bool) Repeat the dataset indefinitely");
|  |  | 
|  | OPERATOR_SCHEMA(CheckDatasetConsistency) | 
|  | .NumInputs(1, INT_MAX) | 
|  | .NumOutputs(0) | 
|  | .SetDoc(R"DOC( | 
|  | Checks that the given data fields represents a consistent dataset under | 
|  | the schema specified by the `fields` argument. Operator fails if the fields | 
|  | are not consistent. If data is consistent, each field's data can be safely | 
|  | appended to an existing dataset, keeping it consistent. | 
|  | )DOC") | 
|  | .Input(0, "field_0", "Data for field 0.") | 
|  | .Arg( | 
|  | "fields", | 
|  | "List of strings representing the string names in the format" | 
|  | "specified in the doc for CreateTreeCursor."); | 
|  |  | 
|  | OPERATOR_SCHEMA(Append) | 
|  | .NumInputs(2) | 
|  | .NumOutputs(1) | 
|  | .EnforceInplace({{0, 0}}) | 
|  | .SetDoc(R"DOC( | 
|  | Append input `B` to the end of input `A`. | 
|  |  | 
|  | - It is required that this operation run in-place, meaning that the input `A` blob must match the output blob. | 
|  | - All except the outer-most dimension must be the same between `A` and `B`. | 
|  | - Input `A` may have to be re-allocated in order for accommodate to the new size. Currently, an exponential growth ratio is used in order to ensure amortized constant time complexity. | 
|  |  | 
|  | Github Links: | 
|  | - https://github.com/pytorch/pytorch/blob/main/caffe2/operators/dataset_ops.cc | 
|  |  | 
|  | <details> | 
|  |  | 
|  | <summary> <b>Example</b> </summary> | 
|  |  | 
|  | **Code** | 
|  |  | 
|  | ``` | 
|  |  | 
|  | workspace.ResetWorkspace() | 
|  |  | 
|  | op = core.CreateOperator( | 
|  | "Append", | 
|  | ["A", "B"], | 
|  | ["A"], | 
|  | ) | 
|  |  | 
|  | workspace.FeedBlob("A", np.random.randint(10, size=(1,3,3))) | 
|  | workspace.FeedBlob("B", np.random.randint(10, size=(2,3,3))) | 
|  | print("A:", workspace.FetchBlob("A")) | 
|  | print("B:", workspace.FetchBlob("B")) | 
|  | workspace.RunOperatorOnce(op) | 
|  | print("A:", workspace.FetchBlob("A")) | 
|  |  | 
|  | ``` | 
|  |  | 
|  | **Result** | 
|  |  | 
|  | ``` | 
|  |  | 
|  | A: | 
|  | [[[3 8 7] | 
|  | [1 6 6] | 
|  | [5 0 6]]] | 
|  | B: | 
|  | [[[4 3 1] | 
|  | [7 9 6] | 
|  | [9 4 5]] | 
|  |  | 
|  | [[7 7 4] | 
|  | [9 8 7] | 
|  | [1 6 6]]] | 
|  | A: | 
|  | [[[3 8 7] | 
|  | [1 6 6] | 
|  | [5 0 6]] | 
|  |  | 
|  | [[4 3 1] | 
|  | [7 9 6] | 
|  | [9 4 5]] | 
|  |  | 
|  | [[7 7 4] | 
|  | [9 8 7] | 
|  | [1 6 6]]] | 
|  |  | 
|  | ``` | 
|  |  | 
|  | </details> | 
|  |  | 
|  | )DOC") | 
|  | .Input( | 
|  | 0, | 
|  | "A", | 
|  | "(*Tensor*): base input tensor of shape $(N, d_1, d_2, ..., d_n)$") | 
|  | .Input( | 
|  | 1, | 
|  | "B", | 
|  | "(*Tensor*): second input tensor of shape $(M, d_1, d_2, ..., d_n)$ to be appended to the base") | 
|  | .Output( | 
|  | 0, | 
|  | "A", | 
|  | "(*Tensor*): output tensor of shape $(N+M, d_1, d_2, ..., d_n)$"); | 
|  |  | 
// Schema for AtomicAppend: at least 3 inputs and at least 1 output. In-place
// is allowed only between input (i + 1) and output i; AtomicAppendOp reads
// Input(0) as the mutex, so the appended-to tensors start at input 1.
OPERATOR_SCHEMA(AtomicAppend)
    .NumInputs(3, INT_MAX)
    .NumOutputs(1, INT_MAX)
    .AllowInplace([](int in, int out) { return in == out + 1; });
|  |  | 
// Schema for CreateTensorVector: no inputs, one output (the tensor vector).
OPERATOR_SCHEMA(CreateTensorVector)
    .NumInputs(0)
    .NumOutputs(1)
    .SetDoc("Create a std::unique_ptr<std::vector<Tensor> >");
|  |  | 
// Schema for TensorVectorSize: one input (the tensor vector), one output
// (its size as int32_t).
OPERATOR_SCHEMA(TensorVectorSize)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc("Get the size of the input vector")
    .Input(0, "tensor vector", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "size", "int32_t size");
|  |  | 
// Schema for ConcatTensorVector: one input (the tensor vector), one output
// (the concatenated tensor).
OPERATOR_SCHEMA(ConcatTensorVector)
    .NumInputs(1)
    .NumOutputs(1)
    .SetDoc(R"DOC(
Concat Tensors in the std::unique_ptr<std::vector<Tensor> >
along the first dimension.
)DOC")
    .Input(0, "vector of Tensor", "std::unique_ptr<std::vector<Tensor> >")
    .Output(0, "tensor", "tensor after concatenating");
|  |  | 
// Schema for CollectTensor: a positive, even number of inputs, exactly twice
// as many inputs as outputs, and input i is required to be in-place with
// output i (the tensor vectors in the first half of the inputs).
OPERATOR_SCHEMA(CollectTensor)
    .NumInputs([](int n) { return n > 0 && n % 2 == 0; })
    .NumOutputs(1, INT_MAX)
    .NumInputsOutputs([](int in, int out) { return in == out * 2; })
    .EnforceInplace([](int in, int out) { return in == out; })
    .SetDoc(R"DOC(
Collect tensor into tensor vector by reservoir sampling,
argument num_to_collect indicates the max number of tensors that will be
collected. The first half of the inputs are tensor vectors, which are also the
outputs. The second half of the inputs are the tensors to be collected into each
vector (in the same order). The input tensors are collected in all-or-none
manner. If they are collected, they will be placed at the same index in the
output vectors.
)DOC")
    .Arg("num_to_collect", "The max number of tensors to collect");
|  |  | 
|  | OPERATOR_SCHEMA(PackRecords) | 
|  | .NumInputs(1, INT_MAX) | 
|  | .NumOutputs(1) | 
|  | .SetDoc(R"DOC( | 
|  | Given a dataset under a schema specified by the `fields` argument, pack all | 
|  | the input tensors into one, where each tensor element represents a row of data | 
|  | (batch of size 1). This format allows easier use with the rest of Caffe2 | 
|  | operators. | 
|  | )DOC") | 
|  | .Arg( | 
|  | "fields", | 
|  | "List of strings representing the string names in the format" | 
|  | "specified in the doc for CreateTreeCursor.") | 
|  | .Output( | 
|  | 0, | 
|  | "tensor", | 
|  | "One dimensional tensor having a complex type of SharedTensorVectorPtr." | 
|  | " In order to reverse it back to the original input it has to be " | 
|  | "inserted into UnPackRecordsOp."); | 
|  |  | 
|  | OPERATOR_SCHEMA(TrimDataset) | 
|  | .NumInputs(1, INT_MAX) | 
|  | .NumOutputs(1, INT_MAX) | 
|  | .SetDoc(R"DOC( | 
|  | Trim the given dataset inplace, given the dataset blobs and the field specs. | 
|  | Trimming happens such that the dataset will contain the largest possible number | 
|  | of records that is a multiple of the 'multiple_of' argument. | 
|  | )DOC") | 
|  | .EnforceInplace([](int input, int output) { return input == output; }) | 
|  | .Arg( | 
|  | "fields", | 
|  | "List of strings representing the string names in the format" | 
|  | "specified in the doc for CreateTreeCursor."); | 
|  |  | 
|  | OPERATOR_SCHEMA(UnPackRecords) | 
|  | .NumInputs(1, INT_MAX) | 
|  | .NumOutputs(1, INT_MAX) | 
|  | .SetDoc(R"DOC( | 
|  | Given a packed dataset (packed by the PackRecordsOp) and the `fields` argument | 
|  | describing the datasets schema, return the original dataset format. Number of | 
|  | returned tensors is equal to the number of fields in the `fields` argument. | 
|  |  | 
|  | The first input is the packed tensor to be unpacked. Optionally, you can provide | 
|  | prototype tensors to give the expected shapes of the output tensors. This is | 
|  | helpful when you expected to unpack empty tensor, e.g., output of a sampling | 
|  | process. | 
|  | )DOC") | 
|  | .Arg( | 
|  | "fields", | 
|  | "List of strings representing the string names in the format" | 
|  | "specified in the doc for CreateTreeCursor.") | 
|  | .Input(0, "packed_tensor", "The tensor to be unpacked"); | 
|  |  | 
// Mark the dataset operators as not having a gradient.
// NOTE(review): GetCursorOffset, SortAndShuffle and TrimDataset are registered
// as CPU operators in this file but have no entry in this list — confirm
// whether that omission is intentional.
SHOULD_NOT_DO_GRADIENT(CreateTreeCursor);
SHOULD_NOT_DO_GRADIENT(ResetCursor);
SHOULD_NOT_DO_GRADIENT(ReadNextBatch);
SHOULD_NOT_DO_GRADIENT(ComputeOffset);
SHOULD_NOT_DO_GRADIENT(ReadRandomBatch);
SHOULD_NOT_DO_GRADIENT(CheckDatasetConsistency);
SHOULD_NOT_DO_GRADIENT(Append);
SHOULD_NOT_DO_GRADIENT(AtomicAppend);
SHOULD_NOT_DO_GRADIENT(CreateTensorVector);
SHOULD_NOT_DO_GRADIENT(TensorVectorSize);
SHOULD_NOT_DO_GRADIENT(ConcatTensorVector);
SHOULD_NOT_DO_GRADIENT(CollectTensor);
SHOULD_NOT_DO_GRADIENT(UnPackRecords);
SHOULD_NOT_DO_GRADIENT(PackRecords);
|  |  | 
|  | class TreeCursorSerializer : public BlobSerializerBase { | 
|  | public: | 
|  | // NOLINTNEXTLINE(modernize-use-equals-default) | 
|  | TreeCursorSerializer() {} | 
|  | // NOLINTNEXTLINE(modernize-use-equals-default) | 
|  | ~TreeCursorSerializer() override {} | 
|  |  | 
|  | void Serialize( | 
|  | const void* pointer, | 
|  | TypeMeta typeMeta, | 
|  | const string& name, | 
|  | SerializationAcceptor acceptor) override { | 
|  | CAFFE_ENFORCE(typeMeta.Match<std::unique_ptr<TreeCursor>>()); | 
|  | const auto& cursor = | 
|  | *static_cast<const std::unique_ptr<TreeCursor>*>(pointer); | 
|  | BlobProto blob_proto; | 
|  |  | 
|  | // serialize offsets as a tensor | 
|  | if (cursor->offsets.size() > 0) { | 
|  | Blob offsets_blob; | 
|  | auto* offsets = BlobGetMutableTensor(&offsets_blob, CPU); | 
|  | offsets->Resize(cursor->offsets.size()); | 
|  | std::copy( | 
|  | cursor->offsets.begin(), | 
|  | cursor->offsets.end(), | 
|  | offsets->template mutable_data<TOffset>()); | 
|  | TensorSerializer ser; | 
|  | ser.Serialize( | 
|  | *offsets, name, blob_proto.mutable_tensor(), 0, offsets->numel()); | 
|  | } | 
|  | blob_proto.set_name(name); | 
|  | blob_proto.set_type("std::unique_ptr<TreeCursor>"); | 
|  |  | 
|  | // serialize field names in the content | 
|  | std::ostringstream os; | 
|  | for (const auto& field : cursor->it.fields()) { | 
|  | os << field.name << " "; | 
|  | } | 
|  | blob_proto.set_content(os.str()); | 
|  |  | 
|  | acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto)); | 
|  | } | 
|  | }; | 
|  |  | 
|  | class TreeCursorDeserializer : public BlobDeserializerBase { | 
|  | public: | 
|  | void Deserialize(const BlobProto& proto, Blob* blob) override { | 
|  | // Deserialize the field names | 
|  | std::vector<std::string> fieldNames; | 
|  | std::istringstream is(proto.content()); | 
|  | std::string field; | 
|  | while (true) { | 
|  | is >> field; | 
|  | if (is.eof()) { | 
|  | break; | 
|  | } | 
|  | fieldNames.push_back(field); | 
|  | } | 
|  | TreeIterator it(fieldNames); | 
|  |  | 
|  | auto* base = blob->template GetMutable<std::unique_ptr<TreeCursor>>(); | 
|  | CAFFE_ENFORCE(base != nullptr, "TreeCursor doesn't exist."); | 
|  | // NOLINTNEXTLINE(modernize-make-unique) | 
|  | (*base).reset(new TreeCursor(it)); | 
|  |  | 
|  | // Deserialize the offset vector when it is not empty. The proto.tensor() | 
|  | // function will return a TensorProto associated with offset vector. The | 
|  | // offset vector contains fields of type int64_t, and we verify it is not | 
|  | // empty before calling the deserializer. | 
|  | if (proto.tensor().int64_data().size() > 0) { | 
|  | TensorDeserializer deser; | 
|  | Blob offset_blob; | 
|  | deser.Deserialize(proto, &offset_blob); | 
|  | auto& offsets = offset_blob.template Get<Tensor>(); | 
|  | auto* offsets_ptr = offsets.data<TOffset>(); | 
|  | (*base)->offsets.assign(offsets_ptr, offsets_ptr + offsets.numel()); | 
|  | } | 
|  | } | 
|  | }; | 
|  |  | 
// Hook up the serializer/deserializer pair for std::unique_ptr<TreeCursor>
// blobs.
REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::unique_ptr<TreeCursor>>()),
    TreeCursorSerializer);
REGISTER_BLOB_DESERIALIZER(std::unique_ptr<TreeCursor>, TreeCursorDeserializer);
|  |  | 
|  | } // namespace | 
|  |  | 
|  | void SharedTensorVectorPtrSerializer::Serialize( | 
|  | const void* pointer, | 
|  | TypeMeta typeMeta, | 
|  | const string& name, | 
|  | BlobSerializerBase::SerializationAcceptor acceptor) { | 
|  | /* This is dummy serialize that doesn't save anything. If saving the content | 
|  | is desired in future use case, you can change this serializer. Note: special | 
|  | care need to be taken for the parameter initialization of | 
|  | LastNWindowCollectorOp and ReservoirSamplingOp if this serializer actually | 
|  | saves the content. | 
|  | */ | 
|  | CAFFE_ENFORCE(typeMeta.Match<std::shared_ptr<std::vector<TensorCPU>>>()); | 
|  | BlobProto blob_proto; | 
|  | blob_proto.set_name(name); | 
|  | blob_proto.set_type("std::shared_ptr<std::vector<TensorCPU>>"); | 
|  | blob_proto.set_content(""); | 
|  | acceptor(name, SerializeBlobProtoAsString_EnforceCheck(blob_proto)); | 
|  | }; | 
|  |  | 
|  | void SharedTensorVectorPtrDeserializer::Deserialize( | 
|  | const BlobProto& /* unused */, | 
|  | Blob* blob) { | 
|  | /* This is dummy deserialize which creates a nullptr | 
|  | */ | 
|  | blob->GetMutable<std::shared_ptr<std::vector<TensorCPU>>>(); | 
|  | } | 
|  |  | 
// Hook up the placeholder serializer/deserializer pair for
// std::shared_ptr<std::vector<TensorCPU>> blobs.
REGISTER_BLOB_SERIALIZER(
    (TypeMeta::Id<std::shared_ptr<std::vector<TensorCPU>>>()),
    SharedTensorVectorPtrSerializer);

REGISTER_BLOB_DESERIALIZER(
    std::shared_ptr<std::vector<TensorCPU>>,
    SharedTensorVectorPtrDeserializer);
|  |  | 
|  | } // namespace dataset_ops | 
|  | } // namespace caffe2 |