from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer


def processor_step(
        reader, writer, num_threads=1, processor=None, name='processor'):
    """
    Given a reader and a writer, couple them through a processor and run
    the read-process-write loop across multiple threads.

    Args:
        reader: an instance of dataio.Reader
        writer: an instance of dataio.Writer
        num_threads: number of processing threads
        processor: if provided, a function with the signature:
                (nets, out_record) = processor(record)
            where `record` is a schema.Struct containing the input,
            `nets` is the list of nets performing the transformation, and
            `out_record` is a schema.Struct with the transformed data.
            If None, records are passed through unchanged. (See the
            commented sketch after this function for an example.)
        name: name given to the nets and execution steps created.

    Returns:
        Execution step that runs all threads of the processor in parallel.
    """
    assert isinstance(reader, Reader)
    assert isinstance(writer, Writer)
    global_init_net = core.Net(name + '_global_init')
    global_exit_net = core.Net(name + '_global_exit')

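    # Let the reader and writer register their one-time setup and teardown
    # ops; these global nets run once, before and after all worker threads.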
    reader.setup_ex(global_init_net, global_exit_net)
    writer.setup_ex(global_init_net, global_exit_net)

    def default_processor(fields):
        return [], fields

    if processor is None:
        processor = default_processor

    steps = []
    for thread_id in range(num_threads):
        init_net = core.Net(name + "_init_net_%d" % thread_id)
        exit_net = core.Net(name + "_exit_net_%d" % thread_id)

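        # Each thread builds its own read/process/write nets. `status` is a
        # blob set by the reader once the data source is exhausted; it tells
        # the worker loop below when to stop.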
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        process_nets, rec = processor(rec)
        write_nets, _ = writer.write_record_ex(rec, init_net, exit_net, status)

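        # Per-thread step: run init once, loop the worker nets until
        # `status` is set, then run exit once.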
        step = core.execution_step(
            name + "_thread_%d" % thread_id, [
                core.execution_step(name + "_init_step", init_net),
                core.execution_step(
                    name + "_worker_step",
                    list(read_nets) + list(process_nets) + list(write_nets),
                    should_stop_blob=status
                ), core.execution_step(name + "_exit_step", exit_net)
            ]
        )
        steps.append(step)

    return core.execution_step(
        name + '_step', [
            core.execution_step(name + '_global_init_step', global_init_net),
            core.execution_step(
                name + '_worker_steps', steps, concurrent_substeps=True),
            core.execution_step(name + '_global_exit_step', global_exit_net),
        ]
    )
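

# A minimal sketch of a custom processor (hypothetical names; it assumes the
# extra import `from caffe2.python import schema` and the schema helpers
# `Field.field_blobs()` and `schema.from_blob_list`). It copies every field
# of the input record to new blobs, so the output schema matches the input:
#
#     def copy_processor(rec):
#         net = core.Net('copy_processor')
#         out_blobs = [net.Copy(blob) for blob in rec.field_blobs()]
#         return [net], schema.from_blob_list(rec, out_blobs)
#
#     step = processor_step(reader, writer, num_threads=4,
#                           processor=copy_processor)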


class LocalPipeline(object):
    """
    Create a data processing pipeline consisting of a sequence of
    multi-threaded processors communicating through queues.
    """
    def __init__(self):
        self.tasks = []
        self.init_net = core.Net('worker_init')

    def create_queue(self, capacity, schema):
        """
        Create a queue that will be used to communicate between processors.

        Args:
            capacity: max number of records in the queue
            schema: a schema.Struct representing the schema of a record in
                the queue.

        Returns:
            A queue_util.QueueWrapper for the new queue.
        """
        return queue_util.QueueWrapper(self.init_net, capacity, schema)

    def add_task(self, task):
        """
        Add a task to the pipeline.
        The task will run in parallel with the other tasks in the pipeline.
        """
        self.tasks.append(task)

    def link(self, reader, writer, num_threads=1, processor=None):
        """
        Add a task that will read from `reader` and write to `writer`.
        See `processor_step` above for a description of the arguments.
        """
        self.add_task(processor_step(reader, writer, num_threads, processor))

    def get_step(self):
        """
        Create and return a Caffe2 execution step that will run all the tasks
        of this pipeline in parallel.
        """
        return core.execution_step('worker_step', [
            core.execution_step('worker_init', self.init_net),
            core.execution_step(
                'tasks_step', self.tasks, concurrent_substeps=True)
        ])

    def get_step_and_output(self):
        """
        Return a tuple (execution_step, output) to be used as one of the tasks
        in a distributed pipeline.
        """
        output = self.init_net.ConstantFill([], value=0.0)
        return self.get_step(), [output]
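

# A minimal usage sketch (hypothetical names: `source_reader`, `sink_writer`,
# and `record_schema` stand for any dataio.Reader, dataio.Writer, and
# schema.Struct; it also assumes QueueWrapper exposes reader()/writer()):
#
#     pipeline = LocalPipeline()
#     queue = pipeline.create_queue(capacity=100, schema=record_schema)
#     pipeline.link(source_reader, queue.writer(), num_threads=2)
#     pipeline.link(queue.reader(), sink_writer, num_threads=4)
#     step = pipeline.get_step()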