Share queue + reduce logging

Summary: It is better for the workers to share the Python-side queue, since I saw a case where the workers assigned to one GPU were lagging behind the others. Also reduced logging, as requested by rpenggithub.
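
For context, the shared-queue idea boils down to the following (a minimal standalone sketch with hypothetical names, not the actual data_workers code): every worker feeding the same input source draws the same Python queue from a global registry, so no single GPU's feeders can fall behind the rest.

    import Queue  # Python 2 stdlib module, as used in data_workers.py

    _shared_queues = {}

    def get_queue(queue_name, max_buffered_batches=800):
        # One queue per input source ("train", "test", ...), shared by all
        # worker threads that feed that source.
        if queue_name not in _shared_queues:
            _shared_queues[queue_name] = Queue.Queue(maxsize=max_buffered_batches)
        return _shared_queues[queue_name]

    # Every worker for the "train" source gets the same queue object:
    assert get_queue("train") is get_queue("train")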

Differential Revision: D4620487

fbshipit-source-id: 73353f9570b07788c8cd71c9fec9308cd93a44dd
diff --git a/caffe2/python/data_workers.py b/caffe2/python/data_workers.py
index 5128267..14ae716 100644
--- a/caffe2/python/data_workers.py
+++ b/caffe2/python/data_workers.py
@@ -58,6 +58,8 @@
 
 log = logging.getLogger("data_workers")
 log.setLevel(logging.INFO)
+LOG_INT_SECS = 1200
+
 
 def init_data_input_workers(
     net,
@@ -66,7 +68,7 @@
     batch_size,
     num_worker_threads=2,
     input_source_name="train",
-    max_buffered_batches=100,
+    max_buffered_batches=800,
 ):
     global global_coordinator
     device_option = scope.CurrentDeviceScope()
@@ -81,7 +83,7 @@
         device_option,
         scope.CurrentNameScope(),
         input_source_name,
-        max_buffered_batches,
+        global_coordinator.get_queue(input_source_name, max_buffered_batches)
     )
 
     # Launch fetch worker threads
@@ -105,14 +107,12 @@
 
 class DataInputCoordinator(object):
     def __init__(self, net, input_blob_names, batch_size,
-                 device_option, namescope, input_source_name,
-                 max_buffered_batches):
-        assert isinstance(max_buffered_batches, int)
+                 device_option, namescope, input_source_name, queue):
         self._net = net
         self._counter = 0
         self._input_blob_names = input_blob_names
         self._batch_size = batch_size
-        self._internal_queue = Queue.Queue(maxsize=max_buffered_batches)
+        self._internal_queue = queue
         self._queues = []
         self._device_option = device_option
         self._namescope = namescope
@@ -123,6 +123,7 @@
         self._create_caffe2_queues_and_ops()
         self._inputs = 0
         self._prev_seconds = 0
+        self._last_warning = time.time()
 
     def is_active(self):
         return self._active
@@ -176,12 +177,13 @@
         while self.is_active():
             try:
                 qsize = self._internal_queue.qsize()
-                if qsize < 2 and self._counter > 100:
-                    log.warn("Warning, data loading lagging behind: queue={} \
-                             name=".format(qsize, self._input_source_name))
+                if qsize < 2 and (time.time() - self._last_warning) > LOG_INT_SECS:
+                    log.warn("Warning, data loading lagging behind: " +
+                             "queue={} name={}".format(qsize, self._input_source_name))
+                    self._last_warning = time.time()
                 self._counter += 1
                 self._internal_queue.put(chunk, block=True, timeout=0.5)
-                self._log_inputs_per_minute(chunk[0].shape[0])
+                self._log_inputs_per_interval(chunk[0].shape[0])
                 return
             except Queue.Full:
                 log.debug("Queue full: stalling fetchers...")
@@ -265,15 +267,15 @@
             # Add operator to the Caffe2 network to dequeue
             self._net.DequeueBlobs(q, blob_name)
 
-    def _log_inputs_per_minute(self, inputs):
+    def _log_inputs_per_interval(self, inputs):
         self._inputs += inputs
         current_seconds = time.time()
         delta_seconds = current_seconds - self._prev_seconds
-        if delta_seconds >= 60:
+        if delta_seconds >= LOG_INT_SECS:
             log.info("{}/{}: {} inputs/sec".format(
                 self._input_source_name,
                 self._namescope,
-                self._inputs / delta_seconds,
+                int(self._inputs / delta_seconds),
             ))
             log.info("-- queue: {} batches".format(self._internal_queue.qsize()))
             self._inputs = 0
@@ -284,11 +286,18 @@
     def __init__(self):
         self._coordinators = []
         self._fetcher_id_seq = 0
+        self._queues = {}
         self.register_shutdown_handler()
 
     def add(self, coordinator):
         self._coordinators.append(coordinator)
 
+    def get_queue(self, queue_name, max_buffered_batches):
+        assert isinstance(max_buffered_batches, int)
+        if queue_name not in self._queues:
+            self._queues[queue_name] = Queue.Queue(maxsize=max_buffered_batches)
+        return self._queues[queue_name]
+
     def start(self):
         for c in self._coordinators:
             c._start()