| # -*- coding: utf-8 -*- |
| # Copyright 2013 Google Inc. All Rights Reserved. |
| # |
| # Permission is hereby granted, free of charge, to any person obtaining a |
| # copy of this software and associated documentation files (the |
| # "Software"), to deal in the Software without restriction, including |
| # without limitation the rights to use, copy, modify, merge, publish, dis- |
| # tribute, sublicense, and/or sell copies of the Software, and to permit |
| # persons to whom the Software is furnished to do so, subject to the fol- |
| # lowing conditions: |
| # |
| # The above copyright notice and this permission notice shall be included |
| # in all copies or substantial portions of the Software. |
| # |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
| # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- |
| # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT |
| # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
| # IN THE SOFTWARE. |
| """Unit tests for gsutil parallelism framework.""" |
| |
| from __future__ import absolute_import |
| |
| import functools |
| import os |
| import signal |
| import threading |
| import time |
| |
| from boto.storage_uri import BucketStorageUri |
| from gslib import cs_api_map |
| from gslib.command import Command |
| from gslib.command import CreateGsutilLogger |
| from gslib.command import DummyArgChecker |
| import gslib.tests.testcase as testcase |
| from gslib.tests.testcase.base import RequiresIsolation |
| from gslib.tests.util import unittest |
| from gslib.util import CheckMultiprocessingAvailableAndInit |
| from gslib.util import IS_WINDOWS |
| |
| |
| # Amount of time for an individual test to run before timing out. We need a |
| # reasonably high value since if many tests are running in parallel, an |
| # individual test may take a while to complete. |
| _TEST_TIMEOUT_SECONDS = 120 |
| |
| |
| def Timeout(func): |
| """Decorator used to provide a timeout for functions.""" |
| @functools.wraps(func) |
| def Wrapper(*args, **kwargs): |
| if not IS_WINDOWS: |
| signal.signal(signal.SIGALRM, _HandleAlarm) |
| signal.alarm(_TEST_TIMEOUT_SECONDS) |
| try: |
| func(*args, **kwargs) |
| finally: |
| if not IS_WINDOWS: |
| signal.alarm(0) # Cancel the alarm. |
| return Wrapper |
| |
| |
| # pylint: disable=unused-argument |
| def _HandleAlarm(signal_num, cur_stack_frame): |
| raise Exception('Test timed out.') |
| |
| |
| class CustomException(Exception): |
| |
| def __init__(self, exception_str): |
| super(CustomException, self).__init__(exception_str) |
| |
| |
| def _ReturnOneValue(cls, args, thread_state=None): |
| return 1 |
| |
| |
| def _ReturnProcAndThreadId(cls, args, thread_state=None): |
| return (os.getpid(), threading.currentThread().ident) |
| |
| |
| def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None): |
| # This can fail if the total time to spawn new processes and threads takes |
| # longer than 5 seconds, but if that occurs, then we have a performance |
| # problem that needs to be addressed. |
| time.sleep(5) |
| return _ReturnProcAndThreadId(cls, args, thread_state=thread_state) |
| |
| |
| def _FailureFunc(cls, args, thread_state=None): |
| raise CustomException('Failing on purpose.') |
| |
| |
| def _FailingExceptionHandler(cls, e): |
| cls.failure_count += 1 |
| raise CustomException('Exception handler failing on purpose.') |
| |
| |
| def _ExceptionHandler(cls, e): |
| cls.logger.exception(e) |
| cls.failure_count += 1 |
| |
| |
| def _IncrementByLength(cls, args, thread_state=None): |
| cls.arg_length_sum += len(args) |
| |
| |
| def _AdjustProcessCountIfWindows(process_count): |
| if IS_WINDOWS: |
| return 1 |
| else: |
| return process_count |
| |
| |
| def _ReApplyWithReplicatedArguments(cls, args, thread_state=None): |
| """Calls Apply with arguments repeated seven times. |
| |
| The first two elements of args should be the process and thread counts, |
| respectively, to be used for the recursive calls. |
| |
| Args: |
| cls: The Command class to call Apply on. |
| args: Arguments to pass to Apply. |
| thread_state: Unused, required by function signature. |
| |
| Returns: |
| Number of values returned by the two calls to Apply. |
| """ |
| new_args = [args] * 7 |
| process_count = _AdjustProcessCountIfWindows(args[0]) |
| thread_count = args[1] |
| return_values = cls.Apply(_PerformNRecursiveCalls, new_args, |
| _ExceptionHandler, arg_checker=DummyArgChecker, |
| process_count=process_count, |
| thread_count=thread_count, |
| should_return_results=True) |
| ret = sum(return_values) |
| |
| return_values = cls.Apply(_ReturnOneValue, new_args, |
| _ExceptionHandler, arg_checker=DummyArgChecker, |
| process_count=process_count, |
| thread_count=thread_count, |
| should_return_results=True) |
| |
| return len(return_values) + ret |
| |
| |
| def _PerformNRecursiveCalls(cls, args, thread_state=None): |
| """Calls Apply to perform N recursive calls. |
| |
| The first two elements of args should be the process and thread counts, |
| respectively, to be used for the recursive calls, while N is the third element |
| (the number of recursive calls to make). |
| |
| Args: |
| cls: The Command class to call Apply on. |
| args: Arguments to pass to Apply. |
| thread_state: Unused, required by function signature. |
| |
| Returns: |
| Number of values returned by the call to Apply. |
| """ |
| process_count = _AdjustProcessCountIfWindows(args[0]) |
| thread_count = args[1] |
| return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler, |
| arg_checker=DummyArgChecker, |
| process_count=process_count, |
| thread_count=thread_count, |
| should_return_results=True) |
| return len(return_values) |
| |
| |
| def _SkipEvenNumbersArgChecker(cls, arg): |
| return arg % 2 != 0 |
| |
| |
| class FailingIterator(object): |
| |
| def __init__(self, size, failure_indices): |
| self.size = size |
| self.failure_indices = failure_indices |
| self.current_index = 0 |
| |
| def __iter__(self): |
| return self |
| |
| def next(self): |
| if self.current_index == self.size: |
| raise StopIteration('') |
| elif self.current_index in self.failure_indices: |
| self.current_index += 1 |
| raise CustomException( |
| 'Iterator failing on purpose at index %d.' % self.current_index) |
| else: |
| self.current_index += 1 |
| return self.current_index - 1 |
| |
| |
| class FakeCommand(Command): |
| """Fake command class for overriding command instance state.""" |
| command_spec = Command.CreateCommandSpec( |
| 'fake', |
| command_name_aliases=[], |
| ) |
| # Help specification. See help_provider.py for documentation. |
| help_spec = Command.HelpSpec( |
| help_name='fake', |
| help_name_aliases=[], |
| help_type='command_help', |
| help_one_line_summary='Something to take up space.', |
| help_text='Something else to take up space.', |
| subcommand_help_text={}, |
| ) |
| |
| def __init__(self, do_parallel): |
| self.bucket_storage_uri_class = BucketStorageUri |
| support_map = { |
| 'gs': ['JSON'], |
| 's3': ['XML'] |
| } |
| default_map = { |
| 'gs': 'JSON', |
| 's3': 'XML' |
| } |
| self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap( |
| cs_api_map.GsutilApiClassMapFactory, support_map, default_map) |
| self.logger = CreateGsutilLogger('FakeCommand') |
| self.parallel_operations = do_parallel |
| self.failure_count = 0 |
| self.multiprocessing_is_available = ( |
| CheckMultiprocessingAvailableAndInit().is_available) |
| self.debug = 0 |
| |
| |
| class FakeCommandWithoutMultiprocessingModule(FakeCommand): |
| |
| def __init__(self, do_parallel): |
| super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel) |
| self.multiprocessing_is_available = False |
| |
| |
| # TODO: Figure out a good way to test that ctrl+C really stops execution, |
| # and also that ctrl+C works when there are still tasks enqueued. |
| class TestParallelismFramework(testcase.GsUtilUnitTestCase): |
| """gsutil parallelism framework test suite.""" |
| |
| command_class = FakeCommand |
| |
| def _RunApply(self, func, args_iterator, process_count, thread_count, |
| command_inst=None, shared_attrs=None, fail_on_error=False, |
| thr_exc_handler=None, arg_checker=DummyArgChecker): |
| command_inst = command_inst or self.command_class(True) |
| exception_handler = thr_exc_handler or _ExceptionHandler |
| |
| return command_inst.Apply(func, args_iterator, exception_handler, |
| thread_count=thread_count, |
| process_count=process_count, |
| arg_checker=arg_checker, |
| should_return_results=True, |
| shared_attrs=shared_attrs, |
| fail_on_error=fail_on_error) |
| |
| @RequiresIsolation |
| def testBasicApplySingleProcessSingleThread(self): |
| self._TestBasicApply(1, 1) |
| |
| @RequiresIsolation |
| def testBasicApplySingleProcessMultiThread(self): |
| self._TestBasicApply(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testBasicApplyMultiProcessSingleThread(self): |
| self._TestBasicApply(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testBasicApplyMultiProcessMultiThread(self): |
| self._TestBasicApply(3, 3) |
| |
| @Timeout |
| def _TestBasicApply(self, process_count, thread_count): |
| args = [()] * (17 * process_count * thread_count + 1) |
| |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(len(args), len(results)) |
| |
| @RequiresIsolation |
| def testNoTasksSingleProcessSingleThread(self): |
| self._TestApplyWithNoTasks(1, 1) |
| |
| @RequiresIsolation |
| def testNoTasksSingleProcessMultiThread(self): |
| self._TestApplyWithNoTasks(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testNoTasksMultiProcessSingleThread(self): |
| self._TestApplyWithNoTasks(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testNoTasksMultiProcessMultiThread(self): |
| self._TestApplyWithNoTasks(3, 3) |
| |
| @Timeout |
| def _TestApplyWithNoTasks(self, process_count, thread_count): |
| """Tests that calling Apply with no tasks releases locks/semaphores.""" |
| empty_args = [()] |
| |
| for _ in range(process_count * thread_count + 1): |
| self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count) |
| |
| # Ensure that work can still be performed. |
| self._TestBasicApply(process_count, thread_count) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testApplySaturatesMultiProcessSingleThread(self): |
| self._TestApplySaturatesAvailableProcessesAndThreads(3, 1) |
| |
| @RequiresIsolation |
| def testApplySaturatesSingleProcessMultiThread(self): |
| self._TestApplySaturatesAvailableProcessesAndThreads(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testApplySaturatesMultiProcessMultiThread(self): |
| self._TestApplySaturatesAvailableProcessesAndThreads(3, 3) |
| |
| @RequiresIsolation |
| def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count, |
| thread_count): |
| """Tests that created processes and threads evenly share tasks.""" |
| calls_per_thread = 2 |
| args = [()] * (process_count * thread_count * calls_per_thread) |
| expected_calls_per_thread = calls_per_thread |
| |
| if not self.command_class(True).multiprocessing_is_available: |
| # When multiprocessing is unavailable, only a single process is used. |
| # Calls should be evenly distributed across threads. |
| expected_calls_per_thread = calls_per_thread * process_count |
| |
| results = self._RunApply(_SleepThenReturnProcAndThreadId, args, |
| process_count, thread_count) |
| usage_dict = {} # (process_id, thread_id): number of tasks performed |
| for (process_id, thread_id) in results: |
| usage_dict[(process_id, thread_id)] = ( |
| usage_dict.get((process_id, thread_id), 0) + 1) |
| |
| for (id_tuple, num_tasks_completed) in usage_dict.iteritems(): |
| self.assertEqual(num_tasks_completed, expected_calls_per_thread, |
| 'Process %s thread %s completed %s tasks. Expected: %s' % |
| (id_tuple[0], id_tuple[1], num_tasks_completed, |
| expected_calls_per_thread)) |
| |
| @RequiresIsolation |
| def testIteratorFailureSingleProcessSingleThread(self): |
| self._TestIteratorFailure(1, 1) |
| |
| @RequiresIsolation |
| def testIteratorFailureSingleProcessMultiThread(self): |
| self._TestIteratorFailure(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testIteratorFailureMultiProcessSingleThread(self): |
| self._TestIteratorFailure(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testIteratorFailureMultiProcessMultiThread(self): |
| self._TestIteratorFailure(3, 3) |
| |
| @Timeout |
| def _TestIteratorFailure(self, process_count, thread_count): |
| """Tests apply with a failing iterator.""" |
| # Tests for fail_on_error == False. |
| |
| args = FailingIterator(10, [0]) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(9, len(results)) |
| |
| args = FailingIterator(10, [5]) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(9, len(results)) |
| |
| args = FailingIterator(10, [9]) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(9, len(results)) |
| |
| if process_count * thread_count > 1: |
| # In this case, we should ignore the fail_on_error flag. |
| args = FailingIterator(10, [9]) |
| results = self._RunApply(_ReturnOneValue, args, process_count, |
| thread_count, fail_on_error=True) |
| self.assertEqual(9, len(results)) |
| |
| args = FailingIterator(10, range(10)) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(0, len(results)) |
| |
| args = FailingIterator(0, []) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.assertEqual(0, len(results)) |
| |
| @RequiresIsolation |
| def testTestSharedAttrsWorkSingleProcessSingleThread(self): |
| self._TestSharedAttrsWork(1, 1) |
| |
| @RequiresIsolation |
| def testTestSharedAttrsWorkSingleProcessMultiThread(self): |
| self._TestSharedAttrsWork(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testTestSharedAttrsWorkMultiProcessSingleThread(self): |
| self._TestSharedAttrsWork(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testTestSharedAttrsWorkMultiProcessMultiThread(self): |
| self._TestSharedAttrsWork(3, 3) |
| |
| @Timeout |
| def _TestSharedAttrsWork(self, process_count, thread_count): |
| """Tests that Apply successfully uses shared_attrs.""" |
| command_inst = self.command_class(True) |
| command_inst.arg_length_sum = 19 |
| args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd'] |
| self._RunApply(_IncrementByLength, args, process_count, |
| thread_count, command_inst=command_inst, |
| shared_attrs=['arg_length_sum']) |
| expected_sum = 19 |
| for arg in args: |
| expected_sum += len(arg) |
| self.assertEqual(expected_sum, command_inst.arg_length_sum) |
| |
| # Test that shared variables work when the iterator fails at the beginning, |
| # middle, and end. |
| for (failing_iterator, expected_failure_count) in ( |
| (FailingIterator(5, [0]), 1), |
| (FailingIterator(10, [1, 3, 5]), 3), |
| (FailingIterator(5, [4]), 1)): |
| command_inst = self.command_class(True) |
| args = failing_iterator |
| self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| command_inst=command_inst, shared_attrs=['failure_count']) |
| self.assertEqual( |
| expected_failure_count, command_inst.failure_count, |
| msg='Failure count did not match. Expected: %s, actual: %s ' |
| 'for failing iterator of size %s, failing indices %s' % |
| (expected_failure_count, command_inst.failure_count, |
| failing_iterator.size, failing_iterator.failure_indices)) |
| |
| @RequiresIsolation |
| def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self): |
| self._TestThreadsSurviveExceptionsInFunc(1, 1) |
| |
| @RequiresIsolation |
| def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self): |
| self._TestThreadsSurviveExceptionsInFunc(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self): |
| self._TestThreadsSurviveExceptionsInFunc(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self): |
| self._TestThreadsSurviveExceptionsInFunc(3, 3) |
| |
| @Timeout |
| def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count): |
| command_inst = self.command_class(True) |
| args = ([()] * 5) |
| self._RunApply(_FailureFunc, args, process_count, thread_count, |
| command_inst=command_inst, shared_attrs=['failure_count'], |
| thr_exc_handler=_FailingExceptionHandler) |
| self.assertEqual(len(args), command_inst.failure_count) |
| |
| @RequiresIsolation |
| def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self): |
| self._TestThreadsSurviveExceptionsInHandler(1, 1) |
| |
| @RequiresIsolation |
| def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self): |
| self._TestThreadsSurviveExceptionsInHandler(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self): |
| self._TestThreadsSurviveExceptionsInHandler(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self): |
| self._TestThreadsSurviveExceptionsInHandler(3, 3) |
| |
| @Timeout |
| def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count): |
| command_inst = self.command_class(True) |
| args = ([()] * 5) |
| self._RunApply(_FailureFunc, args, process_count, thread_count, |
| command_inst=command_inst, shared_attrs=['failure_count'], |
| thr_exc_handler=_FailingExceptionHandler) |
| self.assertEqual(len(args), command_inst.failure_count) |
| |
| @RequiresIsolation |
| @Timeout |
| def testFailOnErrorFlag(self): |
| """Tests that fail_on_error produces the correct exception on failure.""" |
| def _ExpectCustomException(test_func): |
| try: |
| test_func() |
| self.fail( |
| 'Setting fail_on_error should raise any exception encountered.') |
| except CustomException, e: |
| pass |
| except Exception, e: |
| self.fail('Got unexpected error: ' + str(e)) |
| |
| def _RunFailureFunc(): |
| command_inst = self.command_class(True) |
| args = ([()] * 5) |
| self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst, |
| shared_attrs=['failure_count'], fail_on_error=True) |
| _ExpectCustomException(_RunFailureFunc) |
| |
| def _RunFailingIteratorFirstPosition(): |
| args = FailingIterator(10, [0]) |
| results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) |
| self.assertEqual(0, len(results)) |
| _ExpectCustomException(_RunFailingIteratorFirstPosition) |
| |
| def _RunFailingIteratorPositionMiddlePosition(): |
| args = FailingIterator(10, [5]) |
| results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) |
| self.assertEqual(5, len(results)) |
| _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition) |
| |
| def _RunFailingIteratorLastPosition(): |
| args = FailingIterator(10, [9]) |
| results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) |
| self.assertEqual(9, len(results)) |
| _ExpectCustomException(_RunFailingIteratorLastPosition) |
| |
| def _RunFailingIteratorMultiplePositions(): |
| args = FailingIterator(10, [1, 3, 5]) |
| results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True) |
| self.assertEqual(1, len(results)) |
| _ExpectCustomException(_RunFailingIteratorMultiplePositions) |
| |
| @RequiresIsolation |
| def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self): |
| self._TestRecursiveDepthThreeDifferentFunctions(1, 1) |
| |
| @RequiresIsolation |
| def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self): |
| self._TestRecursiveDepthThreeDifferentFunctions(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self): |
| self._TestRecursiveDepthThreeDifferentFunctions(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self): |
| self._TestRecursiveDepthThreeDifferentFunctions(3, 3) |
| |
| @Timeout |
| def _TestRecursiveDepthThreeDifferentFunctions(self, process_count, |
| thread_count): |
| """Tests recursive application of Apply. |
| |
| Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls |
| Apply(C). |
| |
| Args: |
| process_count: Number of processes to use. |
| thread_count: Number of threads to use. |
| """ |
| base_args = [3, 1, 4, 1, 5] |
| args = [[process_count, thread_count, count] for count in base_args] |
| |
| results = self._RunApply(_ReApplyWithReplicatedArguments, args, |
| process_count, thread_count) |
| self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results)) |
| |
| @RequiresIsolation |
| def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self): |
| self._TestExceptionInProducerRaisesAndTerminates(1, 1) |
| |
| @RequiresIsolation |
| def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self): |
| self._TestExceptionInProducerRaisesAndTerminates(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self): |
| self._TestExceptionInProducerRaisesAndTerminates(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self): |
| self._TestExceptionInProducerRaisesAndTerminates(3, 3) |
| |
| @Timeout |
| def _TestExceptionInProducerRaisesAndTerminates(self, process_count, |
| thread_count): |
| args = self # The ProducerThread will try and fail to iterate over this. |
| try: |
| self._RunApply(_ReturnOneValue, args, process_count, thread_count) |
| self.fail('Did not raise expected exception.') |
| except TypeError: |
| pass |
| |
| @RequiresIsolation |
| def testSkippedArgumentsSingleThreadSingleProcess(self): |
| self._TestSkippedArguments(1, 1) |
| |
| @RequiresIsolation |
| def testSkippedArgumentsMultiThreadSingleProcess(self): |
| self._TestSkippedArguments(1, 3) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testSkippedArgumentsSingleThreadMultiProcess(self): |
| self._TestSkippedArguments(3, 1) |
| |
| @RequiresIsolation |
| @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows') |
| def testSkippedArgumentsMultiThreadMultiProcess(self): |
| self._TestSkippedArguments(3, 3) |
| |
| @Timeout |
| def _TestSkippedArguments(self, process_count, thread_count): |
| |
| # Skip a proper subset of the arguments. |
| n = 2 * process_count * thread_count |
| args = range(1, n + 1) |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| arg_checker=_SkipEvenNumbersArgChecker) |
| self.assertEqual(n / 2, len(results)) # We know n is even. |
| self.assertEqual(n / 2, sum(results)) |
| |
| # Skip all arguments. |
| args = [2 * x for x in args] |
| results = self._RunApply(_ReturnOneValue, args, process_count, thread_count, |
| arg_checker=_SkipEvenNumbersArgChecker) |
| self.assertEqual(0, len(results)) |
| |
| |
| class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework): |
| """Tests parallelism framework works with multiprocessing module unavailable. |
| |
| Notably, this test has no way to override previous calls |
| to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the |
| initialization of all of the global variables in command.py, so this still |
| behaves slightly differently than the behavior one would see on a machine |
| where the multiprocessing functionality is actually not available (in |
| particular, it will not catch the case where a global variable that is not |
| available for the sequential path is referenced before initialization). |
| """ |
| command_class = FakeCommandWithoutMultiprocessingModule |