Share queue + reduce logging
Summary: It is better for the workers to share the Python-side queue, since I saw a case where the workers assigned to one GPU were lagging behind the others. Also reduced logging, as requested by rpenggithub.
Differential Revision: D4620487
fbshipit-source-id: 73353f9570b07788c8cd71c9fec9308cd93a44dd
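
As a standalone illustration of the shared-queue idea (a minimal sketch in the style of data_workers.py; none of these names are part of this diff), several consumer threads draining one Queue.Queue balance load automatically, whereas per-consumer queues let a slow consumer fall behind:

    import Queue
    import threading
    import time

    shared_queue = Queue.Queue(maxsize=800)   # one queue for all consumers

    def producer():
        # Stand-in for the fetch workers that put batches on the queue.
        for i in range(20):
            shared_queue.put("batch-%d" % i)

    def consumer(gpu_id):
        # Stand-in for a per-GPU enqueue thread; a slower GPU simply takes
        # fewer batches instead of letting its own private queue lag.
        while True:
            try:
                shared_queue.get(timeout=0.5)
            except Queue.Empty:
                return
            time.sleep(0.01 * (gpu_id + 1))

    threads = [threading.Thread(target=producer)]
    threads += [threading.Thread(target=consumer, args=(g,)) for g in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()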
diff --git a/caffe2/python/data_workers.py b/caffe2/python/data_workers.py
index 5128267..14ae716 100644
--- a/caffe2/python/data_workers.py
+++ b/caffe2/python/data_workers.py
@@ -58,6 +58,8 @@
log = logging.getLogger("data_workers")
log.setLevel(logging.INFO)
+LOG_INT_SECS = 1200
+
def init_data_input_workers(
net,
@@ -66,7 +68,7 @@
batch_size,
num_worker_threads=2,
input_source_name="train",
- max_buffered_batches=100,
+ max_buffered_batches=800,
):
global global_coordinator
device_option = scope.CurrentDeviceScope()
@@ -81,7 +83,7 @@
device_option,
scope.CurrentNameScope(),
input_source_name,
- max_buffered_batches,
+ global_coordinator.get_queue(input_source_name, max_buffered_batches)
)
# Launch fetch worker threads
@@ -105,14 +107,12 @@
class DataInputCoordinator(object):
def __init__(self, net, input_blob_names, batch_size,
- device_option, namescope, input_source_name,
- max_buffered_batches):
- assert isinstance(max_buffered_batches, int)
+ device_option, namescope, input_source_name, queue):
self._net = net
self._counter = 0
self._input_blob_names = input_blob_names
self._batch_size = batch_size
- self._internal_queue = Queue.Queue(maxsize=max_buffered_batches)
+ self._internal_queue = queue
self._queues = []
self._device_option = device_option
self._namescope = namescope
@@ -123,6 +123,7 @@
self._create_caffe2_queues_and_ops()
self._inputs = 0
self._prev_seconds = 0
+ self._last_warning = time.time()
def is_active(self):
return self._active
@@ -176,12 +177,13 @@
while self.is_active():
try:
qsize = self._internal_queue.qsize()
- if qsize < 2 and self._counter > 100:
- log.warn("Warning, data loading lagging behind: queue={} \
- name=".format(qsize, self._input_source_name))
+ if qsize < 2 and (time.time() - self._last_warning) > LOG_INT_SECS:
+ log.warn("Warning, data loading lagging behind: " +
+ "name={}".format(qsize, self._input_source_name))
+ self._last_warning = time.time()
self._counter += 1
self._internal_queue.put(chunk, block=True, timeout=0.5)
- self._log_inputs_per_minute(chunk[0].shape[0])
+ self._log_inputs_per_interval(chunk[0].shape[0])
return
except Queue.Full:
log.debug("Queue full: stalling fetchers...")
@@ -265,15 +267,15 @@
# Add operator to the Caffe2 network to dequeue
self._net.DequeueBlobs(q, blob_name)
- def _log_inputs_per_minute(self, inputs):
+ def _log_inputs_per_interval(self, inputs):
self._inputs += inputs
current_seconds = time.time()
delta_seconds = current_seconds - self._prev_seconds
- if delta_seconds >= 60:
+ if delta_seconds >= LOG_INT_SECS:
log.info("{}/{}: {} inputs/sec".format(
self._input_source_name,
self._namescope,
- self._inputs / delta_seconds,
+ int(self._inputs / delta_seconds),
))
log.info("-- queue: {} batches".format(self._internal_queue.qsize()))
self._inputs = 0
@@ -284,11 +286,18 @@
def __init__(self):
self._coordinators = []
self._fetcher_id_seq = 0
+ self._queues = {}
self.register_shutdown_handler()
def add(self, coordinator):
self._coordinators.append(coordinator)
+ def get_queue(self, queue_name, max_buffered_batches):
+ assert isinstance(max_buffered_batches, int)
+ if queue_name not in self._queues:
+ self._queues[queue_name] = Queue.Queue(maxsize=max_buffered_batches)
+ return self._queues[queue_name]
+
def start(self):
for c in self._coordinators:
c._start()
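
A rough usage sketch of the coordinator API touched above (assuming the caffe2 module as patched here; setup of the actual nets and fetch workers is omitted and normally happens via init_data_input_workers): get_queue hands back the same Python-side queue for a given input source name, so every worker for that source shares it.

    from caffe2.python import data_workers

    coord = data_workers.global_coordinator
    q_train_a = coord.get_queue("train", max_buffered_batches=800)
    q_train_b = coord.get_queue("train", max_buffered_batches=800)
    assert q_train_a is q_train_b      # same source name -> one shared queue
    q_test = coord.get_queue("test", max_buffered_batches=800)
    assert q_test is not q_train_a     # different sources keep separate queues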