blob: d54307182645b5663148f033a883fde79d276763 [file] [log] [blame]
#include <set>
#include "caffe2/core/net.h"
#include "caffe2/core/operator.h"
#include "caffe2/core/timer.h"
#include "caffe2/proto/caffe2.pb.h"
namespace caffe2 {
DEFINE_REGISTRY(NetRegistry, NetBase, const NetDef&, Workspace*);
REGISTER_NET(simple, SimpleNet);
REGISTER_NET(dag, DAGNet);
NetBase* CreateNet(const NetDef& net_def, Workspace* ws) {
// In default, we will return a simple network that just runs all operators
// sequentially.
if (!net_def.has_net_type()) {
return new SimpleNet(net_def, ws);
}
return NetRegistry()->Create(net_def.net_type(), net_def, ws);
}
SimpleNet::SimpleNet(const NetDef& net_def, Workspace* ws)
: NetBase(net_def, ws) {
bool net_def_has_device_option = net_def.has_device_option();
// Initialize the operators
for (const OperatorDef& operator_def : net_def.op()) {
CAFFE_VLOG(1) << "Creating operator " << operator_def.name()
<< ":" << operator_def.type();
if (!operator_def.has_device_option() && net_def_has_device_option) {
// In the case that the operator def does not specify a device option but
// the net def has a default option, we copy the device option over to the
// operator def.
OperatorDef temp_def(operator_def);
temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
operators_.emplace_back(CreateOperator(temp_def, ws));
} else {
operators_.emplace_back(CreateOperator(operator_def, ws));
}
}
}
bool SimpleNet::Verify() {
for (auto& op : operators_) {
if (op.get() == nullptr) {
CAFFE_LOG_ERROR << "Found empty operator.";
return false;
}
CAFFE_VLOG(1) << "Verifying operator " << op->def().name()
<< "(" << op->def().type() << ").";
if (!op->Verify()) {
CAFFE_LOG_ERROR << "Failed to verify operator.";
return false;
}
}
return true;
}
bool SimpleNet::Run() {
CAFFE_VLOG(1) << "Running net.";
for (auto& op : operators_) {
CAFFE_VLOG(1) << "Running operator " << op->def().name()
<< "(" << op->def().type() << ").";
if (!op->Run()) return false;
}
return true;
}
void SimpleNet::TEST_Benchmark(const int warmup_runs, const int main_runs,
const bool run_individual) {
CAFFE_LOG_INFO << "Starting benchmark.";
CAFFE_LOG_INFO << "Running warmup runs.";
CAFFE_CHECK_GE(warmup_runs, 0);
for (int i = 0; i < warmup_runs; ++i) {
CAFFE_CHECK(Run());
}
CAFFE_LOG_INFO << "Main runs.";
CAFFE_CHECK_GE(main_runs, 0);
Timer timer;
for (int i = 0; i < main_runs; ++i) {
CAFFE_CHECK(Run());
}
CAFFE_LOG_INFO << "Main run finished. Milliseconds per iter: "
<< timer.MilliSeconds() / main_runs;
vector<float> time_per_op(operators_.size(), 0);
CaffeMap<string, float> time_per_op_type;
if (run_individual) {
for (int i = 0; i < main_runs; ++i) {
int idx = 0;
for (auto& op : operators_) {
const string& op_type = op->def().type();
timer.Start();
CAFFE_CHECK(op->Run());
float spent = timer.MilliSeconds();
time_per_op[idx] += spent;
time_per_op_type[op_type] += spent;
++idx;
}
}
int idx = 0;
for (auto& op : operators_) {
const string& op_type = op->def().type();
CAFFE_LOG_INFO << "Operator #" << idx << " ("
<< op->def().name() << ", " << op_type << ") "
<< time_per_op[idx] / main_runs << " ms/iter";
++idx;
}
CAFFE_LOG_INFO << "Time per operator type:";
for (const auto& item : time_per_op_type) {
CAFFE_LOG_INFO << std::setw(15) << std::setfill(' ')
<< item.second / main_runs
<< " " << item.first;
}
}
}
DAGNet::DAGNet(const NetDef& net_def, Workspace* ws)
: NetBase(net_def, ws), operator_nodes_(net_def.op_size()) {
// Blob creator allows us to track which operator created which blob.
std::map<string, int> blob_creator;
std::map<string, std::set<int> > blob_readers;
std::map<string, int> execution_chains;
bool net_def_has_device_option = net_def.has_device_option();
// Initialize the operators
for (int idx = 0; idx < net_def.op_size(); ++idx) {
const OperatorDef& op_def = net_def.op(idx);
CAFFE_VLOG(1) << "Creating operator #" << idx << ": "
<< op_def.name() << ":" << op_def.type();
if (!op_def.has_device_option() && net_def_has_device_option) {
OperatorDef temp_def(op_def);
temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
operator_nodes_[idx].operator_.reset(CreateOperator(temp_def, ws));
} else {
operator_nodes_[idx].operator_.reset(CreateOperator(op_def, ws));
}
// Check the inputs, and set up parents if necessary. This addressese the
// read after write case.
for (const string& input : op_def.input()) {
if (blob_creator.count(input) == 0) {
CAFFE_VLOG(1) << "Input " << input << " not produced by this net. "
<< "Assuming it is pre-existing.";
} else {
int parent = blob_creator[input];
CAFFE_VLOG(1) << "op dependency (RaW " << input << "): "
<< parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(parent);
operator_nodes_[parent].children_.push_back(idx);
}
// Add the current idx to the readers of this input.
blob_readers[input].insert(idx);
}
// Check the outputs.
for (const string& output : op_def.output()) {
if (blob_creator.count(output) != 0) {
CAFFE_LOG_WARNING << "Output " << output << " produced again. "
<< "Such operation is not strictly tested. "
<< "Use at your own risk.";
// This addresses the write after write case - we will assume that all
// writes are inherently sequential.
int waw_parent = blob_creator[output];
CAFFE_VLOG(1) << "op dependency (WaW " << output << "): "
<< waw_parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(waw_parent);
operator_nodes_[waw_parent].children_.push_back(idx);
}
// This addresses the write after read case - we will assume that writes
// should only occur after all previous reads are finished.
for (const int war_parent : blob_readers[output]) {
CAFFE_VLOG(1) << "op dependency (WaR " << output << "): "
<< war_parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(war_parent);
operator_nodes_[war_parent].children_.push_back(idx);
}
// Renew the creator of the output name.
blob_creator[output] = idx;
// The write would create an implicit barrier that all earlier readers of
// this output is now parents of the current op, and future writes would
// not need to depend on these earlier readers. Thus, we can clear up the
// blob readers.
blob_readers[output].clear();
}
// Explicitly specified dependency with execution_chain.
for (const auto& arg : op_def.arg()) {
if (arg.name() == "execution_chain") {
for (const string& name : arg.strings()) {
if (execution_chains.count(name) == 0) {
// New execution chain. Do nothing but add it.
execution_chains[name] = idx;
} else {
int parent = execution_chains[name];
CAFFE_VLOG(1) << "op dependency due to execution chain " << name
<< ": " << parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(parent);
operator_nodes_[parent].children_.push_back(idx);
// update the tail of the current execution chain.
execution_chains[name] = idx;
}
}
}
}
}
// Now, make sure that the parent list and the children list do not contain
// duplicated items.
for (int i = 0; i < operator_nodes_.size(); ++i) {
auto& node = operator_nodes_[i];
// Sort, remove duplicates, and delete self dependency.
auto& p = node.parents_;
std::sort(p.begin(), p.end());
p.erase(std::unique(p.begin(), p.end()), p.end());
p.erase(std::remove(p.begin(), p.end(), i), p.end());
// Do the same for the children vector.
auto& c = node.children_;
std::sort(c.begin(), c.end());
c.erase(std::unique(c.begin(), c.end()), c.end());
c.erase(std::remove(c.begin(), c.end(), i), c.end());
}
// TODO: do we want to make sure that there are no loops in the dependency
// graph?
// Figure out the initial frontier - this is the one we will feed into the job
// queue to start a run.
for (int idx = 0; idx < operator_nodes_.size(); ++idx) {
if (operator_nodes_[idx].parents_.size() == 0) {
initial_frontier_.push_back(idx);
}
}
// Finally, start the workers.
int num_workers = net_def.has_num_workers() ? net_def.num_workers() : 1;
CAFFE_CHECK_GT(num_workers, 0) << "Must have a nonnegative number of workers";
if (num_workers == 1) {
CAFFE_LOG_WARNING << "Number of workers is 1: this means that all operators "
<< "will be executed sequentially. Did you forget to set "
<< "num_workers in the NetDef?";
}
for (int i = 0; i < num_workers; ++i) {
CAFFE_VLOG(1) << "Start worker #" << i;
workers_.push_back(std::thread(&DAGNet::WorkerFunction, this));
}
}
DAGNet::~DAGNet() {
// Safely join all the workers before exiting.
job_queue_.NoMoreJobs();
CAFFE_VLOG(1) << "Joining workers.";
for (auto& worker : workers_) {
worker.join();
}
}
bool DAGNet::Verify() {
for (auto& op_node : operator_nodes_) {
auto& op = op_node.operator_;
CAFFE_VLOG(1) << "Verifying operator " << op->def().name()
<< "(" << op->def().type() << ").";
if (op.get() == nullptr || !op->Verify()) {
return false;
}
}
return true;
}
bool DAGNet::Run() {
// Lock the run_in_progress_ lock so that we do not accidentally call Run()
// in parallel.
std::unique_lock<std::mutex> run_lock(run_in_progress_);
CAFFE_VLOG(1) << "Running parallel net.";
// First, set up job queue.
remaining_ops_ = operator_nodes_.size();
success_ = true;
// TODO(jiayq): Start all worker threads.
// Initialize the runtime parent count.
for (auto& node : operator_nodes_) {
node.runtime_parent_count_ = node.parents_.size();
}
// Kickstart the job queue.
for (auto& value : initial_frontier_) {
job_queue_.Push(value);
}
std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
while (remaining_ops_ > 0) {
CAFFE_VLOG(2) << "Remaining ops to run: " << remaining_ops_;
cv_.wait(mutex_lock);
}
CAFFE_VLOG(2) << "All ops finished running.";
// If the above while loop finished, we know that the current run finished.
return success_;
}
void DAGNet::WorkerFunction() {
// WorkerFunctions() is an infinite loop until there are no more jobs to run.
while (true) {
int idx;
// If there is no more jobs - meaning that the DAGNet is destructing -
// we will exit safely.
if (!job_queue_.Pop(&idx)) {
return;
}
CAFFE_VLOG(1) << "Running operator #" << idx << " "
<< operator_nodes_[idx].operator_->def().name()
<< "(" << operator_nodes_[idx].operator_->def().type() << ").";
bool this_success = operator_nodes_[idx].operator_->Run();
for (int child : operator_nodes_[idx].children_) {
int count = --operator_nodes_[child].runtime_parent_count_;
// The count should never be smaller than zero.
CAFFE_DCHECK_GE(count, 0)
<< "Found runtime parent count smaller than zero for "
<< "operator node "
<< operator_nodes_[child].operator_->def().name()
<< "(" << operator_nodes_[child].operator_->def().type() << ").";
if (count == 0) {
CAFFE_VLOG(2) << "Pushing operator #" << child << " to queue.";
job_queue_.Push(child);
}
}
// Notify that the processed op is incremented by one.
std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
--remaining_ops_;
success_ &= this_success;
CAFFE_DCHECK_GE(remaining_ops_, 0);
cv_.notify_one();
CAFFE_VLOG(2) << "Finished executing operator #" << idx;
}
}
} // namespace caffe2