blob: 5050ec33acbd31e1348b26c8555b44edf6e4af1c [file] [log] [blame]
/**
* Copyright (c) 2016-present, Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "rebatching_queue.h"
namespace caffe2 {
using RebatchingQueuePtr = std::unique_ptr<RebatchingQueue>;
class CreateRebatchingQueueOp : public Operator<CPUContext> {
public:
CreateRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws) {}
bool RunOnDevice() override {
*OperatorBase::Output<RebatchingQueuePtr>(0) =
RebatchingQueuePtr(new RebatchingQueue(
OperatorBase::GetSingleArgument<int>("capacity", 1),
OperatorBase::GetSingleArgument<int>("num_blobs", 1)));
return true;
}
};
class EnqueueRebatchingQueueOp : public Operator<CPUContext> {
public:
EnqueueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws),
enqueueBatch_(
OperatorBase::GetSingleArgument<bool>("enqueue_batch", false)) {}
bool RunOnDevice() override {
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CHECK(queue);
CAFFE_ENFORCE_EQ(InputSize(), queue->numBlobs() + 1);
std::vector<const TensorCPU*> inputTensors;
inputTensors.reserve(InputSize() - 1);
for (int i = 1; i < InputSize(); ++i) {
inputTensors.push_back(&Input(i));
}
return enqueueBatch_ ? queue->enqueueMany(context_, inputTensors)
: queue->enqueueOne(context_, inputTensors);
}
private:
const bool enqueueBatch_;
};
class DequeueRebatchingQueueOp : public Operator<CPUContext> {
public:
DequeueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws),
numElements_(OperatorBase::GetSingleArgument<int>("num_elements", 1)) {}
bool RunOnDevice() override {
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CHECK(queue);
std::vector<TensorCPU*> outputTensors;
outputTensors.reserve(OutputSize());
for (int i = 0; i < OutputSize(); ++i) {
outputTensors.push_back(Output(i));
}
return queue->dequeue(context_, numElements_, outputTensors);
}
private:
int numElements_;
};
class CloseRebatchingQueueOp : public Operator<CPUContext> {
public:
CloseRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws) {}
bool RunOnDevice() override {
CAFFE_ENFORCE_EQ(InputSize(), 1);
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CAFFE_ENFORCE(queue);
queue->close();
return true;
}
};
} // caffe2