from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer


def processor_step(
        reader, writer, num_threads=1, processor=None, name='processor'):
    """
    Given a reader and a writer, couple them through a processor, running
    across multiple threads.

    Args:
        reader: an instance of dataio.Reader
        writer: an instance of dataio.Writer
        num_threads: number of processing threads
        processor: if provided, a function of the form:
                (nets, out_record) = processor(record)
            where `record` is a schema.Struct containing the input,
            `nets` is the list of nets performing the transformation, and
            `out_record` is a schema.Struct with the transformed data
            (a sample processor is sketched below this function).
        name: name to be given to the nets and execution steps created.

    Returns:
        Execution step that runs all threads of the processor in parallel.
    """
    assert isinstance(reader, Reader)
    assert isinstance(writer, Writer)
    # Global init/exit nets run once, before and after all worker threads.
    global_init_net = core.Net(name + '_producer_global_init')
    global_exit_net = core.Net(name + '_producer_global_exit')
    reader.setup_ex(global_init_net, global_exit_net)
    writer.setup_ex(global_init_net, global_exit_net)

    def default_processor(fields):
        # Pass-through: no extra nets, output record equals input record.
        return [], fields

    if processor is None:
        processor = default_processor

    # Build one init/worker/exit step per thread; the worker step repeats
    # until the `status` blob signals it should stop.
    steps = []
    for thread_id in range(num_threads):
        init_net = core.Net(name + "_init_net_%d" % thread_id)
        exit_net = core.Net(name + "_exit_net_%d" % thread_id)
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        process_nets, rec = processor(rec)
        write_nets, _ = writer.write_record_ex(rec, init_net, exit_net, status)
        step = core.execution_step(
            name + "_thread_%d" % thread_id, [
                core.execution_step(name + "_init_step", init_net),
                core.execution_step(
                    name + "_worker_step",
                    list(read_nets) + list(process_nets) + list(write_nets),
                    should_stop_blob=status
                ),
                core.execution_step(name + "_exit_step", exit_net),
            ]
        )
        steps.append(step)
    # Global init, then all thread steps concurrently, then global exit.
    return core.execution_step(
        "sender_step", [
            core.execution_step('init_step', global_init_net),
            core.execution_step(
                "sender_steps", steps, concurrent_substeps=True),
            core.execution_step('finish_step', global_exit_net),
        ]
    )
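

# Example (not part of the original API): a minimal sketch of a `processor`
# callback that can be passed to `processor_step` or `LocalPipeline.link`.
# It copies every field of the input record to a new blob, i.e. an identity
# transformation. It assumes `schema.from_blob_list` and `Field.field_blobs()`
# behave as in caffe2.python.schema; the `_copy` suffix is arbitrary.
def _example_copy_processor(rec):
    from caffe2.python import schema  # local import keeps the sketch self-contained
    net = core.Net('example_copy_processor')
    out_blobs = [
        net.Copy(blob, str(blob) + '_copy') for blob in rec.field_blobs()]
    out_record = schema.from_blob_list(rec, out_blobs)
    return [net], out_record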


class LocalPipeline(object):
    """
    Create a data processing pipeline consisting of a sequence of
    multi-threaded processors communicating through queues.
    A usage sketch is given at the end of this module.
    """
    def __init__(self):
        self.tasks = []
        self.init_net = core.Net('worker_init')

    def create_queue(self, capacity, schema):
        """
        Create a queue that will be used to communicate between processors.

        Args:
            capacity: max number of records in the queue
            schema: a schema.Struct representing the schema of a record in
                the queue.

        Returns:
            A QueueWrapper containing a queue.
        """
        return queue_util.QueueWrapper(self.init_net, capacity, schema)

    def add_task(self, task):
        """
        Add a task to the pipeline.
        The task will run in parallel with the other tasks in the pipeline.
        """
        self.tasks.append(task)

    def link(self, reader, writer, num_threads=1, processor=None):
        """
        Add a task that will read from `reader` and write to `writer`.
        See `processor_step` above for a description of the arguments.
        """
        self.add_task(processor_step(reader, writer, num_threads, processor))

    def get_step(self):
        """
        Create and return a Caffe2 execution step that will run all the tasks
        of this pipeline in parallel.
        """
        return core.execution_step('worker_step', [
            core.execution_step('worker_init', self.init_net),
            core.execution_step(
                'tasks_step', self.tasks, concurrent_substeps=True)
        ])

    def get_step_and_output(self):
        """
        Return a tuple (execution_step, output) to be used as one of the tasks
        in a distributed pipeline.
        """
        # The pipeline has no meaningful output of its own, so expose a dummy
        # constant blob to satisfy the (step, outputs) interface.
        output = self.init_net.ConstantFill([], value=0.0)
        return self.get_step(), [output]
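

# Example (not part of the original API): a minimal sketch of chaining two
# stages of a LocalPipeline through an intermediate queue. `source_reader` and
# `sink_writer` stand for arbitrary dataio.Reader / dataio.Writer
# implementations supplied by the caller, and the `queue.reader()` /
# `queue.writer()` accessors are assumed to be provided by QueueWrapper;
# adapt them if the actual QueueWrapper interface differs.
def _example_local_pipeline(record_schema, source_reader, sink_writer):
    pipeline = LocalPipeline()
    # Bounded queue decoupling the reading stage from the writing stage.
    queue = pipeline.create_queue(capacity=100, schema=record_schema)
    # Stage 1: read from the source and enqueue, using 4 threads.
    pipeline.link(source_reader, queue.writer(), num_threads=4)
    # Stage 2: dequeue and write to the sink, using 4 threads.
    pipeline.link(queue.reader(), sink_writer, num_threads=4)
    # Single execution step running both stages concurrently.
    return pipeline.get_step()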