# blob: 32ff7ec5473de07360f297fe8a5401343e7a7d83 [file] [log] [blame]
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for the contents of parallelizer.py."""
# pylint: disable=W0212
# pylint: disable=W0613
import os
import tempfile
import time
import unittest
from devil.utils import parallelizer
class ParallelizerTestObject(object):
  """Class used to test parallelizer.Parallelizer.

  Each instance wraps an arbitrary "thing" and exposes small methods that
  return it, replace it, or raise it, so the tests can observe how calls made
  through a Parallelizer behave on every wrapped object.
  """

  # Shortcut so tests can write ParallelizerTestObject.parallel(objects).
  parallel = parallelizer.Parallelizer

  def __init__(self, thing, completion_file_name=None):
    """Stores the wrapped value and the optional completion marker path.

    Args:
      thing: The value returned, raised or indexed by the methods below.
      completion_file_name: Optional path of a file that
        doRaiseIfExceptionElseSleepFor writes to once it has finished
        sleeping. When None or empty, no file is written.
    """
    self._thing = thing
    self._completion_file_name = completion_file_name
    self.helper = ParallelizerTestObjectHelper(thing)

  @staticmethod
  def doReturn(what):
    """Returns |what| unchanged."""
    return what

  @classmethod
  def doRaise(cls, what):
    """Raises |what|."""
    raise what

  def doSetTheThing(self, new_thing):
    """Replaces the wrapped value with |new_thing|."""
    self._thing = new_thing

  def doReturnTheThing(self):
    """Returns the wrapped value."""
    return self._thing

  def doRaiseTheThing(self):
    """Raises the wrapped value."""
    raise self._thing

  def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
    """Raises the wrapped value if it is an Exception, else sleeps.

    Args:
      sleep_duration: Number of seconds passed to time.sleep().

    Returns:
      The wrapped value, after sleeping and writing the completion file.

    Raises:
      The wrapped value itself when it is an Exception instance.
    """
    if isinstance(self._thing, Exception):
      raise self._thing
    time.sleep(sleep_duration)
    self._write_completion_file()
    return self._thing

  def _write_completion_file(self):
    """Writes 'complete' to the completion file, if one was configured."""
    if not self._completion_file_name:
      return
    with open(self._completion_file_name, 'w+b') as completion_file:
      # The file is opened in binary mode, so write bytes: this is identical
      # on Python 2 and avoids a TypeError on Python 3.
      completion_file.write(b'complete')

  def __getitem__(self, index):
    """Returns the item at |index| of the wrapped value."""
    return self._thing[index]

  def __str__(self):
    """Returns the class name."""
    return type(self).__name__
class ParallelizerTestObjectHelper(object):
  """Small object that holds a value and can return it as a string."""

  def __init__(self, thing):
    """Remembers |thing| so it can be stringified later."""
    self._thing = thing

  def doReturnStringThing(self):
    """Returns str() of the stored value."""
    stringified = str(self._thing)
    return stringified
class ParallelizerTest(unittest.TestCase):
  """Tests for parallelizer.Parallelizer using strings and test objects."""

  def testInitEmptyList(self):
    """A call on a Parallelizer over an empty list yields an empty list."""
    r = parallelizer.Parallelizer([]).replace('a', 'b').pGet(0.1)
    self.assertEqual([], r)

  def testMethodCall(self):
    """A method call is applied to every wrapped object."""
    test_data = ['abc_foo', 'def_foo', 'ghi_foo']
    expected = ['abc_bar', 'def_bar', 'ghi_bar']
    r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
    self.assertEqual(expected, r)

  def testMutate(self):
    """State changes made through the Parallelizer reach each object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    self.assertTrue(all(d.doReturnTheThing() for d in devices))
    ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
    self.assertFalse(any(d.doReturnTheThing() for d in devices))

  def testAllReturn(self):
    """pGet returns a list with one result per wrapped object."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    results = ParallelizerTestObject.parallel(
        devices).doReturnTheThing().pGet(1)
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    self.assertTrue(all(results))

  def testAllRaise(self):
    """pGet raises when every wrapped call raises."""
    devices = [ParallelizerTestObject(Exception('thing %d' % i))
               for i in range(10)]
    p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
    with self.assertRaises(Exception):
      p.pGet(1)

  def testOneFailOthersComplete(self):
    """One failing object does not stop the others from completing.

    Every object gets its own completion file. After pGet raises, every
    object except the failing one must have written 'complete' to its file.
    """
    parallel_device_count = 10
    exception_index = 7
    exception_msg = 'thing %d' % exception_index
    # Bound before the try block and filled one file at a time, so the
    # cleanup in finally never sees an unbound name and also removes the
    # files created before a mid-way failure.
    completion_files = []
    try:
      for _ in range(parallel_device_count):
        completion_files.append(tempfile.NamedTemporaryFile(delete=False))
      devices = [
          ParallelizerTestObject(
              i if i != exception_index else Exception(exception_msg),
              completion_files[i].name)
          for i in range(parallel_device_count)]
      # Close the handles now; the objects under test reopen the files by
      # name when they write their completion marker.
      for f in completion_files:
        f.close()
      p = ParallelizerTestObject.parallel(devices)
      with self.assertRaises(Exception) as e:
        p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
      self.assertIn(exception_msg, str(e.exception))
      for i, completion_file in enumerate(completion_files):
        expected = '' if i == exception_index else 'complete'
        with open(completion_file.name) as f:
          self.assertEqual(expected, f.read())
    finally:
      for f in completion_files:
        # close() is a no-op on an already closed file; it matters only when
        # the test failed before the explicit close above.
        f.close()
        os.remove(f.name)

  def testReusable(self):
    """The same Parallelizer can be used for several successive calls."""
    devices = [ParallelizerTestObject(True) for _ in range(10)]
    p = ParallelizerTestObject.parallel(devices)
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    results = p.doReturn(True).pGet(1)
    self.assertTrue(all(results))
    with self.assertRaises(Exception):
      p.doRaise(Exception('reusableTest')).pGet(1)

  def testContained(self):
    """Attribute access reaches an object contained in each wrapped object."""
    devices = [ParallelizerTestObject(i) for i in range(10)]
    results = (ParallelizerTestObject.parallel(devices).helper
               .doReturnStringThing().pGet(1))
    self.assertIsInstance(results, list)
    self.assertEqual(10, len(results))
    for i in range(10):
      self.assertEqual(str(i), results[i])

  def testGetItem(self):
    """Indexing the Parallelizer indexes every wrapped object."""
    devices = [ParallelizerTestObject(list(range(i, i + 10)))
               for i in range(10)]
    results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
    # list() keeps the comparison valid on Python 3, where range() is not a
    # list.
    self.assertEqual(list(range(9, 19)), results)
# When executed as a script, run the tests in this module with verbose output.
if __name__ == '__main__':
  unittest.main(verbosity=2)