[autotest] Factor out job result waiting

BUG=chromium:672348
TEST=None

Change-Id: Ie53774effcac69a54456ad16f70c0167f2030e0c
Reviewed-on: https://chromium-review.googlesource.com/565745
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/server/cros/dynamic_suite/job_status.py b/server/cros/dynamic_suite/job_status.py
index 9dde058..b5c90de 100644
--- a/server/cros/dynamic_suite/job_status.py
+++ b/server/cros/dynamic_suite/job_status.py
@@ -115,6 +115,57 @@
                 status.test_name.startswith('CLIENT_JOB'))
 
 
+class _JobResultWaiter(object):
+    """Class for waiting on job results."""
+
+    def __init__(self, afe, tko):
+        """Instantiate class
+
+        @param afe: an instance of AFE as defined in server/frontend.py.
+        @param tko: an instance of TKO as defined in server/frontend.py.
+        """
+        self._afe = afe
+        self._tko = tko
+        self._job_ids = set()
+
+    def add_job(self, job):
+        """Add job to wait on.
+
+        @param job: Job object to get results from, as defined in
+                    server/frontend.py
+        """
+        self.add_jobs((job,))
+
+    def add_jobs(self, jobs):
+        """Add job to wait on.
+
+        @param jobs: Iterable of Job object to get results from, as defined in
+                     server/frontend.py
+        """
+        self._job_ids.update(job.id for job in jobs)
+
+    def wait_for_results(self):
+        """Wait for jobs to finish and return their results.
+
+        The returned generator blocks until all jobs have finished,
+        naturally.
+
+        @yields an iterator of Statuses, one per test.
+        """
+        while self._job_ids:
+            for job in self._get_finished_jobs():
+                for result in _yield_job_results(self._afe, self._tko, job):
+                    yield result
+                self._job_ids.remove(job.id)
+            self._sleep()
+
+    def _get_finished_jobs(self):
+        return self._afe.get_jobs(id__in=self._job_ids, finished=True)
+
+    def _sleep(self):
+        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+
+
 def _yield_job_results(afe, tko, job):
     """
     Yields the results of an individual job.
@@ -185,31 +236,14 @@
     @param parent_job_id: Parent job id for the jobs to wait on.
     @yields an iterator of Statuses, one per test.
     """
-    remaining_child_jobs = set(job.id for job in
-                               afe.get_jobs(parent_job_id=parent_job_id))
-    while remaining_child_jobs:
-        new_finished_jobs = afe.get_jobs(id__in=list(remaining_child_jobs),
-                                         finished=True)
-
-        for job in new_finished_jobs:
-
-            remaining_child_jobs.remove(job.id)
-            for result in _yield_job_results(afe, tko, job):
-                # To figure out what new jobs (like retry jobs) have been
-                # created since last iteration, we could re-poll for
-                # the set of child jobs in each iteration and
-                # calculate the set difference against the set we got in
-                # last iteration. As an alternative, we could just make
-                # the caller 'send' new jobs to this generator. We go
-                # with the latter to avoid unnecessary overhead.
-                new_child_jobs = (yield result)
-                if new_child_jobs:
-                    remaining_child_jobs.update([new_job.id for new_job in
-                                                 new_child_jobs])
-                    # Return nothing if 'send' is called
-                    yield None
-
-        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+    waiter = _JobResultWaiter(afe, tko)
+    waiter.add_jobs(afe.get_jobs(parent_job_id=parent_job_id))
+    for result in waiter.wait_for_results():
+        new_jobs = (yield result)
+        if new_jobs:
+            waiter.add_jobs(new_jobs)
+            # Return nothing if 'send' is called
+            yield None
 
 
 def wait_for_results(afe, tko, jobs):
@@ -225,23 +259,14 @@
     @param jobs: a list of Job objects, as defined in server/frontend.py.
     @yields an iterator of Statuses, one per test.
     """
-    local_jobs = list(jobs)
-    while local_jobs:
-        for job in list(local_jobs):
-            if not afe.get_jobs(id=job.id, finished=True):
-                continue
-
-            local_jobs.remove(job)
-            for result in _yield_job_results(afe, tko, job):
-                # The caller can 'send' new jobs (i.e. retry jobs)
-                # to this generator at any time.
-                new_jobs = (yield result)
-                if new_jobs:
-                    local_jobs.extend(new_jobs)
-                    # Return nothing if 'send' is called
-                    yield None
-
-        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+    waiter = _JobResultWaiter(afe, tko)
+    waiter.add_jobs(jobs)
+    for result in waiter.wait_for_results():
+        new_jobs = (yield result)
+        if new_jobs:
+            waiter.add_jobs(new_jobs)
+            # Return nothing if 'send' is called
+            yield None
 
 
 class Status(object):
diff --git a/server/cros/dynamic_suite/job_status_unittest.py b/server/cros/dynamic_suite/job_status_unittest.py
index c70325b..0fa461a 100755
--- a/server/cros/dynamic_suite/job_status_unittest.py
+++ b/server/cros/dynamic_suite/job_status_unittest.py
@@ -44,11 +44,6 @@
         shutil.rmtree(self.tmpdir, ignore_errors=True)
 
 
-    def expect_result_gathering(self, job):
-        self.afe.get_jobs(id=job.id, finished=True).AndReturn(job)
-        self.expect_yield_job_entries(job)
-
-
     def expect_yield_job_entries(self, job):
         entries = [s.entry for s in job.statuses]
         self.afe.run('get_host_queue_entries',
@@ -68,7 +63,6 @@
                 FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')]),
                 FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
                             FakeStatus('GOOD', 'T0', '')]),]
-
                 # TODO: Write a better test for the case where we yield
                 # results for aborts vs cannot yield results because of
                 # a premature abort. Currently almost all client aborts
@@ -78,21 +72,23 @@
                 # FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)]),
                 # The next job shouldn't be recorded in the results.
                 # FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')])]
-
         for status in jobs[4].statuses:
             status.entry['job'] = {'name': 'broken_infra_job'}
 
-        # To simulate a job that isn't ready the first time we check.
-        self.afe.get_jobs(id=jobs[0].id, finished=True).AndReturn([])
-        # Expect all the rest of the jobs to be good to go the first time.
-        for job in jobs[1:]:
-            self.expect_result_gathering(job)
-        # Then, expect job[0] to be ready.
-        self.expect_result_gathering(jobs[0])
-        # Expect us to poll twice.
+        job_id_set = set([job.id for job in jobs])
+        yield_values = [
+                [jobs[1]],
+                [jobs[0], jobs[2]],
+                jobs[3:6]
+            ]
         self.mox.StubOutWithMock(time, 'sleep')
-        time.sleep(mox.IgnoreArg())
-        time.sleep(mox.IgnoreArg())
+        for yield_this in yield_values:
+            self.afe.get_jobs(id__in=set(job_id_set),
+                              finished=True).AndReturn(yield_this)
+            for job in yield_this:
+                self.expect_yield_job_entries(job)
+                job_id_set.remove(job.id)
+            time.sleep(mox.IgnoreArg())
         self.mox.ReplayAll()
 
         results = [result for result in job_status.wait_for_results(self.afe,
@@ -119,7 +115,6 @@
                 FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
                             FakeStatus('GOOD', 'T0', '')],
                         parent_job_id=parent_job_id),]
-
                 # TODO: Write a better test for the case where we yield
                 # results for aborts vs cannot yield results because of
                 # a premature abort. Currently almost all client aborts
@@ -145,7 +140,7 @@
             ]
         self.mox.StubOutWithMock(time, 'sleep')
         for yield_this in yield_values:
-            self.afe.get_jobs(id__in=list(job_id_set),
+            self.afe.get_jobs(id__in=set(job_id_set),
                               finished=True).AndReturn(yield_this)
             for job in yield_this:
                 self.expect_yield_job_entries(job)