[autotest] Factor out job result waiting
BUG=chromium:672348
TEST=None
Change-Id: Ie53774effcac69a54456ad16f70c0167f2030e0c
Reviewed-on: https://chromium-review.googlesource.com/565745
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/server/cros/dynamic_suite/job_status.py b/server/cros/dynamic_suite/job_status.py
index 9dde058..b5c90de 100644
--- a/server/cros/dynamic_suite/job_status.py
+++ b/server/cros/dynamic_suite/job_status.py
@@ -115,6 +115,57 @@
status.test_name.startswith('CLIENT_JOB'))
+class _JobResultWaiter(object):
+ """Class for waiting on job results."""
+
+ def __init__(self, afe, tko):
+ """Instantiate class
+
+ @param afe: an instance of AFE as defined in server/frontend.py.
+ @param tko: an instance of TKO as defined in server/frontend.py.
+ """
+ self._afe = afe
+ self._tko = tko
+ self._job_ids = set()
+
+ def add_job(self, job):
+ """Add job to wait on.
+
+ @param job: Job object to get results from, as defined in
+ server/frontend.py
+ """
+ self.add_jobs((job,))
+
+ def add_jobs(self, jobs):
+ """Add job to wait on.
+
+ @param jobs: Iterable of Job object to get results from, as defined in
+ server/frontend.py
+ """
+ self._job_ids.update(job.id for job in jobs)
+
+ def wait_for_results(self):
+ """Wait for jobs to finish and return their results.
+
+ The returned generator blocks until all jobs have finished,
+ naturally.
+
+ @yields an iterator of Statuses, one per test.
+ """
+ while self._job_ids:
+ for job in self._get_finished_jobs():
+ for result in _yield_job_results(self._afe, self._tko, job):
+ yield result
+ self._job_ids.remove(job.id)
+ self._sleep()
+
+ def _get_finished_jobs(self):
+ return self._afe.get_jobs(id__in=self._job_ids, finished=True)
+
+ def _sleep(self):
+ time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+
+
def _yield_job_results(afe, tko, job):
"""
Yields the results of an individual job.
@@ -185,31 +236,14 @@
@param parent_job_id: Parent job id for the jobs to wait on.
@yields an iterator of Statuses, one per test.
"""
- remaining_child_jobs = set(job.id for job in
- afe.get_jobs(parent_job_id=parent_job_id))
- while remaining_child_jobs:
- new_finished_jobs = afe.get_jobs(id__in=list(remaining_child_jobs),
- finished=True)
-
- for job in new_finished_jobs:
-
- remaining_child_jobs.remove(job.id)
- for result in _yield_job_results(afe, tko, job):
- # To figure out what new jobs (like retry jobs) have been
- # created since last iteration, we could re-poll for
- # the set of child jobs in each iteration and
- # calculate the set difference against the set we got in
- # last iteration. As an alternative, we could just make
- # the caller 'send' new jobs to this generator. We go
- # with the latter to avoid unnecessary overhead.
- new_child_jobs = (yield result)
- if new_child_jobs:
- remaining_child_jobs.update([new_job.id for new_job in
- new_child_jobs])
- # Return nothing if 'send' is called
- yield None
-
- time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+ waiter = _JobResultWaiter(afe, tko)
+ waiter.add_jobs(afe.get_jobs(parent_job_id=parent_job_id))
+ for result in waiter.wait_for_results():
+ new_jobs = (yield result)
+ if new_jobs:
+ waiter.add_jobs(new_jobs)
+ # Return nothing if 'send' is called
+ yield None
def wait_for_results(afe, tko, jobs):
@@ -225,23 +259,14 @@
@param jobs: a list of Job objects, as defined in server/frontend.py.
@yields an iterator of Statuses, one per test.
"""
- local_jobs = list(jobs)
- while local_jobs:
- for job in list(local_jobs):
- if not afe.get_jobs(id=job.id, finished=True):
- continue
-
- local_jobs.remove(job)
- for result in _yield_job_results(afe, tko, job):
- # The caller can 'send' new jobs (i.e. retry jobs)
- # to this generator at any time.
- new_jobs = (yield result)
- if new_jobs:
- local_jobs.extend(new_jobs)
- # Return nothing if 'send' is called
- yield None
-
- time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
+ waiter = _JobResultWaiter(afe, tko)
+ waiter.add_jobs(jobs)
+ for result in waiter.wait_for_results():
+ new_jobs = (yield result)
+ if new_jobs:
+ waiter.add_jobs(new_jobs)
+ # Return nothing if 'send' is called
+ yield None
class Status(object):
diff --git a/server/cros/dynamic_suite/job_status_unittest.py b/server/cros/dynamic_suite/job_status_unittest.py
index c70325b..0fa461a 100755
--- a/server/cros/dynamic_suite/job_status_unittest.py
+++ b/server/cros/dynamic_suite/job_status_unittest.py
@@ -44,11 +44,6 @@
shutil.rmtree(self.tmpdir, ignore_errors=True)
- def expect_result_gathering(self, job):
- self.afe.get_jobs(id=job.id, finished=True).AndReturn(job)
- self.expect_yield_job_entries(job)
-
-
def expect_yield_job_entries(self, job):
entries = [s.entry for s in job.statuses]
self.afe.run('get_host_queue_entries',
@@ -68,7 +63,6 @@
FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')]),
FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
FakeStatus('GOOD', 'T0', '')]),]
-
# TODO: Write a better test for the case where we yield
# results for aborts vs cannot yield results because of
# a premature abort. Currently almost all client aborts
@@ -78,21 +72,23 @@
# FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)]),
# The next job shouldn't be recorded in the results.
# FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')])]
-
for status in jobs[4].statuses:
status.entry['job'] = {'name': 'broken_infra_job'}
- # To simulate a job that isn't ready the first time we check.
- self.afe.get_jobs(id=jobs[0].id, finished=True).AndReturn([])
- # Expect all the rest of the jobs to be good to go the first time.
- for job in jobs[1:]:
- self.expect_result_gathering(job)
- # Then, expect job[0] to be ready.
- self.expect_result_gathering(jobs[0])
- # Expect us to poll twice.
+ job_id_set = set([job.id for job in jobs])
+ yield_values = [
+ [jobs[1]],
+ [jobs[0], jobs[2]],
+ jobs[3:6]
+ ]
self.mox.StubOutWithMock(time, 'sleep')
- time.sleep(mox.IgnoreArg())
- time.sleep(mox.IgnoreArg())
+ for yield_this in yield_values:
+ self.afe.get_jobs(id__in=set(job_id_set),
+ finished=True).AndReturn(yield_this)
+ for job in yield_this:
+ self.expect_yield_job_entries(job)
+ job_id_set.remove(job.id)
+ time.sleep(mox.IgnoreArg())
self.mox.ReplayAll()
results = [result for result in job_status.wait_for_results(self.afe,
@@ -119,7 +115,6 @@
FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
FakeStatus('GOOD', 'T0', '')],
parent_job_id=parent_job_id),]
-
# TODO: Write a better test for the case where we yield
# results for aborts vs cannot yield results because of
# a premature abort. Currently almost all client aborts
@@ -145,7 +140,7 @@
]
self.mox.StubOutWithMock(time, 'sleep')
for yield_this in yield_values:
- self.afe.get_jobs(id__in=list(job_id_set),
+ self.afe.get_jobs(id__in=set(job_id_set),
finished=True).AndReturn(yield_this)
for job in yield_this:
self.expect_yield_job_entries(job)