Better names for nets, steps and tasks
Summary:
- NetBuilder now honors its name
- When Nets are created in the context of a NetBuilder, they take NetBuilder's name as prefix
- When a NetBuilder is created in the context of a Task, it takes the Tasks's name.
- pipe() now tries to find a good name based on its processor's, output or input queue's name.
- RPC tries to find a name from its handler's name.
- Better names in DataStream
- net_printer prints the name of Tasks and Steps
- net_printer optionally factors out common prefixes form blob names.
Differential Revision: D4527578
fbshipit-source-id: 5d3d1237c186e9576313c5aa01cc8800a9051217
diff --git a/caffe2/python/core.py b/caffe2/python/core.py
index 8a98744..25a39bc 100644
--- a/caffe2/python/core.py
+++ b/caffe2/python/core.py
@@ -290,10 +290,7 @@
def _RegisterPythonImpl(f, grad_f=None, pass_workspace=False):
if isinstance(f, tuple):
- print('Registering python with tuple', f)
f = f[0](*f[1], **f[2])
- else:
- print('Registering python without tuple', f)
if isinstance(grad_f, tuple):
grad_f = grad_f[0](*grad_f[1], **grad_f[2])
@@ -1104,8 +1101,15 @@
operator_registry_ = {}
@staticmethod
+ def current_prefix():
+ from caffe2.python.net_builder import NetBuilder
+ builder = NetBuilder.current(required=False)
+ return builder.name if builder else ''
+
+ @staticmethod
def _get_next_net_name(basename):
- name = basename
+ name = basename = '/'.join(filter(
+ lambda x: x, (Net.current_prefix(), basename)))
next_idx = 1
while name in Net._net_names_used:
name = basename + '_' + str(next_idx)
@@ -1160,13 +1164,14 @@
self._next_name_index = max(autogen_indices) + 1
else:
self._next_name_index = 0
+ name = self._net.name
else:
+ name = name_or_proto
self._net = caffe2_pb2.NetDef()
- self._net.name = name_or_proto
self._next_name_index = 0
# make sure that this net name hasn't been used before
- self._net.name = Net._get_next_net_name(self._net.name)
+ self._net.name = Net._get_next_net_name(name)
def AppendNet(self, net):
assert isinstance(net, Net)
@@ -1889,6 +1894,8 @@
return step_or_nets
stop_blob = None
+ if not default_name and hasattr(step_or_nets, 'name'):
+ default_name = step_or_nets.name
if isinstance(step_or_nets, NetBuilder):
stop_blob = step_or_nets._stop_blob
step_or_nets = step_or_nets.get()
diff --git a/caffe2/python/net_builder.py b/caffe2/python/net_builder.py
index 2390276..3942ae4 100644
--- a/caffe2/python/net_builder.py
+++ b/caffe2/python/net_builder.py
@@ -25,9 +25,12 @@
ops.Print(d, [])
step = core.to_execution_step(nb)
"""
- def __init__(self, name=None, _stop_blob_required=False, _stop_blob=None):
- self._name = name or ''
- self._prefix = name + '/' if name else ''
+ def __init__(self, name=None, _stop_blob_required=False,
+ _stop_blob=None, _fullname=None):
+ nb = NetBuilder.current(required=False)
+ assert not _fullname or not name, 'Cannot set both _fullname and name'
+ self.name = _fullname or '/'.join(filter(lambda x: x, (
+ nb.name if nb else None, name)))
self._frozen = False
self._current_net = None
self._children = []
@@ -47,8 +50,7 @@
self._stop_blob = core.BlobReference(
net.NextName('stop_blob'), net=net)
if self._current_net != self._children[0]:
- self._children.insert(0, core.Net(
- self._prefix + 'stop_blob_init'))
+ self._children.insert(0, core.Net('stop_blob_init'))
self._children[0].Const(False, blob_out=self._stop_blob)
return self._stop_blob
@@ -58,7 +60,7 @@
def _assert_mutable(self):
assert not self._frozen, (
- 'This NetBuilder (%s) has been built already.' % self._name)
+ 'This NetBuilder (%s) has been built already.' % self.name)
def add(self, child):
self._assert_mutable()
@@ -69,10 +71,10 @@
self._current_net = child
return child
- def current_net(self):
+ def current_net(self, name=None):
self._assert_mutable()
- if self._current_net is None:
- self.add(core.Net(self._prefix + 'net'))
+ if self._current_net is None or name is not None:
+ self.add(core.Net(name))
return self._current_net
def freeze(self):
@@ -91,22 +93,33 @@
if etype is not None:
return
assert (not self._stop_blob_required) or self._stop_blob is not None, (
- 'This NetBuilder (%s) requires a stop condition ' % self._name +
+ 'This NetBuilder (%s) requires a stop condition ' % self.name +
'to be set with `stop` or `stop_if`')
+ def __str__(self):
+ return self.name or 'Un-named NetBuilder'
+
class Operations(object):
"""
Operations to be used in the context of a NetBuilder.
"""
- def net(self, net=None):
+ def net(self, net=None, name=None):
"""
Retrieves the current net, or add a new net to the builder.
+ Args:
+ net: If provided, add the given net to the active builder.
+ Else, returns the current Net or creates a new one as needed.
+ name: if provided, creates a new Net with given name and makes
+ it the new current net of the active builder. Cannot
+ be provided if net is provided.
"""
+ assert name is None or net is None, (
+ 'Cannot provide both `net` and `name`.')
if net is not None:
NetBuilder.current().add(net)
return net
- return NetBuilder.current().current_net()
+ return NetBuilder.current().current_net(name=name)
def __getattr__(self, op_type):
"""
@@ -153,7 +166,7 @@
"""
return NetBuilder.current().stop_if(blob)
- def loop(self, iters=None):
+ def loop(self, iters=None, name=None):
"""
Creates a NetBuilder that will execute in a loop as the next step of
the current NetBuilder. If `iters` is provided, the loop will execute
@@ -177,9 +190,9 @@
ops.LogInfo(loop.iter())
This will print the numbers from 0 to 19.
"""
- return NetBuilder.current().add(_Loop(iters))
+ return NetBuilder.current().add(_Loop(iters, name=name))
- def stop_guard(self, has_stopped_blob=None):
+ def stop_guard(self, has_stopped_blob=None, name=None):
"""
Creates a NetBuilder that will execute once as the next step of the
current NetBuilder. After execution, a bool tensor will indicate
@@ -199,9 +212,9 @@
followed by True and False.
"""
return NetBuilder.current().add(
- _StopGuard(has_stopped_blob=has_stopped_blob))
+ _StopGuard(has_stopped_blob=has_stopped_blob, name=name))
- def If(self, cond):
+ def If(self, cond, name=None):
"""
Creates a NetBuilder that will execute once as the next step of the
current NetBuilder if the blob `cond` is True.
@@ -212,7 +225,7 @@
ops.Print(ops.Const('Wont print'))
The example will print 'Will print' once.
"""
- return NetBuilder.current().add(_RunIf(cond))
+ return NetBuilder.current().add(_RunIf(cond, name=name))
def task_init(self):
"""
@@ -296,7 +309,7 @@
class _StopGuard(_RunOnce):
- def __init__(self, name=None, has_stopped_blob=None):
+ def __init__(self, has_stopped_blob=None, name=None):
_RunOnce.__init__(self, name)
self._stopped = has_stopped_blob
self._ran = False
@@ -371,12 +384,12 @@
ops.Const(True, blob_out=self._already_ran)
return r
- def Elif(self, cond):
+ def Elif(self, cond, name=None):
assert not self._is_else, 'Else not allowed for an Else.'
- return NetBuilder.current().add(
- _RunIf(cond, _already_ran=self._already_ran))
+ return NetBuilder.current().add(_RunIf(
+ cond, name=name or self.name, _already_ran=self._already_ran))
- def Else(self):
+ def Else(self, name=None):
assert not self._is_else, 'Elif not allowed for an Else.'
return NetBuilder.current().add(
- _RunIf(_already_ran=self._already_ran))
+ _RunIf(name=name or self.name, _already_ran=self._already_ran))
diff --git a/caffe2/python/net_printer.py b/caffe2/python/net_printer.py
index d88dce5..3397db2 100644
--- a/caffe2/python/net_printer.py
+++ b/caffe2/python/net_printer.py
@@ -167,7 +167,10 @@
class Printer(Visitor, Text):
- pass
+ def __init__(self, factor_prefixes=False):
+ super(Visitor, self).__init__()
+ super(Text, self).__init__()
+ self.factor_prefixes = factor_prefixes
def _sanitize_str(s):
@@ -191,12 +194,38 @@
return '[]'
-def call(op, inputs=None, outputs=None):
- inputs = '' if not inputs else ', '.join(
- '%s=%s' % (str(a[0]), str(a[1])) if isinstance(a, tuple) else str(a)
- for a in inputs)
+def commonprefix(m):
+ "Given a list of strings, returns the longest common prefix"
+ if not m:
+ return ''
+ s1 = min(m)
+ s2 = max(m)
+ for i, c in enumerate(s1):
+ if c != s2[i]:
+ return s1[:i]
+ return s1
+
+
+def factor_prefix(vals, do_it):
+ vals = map(str, vals)
+ prefix = commonprefix(vals) if len(vals) > 1 and do_it else ''
+ joined = ', '.join(v[len(prefix):] for v in vals)
+ return '%s[%s]' % (prefix, joined) if prefix else joined
+
+
+def call(op, inputs=None, outputs=None, factor_prefixes=False):
+ if not inputs:
+ inputs = ''
+ else:
+ inputs_v = [a for a in inputs if not isinstance(a, tuple)]
+ inputs_kv = [a for a in inputs if isinstance(a, tuple)]
+ inputs = ', '.join(filter(
+ bool,
+ [factor_prefix(inputs_v, factor_prefixes)] +
+ ['%s=%s' % kv for kv in inputs_kv]))
call = '%s(%s)' % (op, inputs)
- return call if not outputs else '%s = %s' % (', '.join(outputs), call)
+ return call if not outputs else '%s = %s' % (
+ factor_prefix(outputs, factor_prefixes), call)
@Printer.register(OperatorDef)
@@ -204,7 +233,8 @@
text.add(call(
op.type,
list(op.input) + [(a.name, _arg_val(a)) for a in op.arg],
- op.output))
+ op.output,
+ factor_prefixes=text.factor_prefixes))
@Printer.register(Net)
@@ -217,27 +247,28 @@
def _get_step_context(step):
proto = step.Proto()
if proto.should_stop_blob:
- return call('loop'), None
+ return call('loop'), False
if proto.num_iter and proto.num_iter != 1:
- return call('loop', [proto.num_iter]), None
+ return call('loop', [proto.num_iter]), False
concurrent = proto.concurrent_substeps and len(step.Substeps()) > 1
if concurrent:
- return call('parallel'), call('step')
+ return call('parallel'), True
if proto.report_net:
- return call('run_once'), None
- return None, None
+ return call('run_once'), False
+ return None, False
@Printer.register(ExecutionStep)
def print_step(text, step):
proto = step.Proto()
- step_ctx, substep_ctx = _get_step_context(step)
+ step_ctx, do_substep = _get_step_context(step)
with text.context(step_ctx):
if proto.report_net:
with text.context(call('report_net', [proto.report_interval])):
text(step.get_net(proto.report_net))
substeps = step.Substeps() + [step.get_net(n) for n in proto.network]
for substep in substeps:
+ substep_ctx = call('step', [str(substep)]) if do_substep else None
with text.context(substep_ctx):
text(substep)
if proto.should_stop_blob:
@@ -246,7 +277,7 @@
@Printer.register(Task)
def print_task(text, task):
- with text.context(call('Task', [('node', task.node)])):
+ with text.context(call('Task', [('node', task.node), ('name', task.name)])):
text(task.get_step())
diff --git a/caffe2/python/pipeline.py b/caffe2/python/pipeline.py
index a9779f3..5255c35 100644
--- a/caffe2/python/pipeline.py
+++ b/caffe2/python/pipeline.py
@@ -5,7 +5,7 @@
from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
-from caffe2.python.net_builder import NetBuilder
+from caffe2.python.net_builder import NetBuilder, ops
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Task, TaskGroup
@@ -128,10 +128,8 @@
Output Queue, DataStream, Reader, or None, depending on the parameters
passed.
"""
- result, step = _pipe_step(
+ result, _ = _pipe_step(
input, output, num_threads, processor, name, capacity, group)
- if step is not None:
- Task(step=step, group=group)
return result
@@ -148,11 +146,10 @@
task_outputs: TaskOutput object, fetchable from the client after
session.run() returns.
"""
- result, step = _pipe_step(
+ assert num_threads > 0
+ result, task = _pipe_step(
input, output, num_threads, processor, name, capacity, group,
final_outputs)
- assert step is not None
- task = Task(step=step, group=group, outputs=final_outputs)
output = None
if final_outputs is not None:
output = task.outputs()
@@ -161,15 +158,23 @@
return result, output
+def processor_name(processor):
+ if hasattr(processor, 'name'):
+ return processor.name
+ if hasattr(processor, 'func_name'):
+ if processor.func_name == '<lambda>':
+ return processor.__module__
+ if hasattr(processor, 'im_class'):
+ return '%s.%s' % (processor.im_class.__name__, processor.func_name)
+ return processor.func_name
+ return processor.__class__.__name__
+
+
def _pipe_step(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, final_outputs=None):
"""
"""
- group = TaskGroup.current(group)
- if name is None:
- name = 'processor:%d' % group.num_registered_tasks()
-
if isinstance(input, Reader):
reader = input
elif hasattr(input, 'reader'):
@@ -184,48 +189,51 @@
assert output is None
return reader, None
- global_exit_net = core.Net(name + '_producer_global_exit')
- global_init_net = core.Net(name + '_producer_global_init')
- out_queue = None
- writer = None
+ if name is None and processor is not None:
+ name = processor_name(processor)
+ if name is None and output is not None:
+ name = 'pipe_into:%s' % processor_name(output)
+ if name is None:
+ name = 'pipe_from:%s' % processor_name(input)
- reader.setup_ex(global_init_net, global_exit_net)
+ with Task(name=name, group=group, outputs=final_outputs) as task:
+ global_exit_net = core.Net('exit')
+ global_init_net = core.Net('init')
+ reader.setup_ex(global_init_net, global_exit_net)
- steps = []
- for thread_id in range(num_threads):
- init_net = core.Net(name + "_init_net_%d" % thread_id)
- exit_net = core.Net(name + "_exit_net_%d" % thread_id)
+ out_queue = None
+ writer = None
- read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
+ steps = []
+ for thread_id in range(num_threads):
+ with NetBuilder(name='t:%d' % thread_id) as nb:
+ init_net = core.Net('init')
+ exit_net = core.Net('exit')
+ read_nets, status, rec = reader.read_record_ex(
+ init_net, exit_net)
- if rec is not None:
- if writer is None:
- out_queue, writer = _init_output(
- output, capacity, global_init_net, global_exit_net)
- write_nets, _ = writer.write_record_ex(
- rec, init_net, exit_net, status)
- else:
- write_nets = []
-
- step = core.execution_step(
- name + "_thread_%d" % thread_id, [
- core.execution_step(name + "_init_step", init_net),
- core.execution_step(
- name + "_worker_step",
+ if rec is not None:
+ if writer is None:
+ # hack so that the out queue gets the right name prefix
+ # (otherwise they would be prefixed with the thread id)
+ with NetBuilder(_fullname=task.name):
+ out_queue, writer = _init_output(
+ output, capacity, global_init_net,
+ global_exit_net)
+ write_nets, _ = writer.write_record_ex(
+ rec, init_net, exit_net, status)
+ else:
+ write_nets = []
+ ops.net(init_net)
+ ops.net(core.execution_step('body',
list(read_nets) + list(write_nets),
- should_stop_blob=status
- ), core.execution_step(name + "_exit_step", exit_net)
- ]
- )
- steps.append(step)
- step = core.execution_step(
- "sender_step", [
- core.execution_step('init_step', global_init_net),
- core.execution_step(
- "sender_steps", steps, concurrent_substeps=True),
- core.execution_step('finish_step', global_exit_net),
- ])
- return out_queue, step
+ should_stop_blob=status))
+ ops.net(exit_net)
+ steps.append(core.to_execution_step(nb))
+ ops.net(global_init_net)
+ ops.net(core.execution_step('body', steps, concurrent_substeps=True))
+ ops.net(global_exit_net)
+ return out_queue, task
class ProcessingReader(Reader):
@@ -269,10 +277,11 @@
and (optionally) output records set, with net.set_input_record() and
net.set_output_record().
"""
- def __init__(self, net, stop_signal=None, thread_init_nets=None):
+ def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
assert isinstance(net, core.Net)
assert stop_signal is None or isinstance(
stop_signal, core.BlobReference)
+ self.name = name or str(net)
self.thread_init_nets = thread_init_nets or []
self.net = net
self._stop_signal = stop_signal
@@ -288,7 +297,7 @@
def __call__(self, rec):
assert not self._frozen
- prefix = '/worker:%d/' % len(self._blob_maps)
+ prefix = NetBuilder.current().name + '/'
blob_remap = {}
for net in self.thread_init_nets:
new_net, _ = core.clone_and_bind_net(
diff --git a/caffe2/python/queue_util.py b/caffe2/python/queue_util.py
index 2cae95a..8150d01 100644
--- a/caffe2/python/queue_util.py
+++ b/caffe2/python/queue_util.py
@@ -19,7 +19,7 @@
def read_ex(self, local_init_net, local_finish_net):
self._wrapper._new_reader(local_init_net)
- dequeue_net = core.Net('dequeue_net')
+ dequeue_net = core.Net('dequeue')
fields, status_blob = dequeue(
dequeue_net,
self._wrapper.queue(),
@@ -36,7 +36,7 @@
def write_ex(self, fields, local_init_net, local_finish_net, status):
self._wrapper._new_writer(self.schema(), local_init_net)
- enqueue_net = core.Net('enqueue_net')
+ enqueue_net = core.Net('enqueue')
enqueue(enqueue_net, self._wrapper.queue(), fields, status)
return [enqueue_net]
@@ -77,15 +77,15 @@
def enqueue(net, queue, data_blobs, status=None):
if status is None:
- status = net.NextName("%s_enqueue_status" % str(queue))
+ status = net.NextName('status')
results = net.SafeEnqueueBlobs([queue] + data_blobs, data_blobs + [status])
return results[-1]
def dequeue(net, queue, num_blobs, status=None):
- data_names = [net.NextName("%s_dequeue_data", i) for i in range(num_blobs)]
+ data_names = [net.NextName('data', i) for i in range(num_blobs)]
if status is None:
- status = net.NextName("%s_dequeue_status")
+ status = net.NextName('status')
results = net.SafeDequeueBlobs(queue, data_names + [status])
results = list(results)
status_blob = results.pop(-1)
diff --git a/caffe2/python/task.py b/caffe2/python/task.py
index 9d2b844..df19626 100644
--- a/caffe2/python/task.py
+++ b/caffe2/python/task.py
@@ -305,6 +305,7 @@
step = core.execution_step(node, steps)
Task(
node=node, step=step, outputs=outputs,
+ name='grouped_by_node',
group=grouped_by_node, workspace_type=workspace_type)
self._tasks_by_node = (grouped_by_node, node_map)
return grouped_by_node
@@ -383,18 +384,41 @@
TASK_SETUP = 'task_setup'
REPORT_NET = 'report_net'
+ _global_names_used = set()
+
+ @staticmethod
+ def _get_next_name(node, group, name):
+ basename = str(node) + '/' + str(name)
+ names_used = (
+ Task._global_names_used
+ if group is None else
+ set(t.name for t in group._tasks_to_add))
+ cur_name = basename
+ i = 0
+ while cur_name in names_used:
+ i += 1
+ cur_name = '%s:%d' % (basename, i)
+ return cur_name
def __init__(
self, step=None, outputs=None,
- workspace_type=None, group=None, node=None):
+ workspace_type=None, group=None, node=None, name=None):
"""
Instantiate a Task and add it to the current TaskGroup and Node.
"""
+ if not name and isinstance(step, core.ExecutionStep):
+ name = step.Proto().name
+ if not name:
+ name = 'task'
# register this node name with active context
self.node = str(Node.current(None if node is None else Node(node)))
- group = TaskGroup.current(group, required=False)
- if group is not None:
- group._tasks_to_add.append(self)
+ self.group = TaskGroup.current(group, required=False)
+
+ self.name = Task._get_next_name(self.node, self.group, name)
+
+ # may need to be temporarily removed later if Task used as a context
+ if self.group is not None:
+ self.group._tasks_to_add.append(self)
self._already_used = False
self._step = None
@@ -411,17 +435,22 @@
self._report_net = None
def __enter__(self):
+ # temporarily remove from _tasks_to_add to ensure correct order
+ if self.group is not None:
+ self.group._tasks_to_add.remove(self)
self._assert_not_used()
assert self._step is None, 'This Task already has an execution step.'
from caffe2.python import net_builder
- self._net_builder = net_builder.NetBuilder()
+ self._net_builder = net_builder.NetBuilder(_fullname=self.name)
self._net_builder.__enter__()
return self
def __exit__(self, type, value, traceback):
+ self._net_builder.__exit__(type, value, traceback)
if type is None:
self.set_step(self._net_builder)
- self._net_builder.__exit__(type, value, traceback)
+ if self.group is not None:
+ self.group._tasks_to_add.append(self)
self._net_builder = None
def workspace_type(self):
@@ -462,20 +491,20 @@
init_nets, exit_nets = get_setup_nets(
Task.TASK_SETUP, [self._step], self)
if len(self._outputs) == 0:
- output_net = core.Net("output_net")
+ output_net = core.Net('%s:output' % self.name)
self.add_output(output_net.ConstantFill(
[], 1, dtype=core.DataType.INT32, value=0))
exit_nets.append(output_net)
self._step_with_setup = core.execution_step(
- 'task',
+ self.name,
[
- core.execution_step('task_init', init_nets),
+ core.execution_step('%s:init' % self.name, init_nets),
self._step,
- core.execution_step('task_exit', exit_nets),
+ core.execution_step('%s:exit' % self.name, exit_nets),
]
)
elif self._step_with_setup is None:
- self._step_with_setup = core.execution_step('task', [])
+ self._step_with_setup = core.execution_step(self.name, [])
return self._step_with_setup
def outputs(self):