#include "caffe2/core/net.h"
#include <algorithm>
#include <iomanip>
#include <map>
#include <mutex>
#include <set>
#include <stack>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include "caffe2/core/operator.h"
#include "caffe2/core/timer.h"
#include "caffe2/proto/caffe2.pb.h"
CAFFE2_DEFINE_bool(
caffe2_disable_chaining,
false,
"Disable chaining logic (some latent multi-device issues).");
namespace caffe2 {
namespace {
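// Two operators may share an execution chain (and hence a stream) only if
// they run on the same device type and, for CUDA, on the same GPU.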
bool sameDevice(const OperatorDef& lhs, const OperatorDef& rhs) {
return lhs.device_option().device_type() ==
rhs.device_option().device_type() &&
lhs.device_option().cuda_gpu_id() == rhs.device_option().cuda_gpu_id();
}
using OpIndex = int;
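// The trivial chaining used when --caffe2_disable_chaining is set: every
// node forms its own single-element chain, keyed by its own index.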
DAGNetBase::ExecutionChains singleChains(
const std::vector<internal::OperatorNode>& nodes) {
DAGNetBase::ExecutionChains chains;
for (auto i = 0; i < nodes.size(); ++i) {
chains[i] = {i};
}
return chains;
}
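// Computes the execution chains in two depth-first passes over the DAG:
//   1. From each root, count how many times every node is reached. Since each
//      node's out-edges are walked exactly once, this equals the node's
//      in-edge count (roots are counted once at the start).
//   2. Greedily grow chains along runs of single-in-edge nodes whose
//      predecessor has a single child and that stay on one device; any break
//      in the run commits the current chain and starts a new one.
// For example (indices illustrative), the graph 0 -> 1 -> 2 plus an edge
// 0 -> 3 produces the chains {0: [0]}, {1: [1, 2]}, and {3: [3]}.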
DAGNetBase::ExecutionChains computeChains(
const std::vector<internal::OperatorNode>& nodes) {
vector<int> initial_frontier;
for (int idx = 0; idx < nodes.size(); ++idx) {
if (nodes[idx].parents_.size() == 0) {
initial_frontier.push_back(idx);
}
}
  // First pass: depth-first traversal from each root to fill node_seen_count,
  // the number of times each node is reached. Since every edge is traversed
  // exactly once, this is the node's in-edge count (roots are counted once).
std::unordered_map<OpIndex, int> node_seen_count;
for (int root_index : initial_frontier) {
const auto& root = nodes[root_index];
std::stack<std::pair<OpIndex, std::vector<int>::const_iterator>>
depth_stack;
depth_stack.push(make_pair(root_index, root.children_.begin()));
node_seen_count[root_index]++;
CAFFE_ENFORCE(
node_seen_count[root_index] == 1,
"root node ",
root_index,
" visit count must be == 1");
while (depth_stack.size() > 0) {
auto cur = depth_stack.top();
depth_stack.pop();
if (cur.second != nodes[cur.first].children_.end()) {
OpIndex node_index = *cur.second;
node_seen_count[node_index]++;
cur.second++;
depth_stack.push(cur);
if (node_seen_count[node_index] == 1) {
// Visit each child only once.
depth_stack.push(
make_pair(node_index, nodes[node_index].children_.begin()));
}
}
}
}
  // Now, we compute the set of execution chains. An execution chain is
  // a linear sequence of nodes that can be executed on a single stream
  // (e.g. a chain of single-input, single-output operators).
DAGNetBase::ExecutionChains chains;
std::unordered_set<OpIndex> seen_nodes;
std::vector<OpIndex> chain;
std::pair<OpIndex, std::vector<int>::const_iterator> cur;
std::stack<std::pair<OpIndex, std::vector<int>::const_iterator>> depth_stack;
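  // A node may extend the current chain only if it has a single in-edge
  // (node_seen_count == 1) and its operator runs on the same device as the
  // operator at the chain's tail.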
auto check_current_for_chaining = [&]() -> bool {
return (
node_seen_count[cur.first] == 1 &&
(chain.size() == 0 || sameDevice(
nodes[cur.first].operator_->def(),
nodes[chain.back()].operator_->def())));
};
auto commit_chain = [&]() {
if (chain.size() > 0) {
CAFFE_ENFORCE(
chains.insert({chain.front(), chain}).second,
"Chain ",
chain.front(),
" was already added.");
      VLOG(2) << "Added chain: " << chain.front() << " with elements:";
for (auto ch : chain) {
VLOG(2) << ch << ", ";
}
chain.clear();
}
};
auto depth_traverse = [&]() {
while (cur.second != nodes[cur.first].children_.end() &&
seen_nodes.find(*cur.second) != seen_nodes.end()) {
cur.second++;
}
if (cur.second != nodes[cur.first].children_.end()) {
auto next = make_pair(*cur.second, nodes[*cur.second].children_.begin());
depth_stack.push(cur);
depth_stack.push(next);
}
};
for (int root_index : initial_frontier) {
depth_stack.push(
make_pair(root_index, nodes[root_index].children_.begin()));
while (depth_stack.size() > 0) {
cur = depth_stack.top();
depth_stack.pop();
if (seen_nodes.find(cur.first) == seen_nodes.end()) {
seen_nodes.insert(cur.first);
        // The node has exactly one child: either extend the current chain
        // with it, or commit the current chain and start a new one from it.
        if (nodes[cur.first].children_.size() == 1) {
          if (check_current_for_chaining()) {
            // Add oneself to the current chain.
            VLOG(1) << "Adding to existing chain: " << cur.first;
          } else {
            // Can't belong to the previous chain; commit the previous chain
            // and start a new one.
            commit_chain();
          }
          chain.push_back(cur.first);
          int index = *nodes[cur.first].children_.begin();
          depth_stack.push(make_pair(index, nodes[index].children_.begin()));
} else if (
nodes[cur.first].children_.size() == 0 &&
check_current_for_chaining()) {
// Add current node to the current chain and commit.
chain.push_back(cur.first);
commit_chain();
} else {
          // The node either has more than one child, or has no children and
          // failed the chaining check (multiple in-edges or device mismatch).
commit_chain();
// Add current node as an independent chain since it won't be a part
// of a bigger chain.
chain.push_back(cur.first);
commit_chain();
depth_traverse();
}
} else {
// This node has been seen before, we will only traverse its children.
// Commit any pending chains and continue traversing.
commit_chain();
depth_traverse();
}
} // End while
    // TODO: check whether this final commit_chain() is even needed.
commit_chain();
}
CAFFE_ENFORCE(
seen_nodes.size() == nodes.size(),
"Haven't seen all the nodes, expected number of nodes ",
nodes.size(),
", but seen only ",
seen_nodes.size(),
".");
return chains;
}
} // namespace
CAFFE_DEFINE_REGISTRY(NetRegistry, NetBase, const NetDef&, Workspace*);
NetBase::NetBase(const NetDef& def, Workspace* /* unused */)
: external_input_(def.external_input().begin(),
def.external_input().end()),
external_output_(def.external_output().begin(),
def.external_output().end()) {
// Go through the operators and make sure that blobs are correctly made.
std::set<string> known_blobs(
external_input_.begin(), external_input_.end());
std::set<string> remaining_output(
external_output_.begin(), external_output_.end());
for (const OperatorDef& op : def.op()) {
for (const string& in : op.input()) {
if (!known_blobs.count(in)) {
if (external_input_.size()) {
CAFFE_THROW(
"op ",
op.type(),
": Source for input ",
in,
" is unknown for net ",
def.name(),
", operator ",
ProtoDebugString(op));
} else {
          // If the net does not declare its external inputs, we simply VLOG
          // the unknown blob for debugging purposes.
VLOG(1) << "op " << op.type() << ": input " << in << " is unknown.";
}
}
}
for (const string& out : op.output()) {
known_blobs.insert(out);
remaining_output.erase(out);
}
}
// Finally, check if all declared outputs are being created.
CAFFE_ENFORCE(
remaining_output.size() == 0,
"Some of the blobs are declared as output but never produced by the "
"net ",
def.name(),
", the first one is ",
*remaining_output.begin());
}
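// Factory for nets, dispatching on NetDef::type via NetRegistry. A minimal
// illustrative sketch (the `workspace` variable is assumed to exist):
//   NetDef def;
//   def.set_name("my_net");
//   def.set_type("dag");    // handled by DAGNet, registered below
//   def.set_num_workers(4); // worker threads used by the DAG executor
//   std::unique_ptr<NetBase> net = CreateNet(def, &workspace);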
unique_ptr<NetBase> CreateNet(const NetDef& net_def, Workspace* ws) {
  // By default, we return a simple network that runs all operators
  // sequentially.
if (!net_def.has_type()) {
return make_unique<SimpleNet>(net_def, ws);
}
return NetRegistry()->Create(net_def.type(), net_def, ws);
}
SimpleNet::SimpleNet(const NetDef& net_def, Workspace* ws)
: NetBase(net_def, ws) {
VLOG(1) << "Constructing SimpleNet " << net_def.name();
bool net_def_has_device_option = net_def.has_device_option();
// Initialize the operators
for (const OperatorDef& operator_def : net_def.op()) {
VLOG(1) << "Creating operator " << operator_def.name()
<< ":" << operator_def.type();
if (!operator_def.has_device_option() && net_def_has_device_option) {
// In the case that the operator def does not specify a device option but
// the net def has a default option, we copy the device option over to the
// operator def.
OperatorDef temp_def(operator_def);
temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
operators_.emplace_back(CreateOperator(temp_def, ws));
CAFFE_ENFORCE(
operators_.back() != nullptr,
"Cannot create operator for def: ",
ProtoDebugString(temp_def));
} else {
operators_.emplace_back(CreateOperator(operator_def, ws));
CAFFE_ENFORCE(
operators_.back() != nullptr,
"Cannot create operator for def: ",
ProtoDebugString(operator_def));
}
}
}
bool SimpleNet::Run() {
VLOG(1) << "Running net.";
for (auto& op : operators_) {
VLOG(1) << "Running operator " << op->def().name()
<< "(" << op->def().type() << ").";
if (!op->Run()) {
LOG(ERROR) << "Operator failed: "
<< ProtoDebugString(op->def());
return false;
}
}
return true;
}
bool SimpleNet::RunAsync() {
VLOG(1) << "Running net.";
for (auto& op : operators_) {
VLOG(1) << "Running operator " << op->def().name()
<< "(" << op->def().type() << ").";
if (!op->RunAsync()) {
LOG(ERROR) << "Operator failed: "
<< ProtoDebugString(op->def());
return false;
}
}
return true;
}
vector<float> SimpleNet::TEST_Benchmark(
const int warmup_runs,
const int main_runs,
const bool run_individual) {
LOG(INFO) << "Starting benchmark.";
LOG(INFO) << "Running warmup runs.";
CAFFE_ENFORCE(
      warmup_runs >= 0,
      "Number of warm up runs should be non-negative, provided ",
warmup_runs,
".");
for (int i = 0; i < warmup_runs; ++i) {
CAFFE_ENFORCE(Run(), "Warmup run ", i, " has failed.");
}
LOG(INFO) << "Main runs.";
CAFFE_ENFORCE(
      main_runs >= 0,
      "Number of main runs should be non-negative, provided ",
main_runs,
".");
Timer timer;
for (int i = 0; i < main_runs; ++i) {
CAFFE_ENFORCE(Run(), "Main run ", i, " has failed.");
}
auto millis = timer.MilliSeconds();
LOG(INFO) << "Main run finished. Milliseconds per iter: "
<< millis / main_runs
<< ". Iters per second: " << 1000.0 * main_runs / millis;
vector<float> time_per_op(operators_.size(), 0);
CaffeMap<string, float> time_per_op_type;
if (run_individual) {
for (int i = 0; i < main_runs; ++i) {
int idx = 0;
for (auto& op : operators_) {
const string& op_type = op->def().type();
timer.Start();
CAFFE_ENFORCE(
op->Run(),
"operator ",
op->def().name(),
"(",
op_type,
") has failed.");
float spent = timer.MilliSeconds();
time_per_op[idx] += spent;
time_per_op_type[op_type] += spent;
++idx;
}
}
int idx = 0;
for (auto& op : operators_) {
const string& op_type = op->def().type();
const string& print_name =
(op->def().name().size()
? op->def().name()
: (op->def().output_size() ? op->def().output(0) : "NO_OUTPUT"));
LOG(INFO) << "Operator #" << idx << " (" << print_name << ", " << op_type
<< ") " << time_per_op[idx] / main_runs << " ms/iter";
++idx;
}
LOG(INFO) << "Time per operator type:";
for (const auto& item : time_per_op_type) {
LOG(INFO) << std::setw(15) << std::setfill(' ')
<< item.second / main_runs
<< " " << item.first;
}
}
// We will reuse time_per_op to return the result of BenchmarkNet.
for (int i = 0; i < time_per_op.size(); ++i) {
time_per_op[i] /= main_runs;
}
time_per_op.insert(time_per_op.begin(), millis / main_runs);
return time_per_op;
}
DAGNetBase::DAGNetBase(const NetDef& net_def, Workspace* ws)
: NetBase(net_def, ws), operator_nodes_(net_def.op_size()) {
// Blob creator allows us to track which operator created which blob.
VLOG(1) << "Constructing DAGNet " << net_def.name();
std::map<string, int> blob_creator;
std::map<string, std::set<int> > blob_readers;
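  // Dependency edges are derived from three hazards: read-after-write (RaW,
  // an op reads a blob that an earlier op wrote), write-after-write (WaW, two
  // ops write the same blob), and write-after-read (WaR, an op overwrites a
  // blob that an earlier op read). blob_readers records, per blob, the ops
  // that have read it since its last write; these become the WaR parents of
  // the next writer.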
bool net_def_has_device_option = net_def.has_device_option();
// Initialize the operators
for (int idx = 0; idx < net_def.op_size(); ++idx) {
const OperatorDef& op_def = net_def.op(idx);
VLOG(1) << "Creating operator #" << idx << ": "
<< op_def.name() << ":" << op_def.type();
if (!op_def.has_device_option() && net_def_has_device_option) {
OperatorDef temp_def(op_def);
temp_def.mutable_device_option()->CopyFrom(net_def.device_option());
operator_nodes_[idx].operator_ = CreateOperator(temp_def, ws);
CAFFE_ENFORCE(
operator_nodes_[idx].operator_ != nullptr,
"Cannot create operator for def: ",
ProtoDebugString(temp_def));
} else {
operator_nodes_[idx].operator_ = CreateOperator(op_def, ws);
CAFFE_ENFORCE(
operator_nodes_[idx].operator_ != nullptr,
"Cannot create operator for def: ",
ProtoDebugString(op_def));
}
    // Check the inputs, and set up parents if necessary. This addresses the
    // read-after-write case.
auto checkInputs = [&](
const google::protobuf::RepeatedPtrField<std::string>& inputs) {
for (const string& input : inputs) {
if (blob_creator.count(input) == 0) {
VLOG(1) << "Input " << input << " not produced by this net. "
<< "Assuming it is pre-existing.";
} else {
int parent = blob_creator[input];
VLOG(1) << "op dependency (RaW " << input << "): " << parent << "->"
<< idx;
operator_nodes_[idx].parents_.push_back(parent);
operator_nodes_[parent].children_.push_back(idx);
}
// Add the current idx to the readers of this input.
blob_readers[input].insert(idx);
}
};
checkInputs(op_def.input());
checkInputs(op_def.control_input());
// Check the outputs.
for (const string& output : op_def.output()) {
if (blob_creator.count(output) != 0) {
        // This addresses the write-after-write case: we assume that all
        // writes are inherently sequential.
int waw_parent = blob_creator[output];
VLOG(1) << "op dependency (WaW " << output << "): "
<< waw_parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(waw_parent);
operator_nodes_[waw_parent].children_.push_back(idx);
}
      // This addresses the write-after-read case: we assume that writes
      // should only occur after all previous reads have finished.
for (const int war_parent : blob_readers[output]) {
VLOG(1) << "op dependency (WaR " << output << "): "
<< war_parent << "->" << idx;
operator_nodes_[idx].parents_.push_back(war_parent);
operator_nodes_[war_parent].children_.push_back(idx);
}
// Renew the creator of the output name.
blob_creator[output] = idx;
      // The write creates an implicit barrier: all earlier readers of this
      // output are now parents of the current op, and future writes need not
      // depend on those earlier readers. Thus, we can clear the blob's
      // readers.
blob_readers[output].clear();
}
}
// Now, make sure that the parent list and the children list do not contain
// duplicated items.
for (int i = 0; i < operator_nodes_.size(); ++i) {
auto& node = operator_nodes_[i];
// Sort, remove duplicates, and delete self dependency.
auto& p = node.parents_;
std::sort(p.begin(), p.end());
p.erase(std::unique(p.begin(), p.end()), p.end());
p.erase(std::remove(p.begin(), p.end(), i), p.end());
// Do the same for the children vector.
auto& c = node.children_;
std::sort(c.begin(), c.end());
c.erase(std::unique(c.begin(), c.end()), c.end());
c.erase(std::remove(c.begin(), c.end(), i), c.end());
}
execution_chains_ =
(FLAGS_caffe2_disable_chaining ? singleChains(operator_nodes_)
: computeChains(operator_nodes_));
  LOG(INFO) << "Number of parallel execution chains: "
            << execution_chains_.size()
            << ". Number of operators: " << net_def.op_size();
// TODO: do we want to make sure that there are no loops in the
// dependency graph?
// Figure out the initial frontier - this is the one we will feed into the job
// queue to start a run.
for (int idx = 0; idx < operator_nodes_.size(); ++idx) {
if (operator_nodes_[idx].parents_.size() == 0) {
initial_frontier_.push_back(idx);
}
}
// Finally, start the workers.
int num_workers = net_def.has_num_workers() ? net_def.num_workers() : 1;
CAFFE_ENFORCE(num_workers > 0, "Must have a positive number of workers.");
if (num_workers == 1) {
LOG(WARNING) << "Number of workers is 1: this means that all operators "
<< "will be executed sequentially. Did you forget to set "
<< "num_workers in the NetDef?";
}
for (int i = 0; i < num_workers; ++i) {
VLOG(1) << "Start worker #" << i;
workers_.push_back(std::thread(&DAGNetBase::WorkerFunction, this));
}
}
DAGNetBase::~DAGNetBase() {
// Safely join all the workers before exiting.
job_queue_.NoMoreJobs();
VLOG(1) << "Joining workers.";
for (auto& worker : workers_) {
worker.join();
}
}
bool DAGNetBase::Run() {
// Lock the run_in_progress_ lock so that we do not accidentally call Run()
// in parallel.
std::unique_lock<std::mutex> run_lock(run_in_progress_);
VLOG(1) << "Running parallel net.";
// First, set up job queue.
remaining_ops_ = operator_nodes_.size();
success_ = true;
  // Initialize the runtime parent count.
for (auto& node : operator_nodes_) {
node.runtime_parent_count_ = node.parents_.size();
}
// Kickstart the job queue.
for (auto& value : initial_frontier_) {
job_queue_.Push(value);
}
std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
while (remaining_ops_ > 0) {
VLOG(2) << "Remaining ops to run: " << remaining_ops_;
cv_.wait(mutex_lock);
}
VLOG(2) << "All ops finished running.";
for (const auto& op : operator_nodes_) {
CAFFE_ENFORCE(
op.runtime_parent_count_ == 0,
"Operator ",
op.operator_->def().name(),
"(",
op.operator_->def().type(),
") has some runtime parents left.");
}
// If the above while loop finished, we know that the current run finished.
return success_;
}
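// Worker threads loop on the shared job queue. Run() seeds the queue with the
// initial frontier; each worker pops the index of a chain head, executes the
// whole chain via RunAt(), then decrements the runtime parent count of every
// child of the chain's operators and pushes each child whose count drops to
// zero (by construction, such a child is the head of its own chain).
// remaining_ops_ and cv_ let Run() block until all operators have executed.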
void DAGNetBase::WorkerFunction() {
  // WorkerFunction() loops until there are no more jobs to run.
while (true) {
int idx = 0;
    // If there are no more jobs - meaning that the DAGNetBase is being
    // destructed - we exit safely.
if (!job_queue_.Pop(&idx)) {
return;
}
VLOG(1) << "Running operator #" << idx << " "
<< operator_nodes_[idx].operator_->def().name()
<< "(" << operator_nodes_[idx].operator_->def().type() << ").";
CAFFE_ENFORCE(
execution_chains_.find(idx) != execution_chains_.end(),
"Can't find chain ",
idx,
".");
    const auto& chain = execution_chains_[idx];
    bool this_success = RunAt(chain);
if (!this_success) {
LOG(ERROR) << "Operator chain failed: "
<< ProtoDebugString(operator_nodes_[idx].operator_->def());
}
    // Bookkeeping: decrement the runtime parent count of every child of the
    // chain's operators, and schedule any child that becomes ready.
    for (const auto chain_idx : chain) {
      for (const auto child : operator_nodes_[chain_idx].children_) {
const int count = --operator_nodes_[child].runtime_parent_count_;
CAFFE_ENFORCE(
count >= 0,
"Found runtime parent count smaller than zero for ",
"operator node ",
operator_nodes_[child].operator_->def().name(),
"(",
operator_nodes_[child].operator_->def().type(),
").");
if (count != 0) {
continue;
}
if (std::find(chain.begin(), chain.end(), child) != chain.end()) {
          // Already executed as part of the current chain.
continue;
}
VLOG(2) << "Pushing operator #" << child << " to queue.";
job_queue_.Push(child);
}
}
    // Notify the main thread that this chain's operators have finished.
{
std::unique_lock<std::mutex> mutex_lock(remaining_ops_mutex_);
remaining_ops_ -= chain.size();
success_ &= this_success;
CAFFE_ENFORCE(
          remaining_ops_ >= 0,
          "The remaining operator count should never go negative, but is now ",
          remaining_ops_,
          ".");
}
cv_.notify_one();
VLOG(2) << "Finished executing operator #" << idx;
}
}
vector<float> DAGNetBase::TEST_Benchmark(
const int warmup_runs,
const int main_runs,
const bool run_individual) {
LOG(INFO) << "Starting benchmark.";
LOG(INFO) << "Running warmup runs.";
CAFFE_ENFORCE(
      warmup_runs >= 0,
      "Number of warm up runs should be non-negative, provided ",
warmup_runs,
".");
for (int i = 0; i < warmup_runs; ++i) {
CAFFE_ENFORCE(Run(), "Warmup run ", i, " has failed.");
}
LOG(INFO) << "Main runs.";
CAFFE_ENFORCE(
      main_runs >= 0,
      "Number of main runs should be non-negative, provided ",
main_runs,
".");
Timer timer;
for (int i = 0; i < main_runs; ++i) {
CAFFE_ENFORCE(Run(), "Main run ", i, " has failed.");
}
auto millis = timer.MilliSeconds();
LOG(INFO) << "Main run finished. Milliseconds per iter: "
<< millis / main_runs
<< ". Iters per second: " << 1000.0 * main_runs / millis;
if (run_individual) {
LOG(INFO) << "DAGNet does not do per-op benchmark. To do so, "
"switch to a simple net type.";
}
return vector<float>{millis / main_runs};
}
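// The default DAG executor: runs each chain synchronously on the worker
// thread that picked it up. Registered below as the "dag" net type.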
class DAGNet : public DAGNetBase {
public:
using DAGNetBase::DAGNetBase;
protected:
bool RunAt(const std::vector<int>& chain) override {
bool success = true;
for (const auto idx : chain) {
success &= operator_nodes_[idx].operator_->Run();
}
return success;
}
};
namespace {
REGISTER_NET(simple, SimpleNet);
REGISTER_NET(dag, DAGNet);
} // namespace
} // namespace caffe2