Better names for nets, steps and tasks

Summary:
- NetBuilder now honors its name
- When Nets are created in the context of a NetBuilder, they take NetBuilder's name as prefix
- When a NetBuilder is created in the context of a Task, it takes the Tasks's name.
- pipe() now tries to find a good name based on its processor's, output or input queue's name.
- RPC tries to find a name from its handler's name.
- Better names in DataStream
- net_printer prints the name of Tasks and Steps
- net_printer optionally factors out common prefixes form blob names.

Differential Revision: D4527578

fbshipit-source-id: 5d3d1237c186e9576313c5aa01cc8800a9051217
diff --git a/caffe2/python/core.py b/caffe2/python/core.py
index 8a98744..25a39bc 100644
--- a/caffe2/python/core.py
+++ b/caffe2/python/core.py
@@ -290,10 +290,7 @@
 
 def _RegisterPythonImpl(f, grad_f=None, pass_workspace=False):
     if isinstance(f, tuple):
-        print('Registering python with tuple', f)
         f = f[0](*f[1], **f[2])
-    else:
-        print('Registering python without tuple', f)
     if isinstance(grad_f, tuple):
         grad_f = grad_f[0](*grad_f[1], **grad_f[2])
 
@@ -1104,8 +1101,15 @@
     operator_registry_ = {}
 
     @staticmethod
+    def current_prefix():
+        from caffe2.python.net_builder import NetBuilder
+        builder = NetBuilder.current(required=False)
+        return builder.name if builder else ''
+
+    @staticmethod
     def _get_next_net_name(basename):
-        name = basename
+        name = basename = '/'.join(filter(
+            lambda x: x, (Net.current_prefix(), basename)))
         next_idx = 1
         while name in Net._net_names_used:
             name = basename + '_' + str(next_idx)
@@ -1160,13 +1164,14 @@
                 self._next_name_index = max(autogen_indices) + 1
             else:
                 self._next_name_index = 0
+            name = self._net.name
         else:
+            name = name_or_proto
             self._net = caffe2_pb2.NetDef()
-            self._net.name = name_or_proto
             self._next_name_index = 0
 
         # make sure that this net name hasn't been used before
-        self._net.name = Net._get_next_net_name(self._net.name)
+        self._net.name = Net._get_next_net_name(name)
 
     def AppendNet(self, net):
         assert isinstance(net, Net)
@@ -1889,6 +1894,8 @@
         return step_or_nets
 
     stop_blob = None
+    if not default_name and hasattr(step_or_nets, 'name'):
+        default_name = step_or_nets.name
     if isinstance(step_or_nets, NetBuilder):
         stop_blob = step_or_nets._stop_blob
         step_or_nets = step_or_nets.get()
diff --git a/caffe2/python/net_builder.py b/caffe2/python/net_builder.py
index 2390276..3942ae4 100644
--- a/caffe2/python/net_builder.py
+++ b/caffe2/python/net_builder.py
@@ -25,9 +25,12 @@
             ops.Print(d, [])
         step = core.to_execution_step(nb)
     """
-    def __init__(self, name=None, _stop_blob_required=False, _stop_blob=None):
-        self._name = name or ''
-        self._prefix = name + '/' if name else ''
+    def __init__(self, name=None, _stop_blob_required=False,
+                 _stop_blob=None, _fullname=None):
+        nb = NetBuilder.current(required=False)
+        assert not _fullname or not name, 'Cannot set both _fullname and name'
+        self.name = _fullname or '/'.join(filter(lambda x: x, (
+            nb.name if nb else None, name)))
         self._frozen = False
         self._current_net = None
         self._children = []
@@ -47,8 +50,7 @@
             self._stop_blob = core.BlobReference(
                 net.NextName('stop_blob'), net=net)
             if self._current_net != self._children[0]:
-                self._children.insert(0, core.Net(
-                    self._prefix + 'stop_blob_init'))
+                self._children.insert(0, core.Net('stop_blob_init'))
                 self._children[0].Const(False, blob_out=self._stop_blob)
         return self._stop_blob
 
@@ -58,7 +60,7 @@
 
     def _assert_mutable(self):
         assert not self._frozen, (
-            'This NetBuilder (%s) has been built already.' % self._name)
+            'This NetBuilder (%s) has been built already.' % self.name)
 
     def add(self, child):
         self._assert_mutable()
@@ -69,10 +71,10 @@
             self._current_net = child
         return child
 
-    def current_net(self):
+    def current_net(self, name=None):
         self._assert_mutable()
-        if self._current_net is None:
-            self.add(core.Net(self._prefix + 'net'))
+        if self._current_net is None or name is not None:
+            self.add(core.Net(name))
         return self._current_net
 
     def freeze(self):
@@ -91,22 +93,33 @@
         if etype is not None:
             return
         assert (not self._stop_blob_required) or self._stop_blob is not None, (
-            'This NetBuilder (%s) requires a stop condition ' % self._name +
+            'This NetBuilder (%s) requires a stop condition ' % self.name +
             'to be set with `stop` or `stop_if`')
 
+    def __str__(self):
+        return self.name or 'Un-named NetBuilder'
+
 
 class Operations(object):
     """
     Operations to be used in the context of a NetBuilder.
     """
-    def net(self, net=None):
+    def net(self, net=None, name=None):
         """
         Retrieves the current net, or add a new net to the builder.
+        Args:
+            net:   If provided, add the given net to the active builder.
+                   Else, returns the current Net or creates a new one as needed.
+            name:  if provided, creates a new Net with given name and makes
+                   it the new current net of the active builder. Cannot
+                   be provided if net is provided.
         """
+        assert name is None or net is None, (
+            'Cannot provide both `net` and `name`.')
         if net is not None:
             NetBuilder.current().add(net)
             return net
-        return NetBuilder.current().current_net()
+        return NetBuilder.current().current_net(name=name)
 
     def __getattr__(self, op_type):
         """
@@ -153,7 +166,7 @@
         """
         return NetBuilder.current().stop_if(blob)
 
-    def loop(self, iters=None):
+    def loop(self, iters=None, name=None):
         """
         Creates a NetBuilder that will execute in a loop as the next step of
         the current NetBuilder. If `iters` is provided, the loop will execute
@@ -177,9 +190,9 @@
                     ops.LogInfo(loop.iter())
             This will print the numbers from 0 to 19.
         """
-        return NetBuilder.current().add(_Loop(iters))
+        return NetBuilder.current().add(_Loop(iters, name=name))
 
-    def stop_guard(self, has_stopped_blob=None):
+    def stop_guard(self, has_stopped_blob=None, name=None):
         """
         Creates a NetBuilder that will execute once as the next step of the
         current NetBuilder. After execution, a bool tensor will indicate
@@ -199,9 +212,9 @@
             followed by True and False.
         """
         return NetBuilder.current().add(
-            _StopGuard(has_stopped_blob=has_stopped_blob))
+            _StopGuard(has_stopped_blob=has_stopped_blob, name=name))
 
-    def If(self, cond):
+    def If(self, cond, name=None):
         """
         Creates a NetBuilder that will execute once as the next step of the
         current NetBuilder if the blob `cond` is True.
@@ -212,7 +225,7 @@
                     ops.Print(ops.Const('Wont print'))
             The example will print 'Will print' once.
         """
-        return NetBuilder.current().add(_RunIf(cond))
+        return NetBuilder.current().add(_RunIf(cond, name=name))
 
     def task_init(self):
         """
@@ -296,7 +309,7 @@
 
 
 class _StopGuard(_RunOnce):
-    def __init__(self, name=None, has_stopped_blob=None):
+    def __init__(self, has_stopped_blob=None, name=None):
         _RunOnce.__init__(self, name)
         self._stopped = has_stopped_blob
         self._ran = False
@@ -371,12 +384,12 @@
         ops.Const(True, blob_out=self._already_ran)
         return r
 
-    def Elif(self, cond):
+    def Elif(self, cond, name=None):
         assert not self._is_else, 'Else not allowed for an Else.'
-        return NetBuilder.current().add(
-            _RunIf(cond, _already_ran=self._already_ran))
+        return NetBuilder.current().add(_RunIf(
+            cond, name=name or self.name, _already_ran=self._already_ran))
 
-    def Else(self):
+    def Else(self, name=None):
         assert not self._is_else, 'Elif not allowed for an Else.'
         return NetBuilder.current().add(
-            _RunIf(_already_ran=self._already_ran))
+            _RunIf(name=name or self.name, _already_ran=self._already_ran))
diff --git a/caffe2/python/net_printer.py b/caffe2/python/net_printer.py
index d88dce5..3397db2 100644
--- a/caffe2/python/net_printer.py
+++ b/caffe2/python/net_printer.py
@@ -167,7 +167,10 @@
 
 
 class Printer(Visitor, Text):
-    pass
+    def __init__(self, factor_prefixes=False):
+        super(Visitor, self).__init__()
+        super(Text, self).__init__()
+        self.factor_prefixes = factor_prefixes
 
 
 def _sanitize_str(s):
@@ -191,12 +194,38 @@
     return '[]'
 
 
-def call(op, inputs=None, outputs=None):
-    inputs = '' if not inputs else ', '.join(
-        '%s=%s' % (str(a[0]), str(a[1])) if isinstance(a, tuple) else str(a)
-        for a in inputs)
+def commonprefix(m):
+    "Given a list of strings, returns the longest common prefix"
+    if not m:
+        return ''
+    s1 = min(m)
+    s2 = max(m)
+    for i, c in enumerate(s1):
+        if c != s2[i]:
+            return s1[:i]
+    return s1
+
+
+def factor_prefix(vals, do_it):
+    vals = map(str, vals)
+    prefix = commonprefix(vals) if len(vals) > 1 and do_it else ''
+    joined = ', '.join(v[len(prefix):] for v in vals)
+    return '%s[%s]' % (prefix, joined) if prefix else joined
+
+
+def call(op, inputs=None, outputs=None, factor_prefixes=False):
+    if not inputs:
+        inputs = ''
+    else:
+        inputs_v = [a for a in inputs if not isinstance(a, tuple)]
+        inputs_kv = [a for a in inputs if isinstance(a, tuple)]
+        inputs = ', '.join(filter(
+            bool,
+            [factor_prefix(inputs_v, factor_prefixes)] +
+            ['%s=%s' % kv for kv in inputs_kv]))
     call = '%s(%s)' % (op, inputs)
-    return call if not outputs else '%s = %s' % (', '.join(outputs), call)
+    return call if not outputs else '%s = %s' % (
+        factor_prefix(outputs, factor_prefixes), call)
 
 
 @Printer.register(OperatorDef)
@@ -204,7 +233,8 @@
     text.add(call(
         op.type,
         list(op.input) + [(a.name, _arg_val(a)) for a in op.arg],
-        op.output))
+        op.output,
+        factor_prefixes=text.factor_prefixes))
 
 
 @Printer.register(Net)
@@ -217,27 +247,28 @@
 def _get_step_context(step):
     proto = step.Proto()
     if proto.should_stop_blob:
-        return call('loop'), None
+        return call('loop'), False
     if proto.num_iter and proto.num_iter != 1:
-        return call('loop', [proto.num_iter]), None
+        return call('loop', [proto.num_iter]), False
     concurrent = proto.concurrent_substeps and len(step.Substeps()) > 1
     if concurrent:
-        return call('parallel'), call('step')
+        return call('parallel'), True
     if proto.report_net:
-        return call('run_once'), None
-    return None, None
+        return call('run_once'), False
+    return None, False
 
 
 @Printer.register(ExecutionStep)
 def print_step(text, step):
     proto = step.Proto()
-    step_ctx, substep_ctx = _get_step_context(step)
+    step_ctx, do_substep = _get_step_context(step)
     with text.context(step_ctx):
         if proto.report_net:
             with text.context(call('report_net', [proto.report_interval])):
                 text(step.get_net(proto.report_net))
         substeps = step.Substeps() + [step.get_net(n) for n in proto.network]
         for substep in substeps:
+            substep_ctx = call('step', [str(substep)]) if do_substep else None
             with text.context(substep_ctx):
                 text(substep)
                 if proto.should_stop_blob:
@@ -246,7 +277,7 @@
 
 @Printer.register(Task)
 def print_task(text, task):
-    with text.context(call('Task', [('node', task.node)])):
+    with text.context(call('Task', [('node', task.node), ('name', task.name)])):
         text(task.get_step())
 
 
diff --git a/caffe2/python/pipeline.py b/caffe2/python/pipeline.py
index a9779f3..5255c35 100644
--- a/caffe2/python/pipeline.py
+++ b/caffe2/python/pipeline.py
@@ -5,7 +5,7 @@
 
 from caffe2.python import core, queue_util
 from caffe2.python.dataio import Reader, Writer
-from caffe2.python.net_builder import NetBuilder
+from caffe2.python.net_builder import NetBuilder, ops
 from caffe2.python.schema import as_record, Field
 from caffe2.python.task import Task, TaskGroup
 
@@ -128,10 +128,8 @@
         Output Queue, DataStream, Reader, or None, depending on the parameters
         passed.
     """
-    result, step = _pipe_step(
+    result, _ = _pipe_step(
         input, output, num_threads, processor, name, capacity, group)
-    if step is not None:
-        Task(step=step, group=group)
     return result
 
 
@@ -148,11 +146,10 @@
             task_outputs: TaskOutput object, fetchable from the client after
                           session.run() returns.
     """
-    result, step = _pipe_step(
+    assert num_threads > 0
+    result, task = _pipe_step(
         input, output, num_threads, processor, name, capacity, group,
         final_outputs)
-    assert step is not None
-    task = Task(step=step, group=group, outputs=final_outputs)
     output = None
     if final_outputs is not None:
         output = task.outputs()
@@ -161,15 +158,23 @@
     return result, output
 
 
+def processor_name(processor):
+    if hasattr(processor, 'name'):
+        return processor.name
+    if hasattr(processor, 'func_name'):
+        if processor.func_name == '<lambda>':
+            return processor.__module__
+        if hasattr(processor, 'im_class'):
+            return '%s.%s' % (processor.im_class.__name__, processor.func_name)
+        return processor.func_name
+    return processor.__class__.__name__
+
+
 def _pipe_step(
         input, output=None, num_threads=1, processor=None, name=None,
         capacity=None, group=None, final_outputs=None):
     """
     """
-    group = TaskGroup.current(group)
-    if name is None:
-        name = 'processor:%d' % group.num_registered_tasks()
-
     if isinstance(input, Reader):
         reader = input
     elif hasattr(input, 'reader'):
@@ -184,48 +189,51 @@
         assert output is None
         return reader, None
 
-    global_exit_net = core.Net(name + '_producer_global_exit')
-    global_init_net = core.Net(name + '_producer_global_init')
-    out_queue = None
-    writer = None
+    if name is None and processor is not None:
+        name = processor_name(processor)
+    if name is None and output is not None:
+        name = 'pipe_into:%s' % processor_name(output)
+    if name is None:
+        name = 'pipe_from:%s' % processor_name(input)
 
-    reader.setup_ex(global_init_net, global_exit_net)
+    with Task(name=name, group=group, outputs=final_outputs) as task:
+        global_exit_net = core.Net('exit')
+        global_init_net = core.Net('init')
+        reader.setup_ex(global_init_net, global_exit_net)
 
-    steps = []
-    for thread_id in range(num_threads):
-        init_net = core.Net(name + "_init_net_%d" % thread_id)
-        exit_net = core.Net(name + "_exit_net_%d" % thread_id)
+        out_queue = None
+        writer = None
 
-        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
+        steps = []
+        for thread_id in range(num_threads):
+            with NetBuilder(name='t:%d' % thread_id) as nb:
+                init_net = core.Net('init')
+                exit_net = core.Net('exit')
+                read_nets, status, rec = reader.read_record_ex(
+                    init_net, exit_net)
 
-        if rec is not None:
-            if writer is None:
-                out_queue, writer = _init_output(
-                    output, capacity, global_init_net, global_exit_net)
-            write_nets, _ = writer.write_record_ex(
-                rec, init_net, exit_net, status)
-        else:
-            write_nets = []
-
-        step = core.execution_step(
-            name + "_thread_%d" % thread_id, [
-                core.execution_step(name + "_init_step", init_net),
-                core.execution_step(
-                    name + "_worker_step",
+                if rec is not None:
+                    if writer is None:
+                        # hack so that the out queue gets the right name prefix
+                        # (otherwise they would be prefixed with the thread id)
+                        with NetBuilder(_fullname=task.name):
+                            out_queue, writer = _init_output(
+                                output, capacity, global_init_net,
+                                global_exit_net)
+                    write_nets, _ = writer.write_record_ex(
+                        rec, init_net, exit_net, status)
+                else:
+                    write_nets = []
+                ops.net(init_net)
+                ops.net(core.execution_step('body',
                     list(read_nets) + list(write_nets),
-                    should_stop_blob=status
-                ), core.execution_step(name + "_exit_step", exit_net)
-            ]
-        )
-        steps.append(step)
-    step = core.execution_step(
-        "sender_step", [
-            core.execution_step('init_step', global_init_net),
-            core.execution_step(
-                "sender_steps", steps, concurrent_substeps=True),
-            core.execution_step('finish_step', global_exit_net),
-        ])
-    return out_queue, step
+                    should_stop_blob=status))
+                ops.net(exit_net)
+            steps.append(core.to_execution_step(nb))
+        ops.net(global_init_net)
+        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
+        ops.net(global_exit_net)
+    return out_queue, task
 
 
 class ProcessingReader(Reader):
@@ -269,10 +277,11 @@
     and (optionally) output records set, with net.set_input_record() and
     net.set_output_record().
     """
-    def __init__(self, net, stop_signal=None, thread_init_nets=None):
+    def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
         assert isinstance(net, core.Net)
         assert stop_signal is None or isinstance(
             stop_signal, core.BlobReference)
+        self.name = name or str(net)
         self.thread_init_nets = thread_init_nets or []
         self.net = net
         self._stop_signal = stop_signal
@@ -288,7 +297,7 @@
 
     def __call__(self, rec):
         assert not self._frozen
-        prefix = '/worker:%d/' % len(self._blob_maps)
+        prefix = NetBuilder.current().name + '/'
         blob_remap = {}
         for net in self.thread_init_nets:
             new_net, _ = core.clone_and_bind_net(
diff --git a/caffe2/python/queue_util.py b/caffe2/python/queue_util.py
index 2cae95a..8150d01 100644
--- a/caffe2/python/queue_util.py
+++ b/caffe2/python/queue_util.py
@@ -19,7 +19,7 @@
 
     def read_ex(self, local_init_net, local_finish_net):
         self._wrapper._new_reader(local_init_net)
-        dequeue_net = core.Net('dequeue_net')
+        dequeue_net = core.Net('dequeue')
         fields, status_blob = dequeue(
             dequeue_net,
             self._wrapper.queue(),
@@ -36,7 +36,7 @@
 
     def write_ex(self, fields, local_init_net, local_finish_net, status):
         self._wrapper._new_writer(self.schema(), local_init_net)
-        enqueue_net = core.Net('enqueue_net')
+        enqueue_net = core.Net('enqueue')
         enqueue(enqueue_net, self._wrapper.queue(), fields, status)
         return [enqueue_net]
 
@@ -77,15 +77,15 @@
 
 def enqueue(net, queue, data_blobs, status=None):
     if status is None:
-        status = net.NextName("%s_enqueue_status" % str(queue))
+        status = net.NextName('status')
     results = net.SafeEnqueueBlobs([queue] + data_blobs, data_blobs + [status])
     return results[-1]
 
 
 def dequeue(net, queue, num_blobs, status=None):
-    data_names = [net.NextName("%s_dequeue_data", i) for i in range(num_blobs)]
+    data_names = [net.NextName('data', i) for i in range(num_blobs)]
     if status is None:
-        status = net.NextName("%s_dequeue_status")
+        status = net.NextName('status')
     results = net.SafeDequeueBlobs(queue, data_names + [status])
     results = list(results)
     status_blob = results.pop(-1)
diff --git a/caffe2/python/task.py b/caffe2/python/task.py
index 9d2b844..df19626 100644
--- a/caffe2/python/task.py
+++ b/caffe2/python/task.py
@@ -305,6 +305,7 @@
                 step = core.execution_step(node, steps)
             Task(
                 node=node, step=step, outputs=outputs,
+                name='grouped_by_node',
                 group=grouped_by_node, workspace_type=workspace_type)
         self._tasks_by_node = (grouped_by_node, node_map)
         return grouped_by_node
@@ -383,18 +384,41 @@
 
     TASK_SETUP = 'task_setup'
     REPORT_NET = 'report_net'
+    _global_names_used = set()
+
+    @staticmethod
+    def _get_next_name(node, group, name):
+        basename = str(node) + '/' + str(name)
+        names_used = (
+            Task._global_names_used
+            if group is None else
+            set(t.name for t in group._tasks_to_add))
+        cur_name = basename
+        i = 0
+        while cur_name in names_used:
+            i += 1
+            cur_name = '%s:%d' % (basename, i)
+        return cur_name
 
     def __init__(
             self, step=None, outputs=None,
-            workspace_type=None, group=None, node=None):
+            workspace_type=None, group=None, node=None, name=None):
         """
         Instantiate a Task and add it to the current TaskGroup and Node.
         """
+        if not name and isinstance(step, core.ExecutionStep):
+            name = step.Proto().name
+        if not name:
+            name = 'task'
         # register this node name with active context
         self.node = str(Node.current(None if node is None else Node(node)))
-        group = TaskGroup.current(group, required=False)
-        if group is not None:
-            group._tasks_to_add.append(self)
+        self.group = TaskGroup.current(group, required=False)
+
+        self.name = Task._get_next_name(self.node, self.group, name)
+
+        # may need to be temporarily removed later if Task used as a context
+        if self.group is not None:
+            self.group._tasks_to_add.append(self)
 
         self._already_used = False
         self._step = None
@@ -411,17 +435,22 @@
         self._report_net = None
 
     def __enter__(self):
+        # temporarily remove from _tasks_to_add to ensure correct order
+        if self.group is not None:
+            self.group._tasks_to_add.remove(self)
         self._assert_not_used()
         assert self._step is None, 'This Task already has an execution step.'
         from caffe2.python import net_builder
-        self._net_builder = net_builder.NetBuilder()
+        self._net_builder = net_builder.NetBuilder(_fullname=self.name)
         self._net_builder.__enter__()
         return self
 
     def __exit__(self, type, value, traceback):
+        self._net_builder.__exit__(type, value, traceback)
         if type is None:
             self.set_step(self._net_builder)
-        self._net_builder.__exit__(type, value, traceback)
+        if self.group is not None:
+            self.group._tasks_to_add.append(self)
         self._net_builder = None
 
     def workspace_type(self):
@@ -462,20 +491,20 @@
             init_nets, exit_nets = get_setup_nets(
                 Task.TASK_SETUP, [self._step], self)
             if len(self._outputs) == 0:
-                output_net = core.Net("output_net")
+                output_net = core.Net('%s:output' % self.name)
                 self.add_output(output_net.ConstantFill(
                     [], 1, dtype=core.DataType.INT32, value=0))
                 exit_nets.append(output_net)
             self._step_with_setup = core.execution_step(
-                'task',
+                self.name,
                 [
-                    core.execution_step('task_init', init_nets),
+                    core.execution_step('%s:init' % self.name, init_nets),
                     self._step,
-                    core.execution_step('task_exit', exit_nets),
+                    core.execution_step('%s:exit' % self.name, exit_nets),
                 ]
             )
         elif self._step_with_setup is None:
-            self._step_with_setup = core.execution_step('task', [])
+            self._step_with_setup = core.execution_step(self.name, [])
         return self._step_with_setup
 
     def outputs(self):