# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""Unit tests for gsutil parallelism framework."""
from __future__ import absolute_import
import functools
import os
import signal
import threading
import time
from boto.storage_uri import BucketStorageUri
from gslib import cs_api_map
from gslib.command import Command
from gslib.command import CreateGsutilLogger
from gslib.command import DummyArgChecker
import gslib.tests.testcase as testcase
from gslib.tests.testcase.base import RequiresIsolation
from gslib.tests.util import unittest
from gslib.util import CheckMultiprocessingAvailableAndInit
from gslib.util import IS_WINDOWS
# Amount of time for an individual test to run before timing out. We need a
# reasonably high value since if many tests are running in parallel, an
# individual test may take a while to complete.
_TEST_TIMEOUT_SECONDS = 120
def Timeout(func):
"""Decorator used to provide a timeout for functions."""
@functools.wraps(func)
def Wrapper(*args, **kwargs):
if not IS_WINDOWS:
signal.signal(signal.SIGALRM, _HandleAlarm)
signal.alarm(_TEST_TIMEOUT_SECONDS)
try:
func(*args, **kwargs)
finally:
if not IS_WINDOWS:
signal.alarm(0) # Cancel the alarm.
return Wrapper
# pylint: disable=unused-argument
def _HandleAlarm(signal_num, cur_stack_frame):
raise Exception('Test timed out.')
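

# A minimal usage sketch (illustrative only, not itself a test): a function
# wrapped with Timeout raises if it runs longer than _TEST_TIMEOUT_SECONDS on
# platforms with SIGALRM support, and runs unguarded on Windows.
@Timeout
def _ExampleTimedFunction():
  time.sleep(0)  # Returns immediately, so the alarm is cancelled, not fired.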
class CustomException(Exception):
def __init__(self, exception_str):
super(CustomException, self).__init__(exception_str)
def _ReturnOneValue(cls, args, thread_state=None):
return 1
def _ReturnProcAndThreadId(cls, args, thread_state=None):
  return (os.getpid(), threading.current_thread().ident)
def _SleepThenReturnProcAndThreadId(cls, args, thread_state=None):
  # This can cause the test to fail if spawning all of the processes and
  # threads takes longer than 5 seconds; if that occurs, we have a
  # performance problem that needs to be addressed.
time.sleep(5)
return _ReturnProcAndThreadId(cls, args, thread_state=thread_state)
def _FailureFunc(cls, args, thread_state=None):
raise CustomException('Failing on purpose.')
def _FailingExceptionHandler(cls, e):
cls.failure_count += 1
raise CustomException('Exception handler failing on purpose.')
def _ExceptionHandler(cls, e):
cls.logger.exception(e)
cls.failure_count += 1
def _IncrementByLength(cls, args, thread_state=None):
cls.arg_length_sum += len(args)
def _AdjustProcessCountIfWindows(process_count):
if IS_WINDOWS:
return 1
else:
return process_count
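
# Multiprocessing is unavailable on Windows in this framework (note the
# skipIf decorators in the tests below), so multi-process configurations fall
# back to a single process there.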
def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
"""Calls Apply with arguments repeated seven times.
The first two elements of args should be the process and thread counts,
respectively, to be used for the recursive calls.
Args:
cls: The Command class to call Apply on.
args: Arguments to pass to Apply.
thread_state: Unused, required by function signature.
Returns:
Number of values returned by the two calls to Apply.
"""
new_args = [args] * 7
process_count = _AdjustProcessCountIfWindows(args[0])
thread_count = args[1]
return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
_ExceptionHandler, arg_checker=DummyArgChecker,
process_count=process_count,
thread_count=thread_count,
should_return_results=True)
ret = sum(return_values)
return_values = cls.Apply(_ReturnOneValue, new_args,
_ExceptionHandler, arg_checker=DummyArgChecker,
process_count=process_count,
thread_count=thread_count,
should_return_results=True)
return len(return_values) + ret
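
# Worked example of the arithmetic above: for args == [p, t, N], the first
# Apply makes 7 recursive calls that each perform and count N trivial calls,
# so ret == 7 * N; the second Apply returns 7 single values, for a total of
# 7 * N + 7 == 7 * (N + 1).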
def _PerformNRecursiveCalls(cls, args, thread_state=None):
"""Calls Apply to perform N recursive calls.
The first two elements of args should be the process and thread counts,
respectively, to be used for the recursive calls, while N is the third element
(the number of recursive calls to make).
Args:
cls: The Command class to call Apply on.
args: Arguments to pass to Apply.
thread_state: Unused, required by function signature.
Returns:
Number of values returned by the call to Apply.
"""
process_count = _AdjustProcessCountIfWindows(args[0])
thread_count = args[1]
return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
arg_checker=DummyArgChecker,
process_count=process_count,
thread_count=thread_count,
should_return_results=True)
return len(return_values)
def _SkipEvenNumbersArgChecker(cls, arg):
return arg % 2 != 0
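
# For example, given arguments [1, 2, 3, 4], this checker admits 1 and 3 and
# skips the even values, which never reach the applied function.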
class FailingIterator(object):
def __init__(self, size, failure_indices):
self.size = size
self.failure_indices = failure_indices
self.current_index = 0
def __iter__(self):
return self
  def next(self):
    if self.current_index == self.size:
      raise StopIteration('')
    elif self.current_index in self.failure_indices:
      failing_index = self.current_index
      self.current_index += 1
      raise CustomException(
          'Iterator failing on purpose at index %d.' % failing_index)
    else:
      self.current_index += 1
      return self.current_index - 1
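

# A small consumption sketch for FailingIterator (illustrative only; the tests
# exercise it through Apply instead): FailingIterator(3, [1]) yields 0, raises
# CustomException in place of 1, then yields 2 before StopIteration.
def _ConsumeFailingIteratorExample():
  values = []
  failure_count = 0
  iterator = FailingIterator(3, [1])
  while True:
    try:
      values.append(iterator.next())
    except CustomException:
      failure_count += 1
    except StopIteration:
      break
  return values, failure_count  # Returns ([0, 2], 1).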
class FakeCommand(Command):
"""Fake command class for overriding command instance state."""
command_spec = Command.CreateCommandSpec(
'fake',
command_name_aliases=[],
)
# Help specification. See help_provider.py for documentation.
help_spec = Command.HelpSpec(
help_name='fake',
help_name_aliases=[],
help_type='command_help',
help_one_line_summary='Something to take up space.',
help_text='Something else to take up space.',
subcommand_help_text={},
)
def __init__(self, do_parallel):
self.bucket_storage_uri_class = BucketStorageUri
support_map = {
'gs': ['JSON'],
's3': ['XML']
}
default_map = {
'gs': 'JSON',
's3': 'XML'
}
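    # i.e., this fake supports only the JSON API for gs URIs and only the XML
    # API for s3 URIs, and uses those as the defaults.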
self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
self.logger = CreateGsutilLogger('FakeCommand')
self.parallel_operations = do_parallel
self.failure_count = 0
self.multiprocessing_is_available = (
CheckMultiprocessingAvailableAndInit().is_available)
self.debug = 0
class FakeCommandWithoutMultiprocessingModule(FakeCommand):
def __init__(self, do_parallel):
super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
self.multiprocessing_is_available = False
# TODO: Figure out a good way to test that ctrl+C really stops execution,
# and also that ctrl+C works when there are still tasks enqueued.
class TestParallelismFramework(testcase.GsUtilUnitTestCase):
"""gsutil parallelism framework test suite."""
command_class = FakeCommand
def _RunApply(self, func, args_iterator, process_count, thread_count,
command_inst=None, shared_attrs=None, fail_on_error=False,
thr_exc_handler=None, arg_checker=DummyArgChecker):
command_inst = command_inst or self.command_class(True)
exception_handler = thr_exc_handler or _ExceptionHandler
return command_inst.Apply(func, args_iterator, exception_handler,
thread_count=thread_count,
process_count=process_count,
arg_checker=arg_checker,
should_return_results=True,
shared_attrs=shared_attrs,
fail_on_error=fail_on_error)
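
  # Illustrative call shape for the helper above (hypothetical values): each
  # element of args_iterator yields one entry in the returned results list,
  # e.g. self._RunApply(_ReturnOneValue, [()] * 4, 1, 2) returns [1, 1, 1, 1].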
@RequiresIsolation
def testBasicApplySingleProcessSingleThread(self):
self._TestBasicApply(1, 1)
@RequiresIsolation
def testBasicApplySingleProcessMultiThread(self):
self._TestBasicApply(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testBasicApplyMultiProcessSingleThread(self):
self._TestBasicApply(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testBasicApplyMultiProcessMultiThread(self):
self._TestBasicApply(3, 3)
@Timeout
def _TestBasicApply(self, process_count, thread_count):
args = [()] * (17 * process_count * thread_count + 1)
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(len(args), len(results))
@RequiresIsolation
def testNoTasksSingleProcessSingleThread(self):
self._TestApplyWithNoTasks(1, 1)
@RequiresIsolation
def testNoTasksSingleProcessMultiThread(self):
self._TestApplyWithNoTasks(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testNoTasksMultiProcessSingleThread(self):
self._TestApplyWithNoTasks(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testNoTasksMultiProcessMultiThread(self):
self._TestApplyWithNoTasks(3, 3)
@Timeout
def _TestApplyWithNoTasks(self, process_count, thread_count):
"""Tests that calling Apply with no tasks releases locks/semaphores."""
empty_args = [()]
for _ in range(process_count * thread_count + 1):
self._RunApply(_ReturnOneValue, empty_args, process_count, thread_count)
# Ensure that work can still be performed.
self._TestBasicApply(process_count, thread_count)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testApplySaturatesMultiProcessSingleThread(self):
self._TestApplySaturatesAvailableProcessesAndThreads(3, 1)
@RequiresIsolation
def testApplySaturatesSingleProcessMultiThread(self):
self._TestApplySaturatesAvailableProcessesAndThreads(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testApplySaturatesMultiProcessMultiThread(self):
self._TestApplySaturatesAvailableProcessesAndThreads(3, 3)
  @Timeout
def _TestApplySaturatesAvailableProcessesAndThreads(self, process_count,
thread_count):
"""Tests that created processes and threads evenly share tasks."""
calls_per_thread = 2
args = [()] * (process_count * thread_count * calls_per_thread)
expected_calls_per_thread = calls_per_thread
if not self.command_class(True).multiprocessing_is_available:
# When multiprocessing is unavailable, only a single process is used.
# Calls should be evenly distributed across threads.
expected_calls_per_thread = calls_per_thread * process_count
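    # For example, with process_count=3, thread_count=3, and calls_per_thread
    # of 2, the 18 tasks should be spread so that each of the 9
    # (process, thread) pairs performs exactly 2; without multiprocessing,
    # each of the 3 threads in the single process performs 6.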
results = self._RunApply(_SleepThenReturnProcAndThreadId, args,
process_count, thread_count)
usage_dict = {} # (process_id, thread_id): number of tasks performed
for (process_id, thread_id) in results:
usage_dict[(process_id, thread_id)] = (
usage_dict.get((process_id, thread_id), 0) + 1)
for (id_tuple, num_tasks_completed) in usage_dict.iteritems():
self.assertEqual(num_tasks_completed, expected_calls_per_thread,
'Process %s thread %s completed %s tasks. Expected: %s' %
(id_tuple[0], id_tuple[1], num_tasks_completed,
expected_calls_per_thread))
@RequiresIsolation
def testIteratorFailureSingleProcessSingleThread(self):
self._TestIteratorFailure(1, 1)
@RequiresIsolation
def testIteratorFailureSingleProcessMultiThread(self):
self._TestIteratorFailure(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testIteratorFailureMultiProcessSingleThread(self):
self._TestIteratorFailure(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testIteratorFailureMultiProcessMultiThread(self):
self._TestIteratorFailure(3, 3)
@Timeout
def _TestIteratorFailure(self, process_count, thread_count):
"""Tests apply with a failing iterator."""
# Tests for fail_on_error == False.
args = FailingIterator(10, [0])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(9, len(results))
args = FailingIterator(10, [5])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(9, len(results))
args = FailingIterator(10, [9])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(9, len(results))
if process_count * thread_count > 1:
      # With more than one worker (process, thread) pair, Apply ignores the
      # fail_on_error flag, so the failure should not halt execution.
args = FailingIterator(10, [9])
results = self._RunApply(_ReturnOneValue, args, process_count,
thread_count, fail_on_error=True)
self.assertEqual(9, len(results))
args = FailingIterator(10, range(10))
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(0, len(results))
args = FailingIterator(0, [])
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.assertEqual(0, len(results))
@RequiresIsolation
  def testSharedAttrsWorkSingleProcessSingleThread(self):
self._TestSharedAttrsWork(1, 1)
@RequiresIsolation
  def testSharedAttrsWorkSingleProcessMultiThread(self):
self._TestSharedAttrsWork(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSharedAttrsWorkMultiProcessSingleThread(self):
self._TestSharedAttrsWork(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSharedAttrsWorkMultiProcessMultiThread(self):
self._TestSharedAttrsWork(3, 3)
@Timeout
def _TestSharedAttrsWork(self, process_count, thread_count):
"""Tests that Apply successfully uses shared_attrs."""
command_inst = self.command_class(True)
command_inst.arg_length_sum = 19
args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
self._RunApply(_IncrementByLength, args, process_count,
thread_count, command_inst=command_inst,
shared_attrs=['arg_length_sum'])
expected_sum = 19
for arg in args:
expected_sum += len(arg)
self.assertEqual(expected_sum, command_inst.arg_length_sum)
# Test that shared variables work when the iterator fails at the beginning,
# middle, and end.
for (failing_iterator, expected_failure_count) in (
(FailingIterator(5, [0]), 1),
(FailingIterator(10, [1, 3, 5]), 3),
(FailingIterator(5, [4]), 1)):
command_inst = self.command_class(True)
args = failing_iterator
self._RunApply(_ReturnOneValue, args, process_count, thread_count,
command_inst=command_inst, shared_attrs=['failure_count'])
self.assertEqual(
expected_failure_count, command_inst.failure_count,
msg='Failure count did not match. Expected: %s, actual: %s '
'for failing iterator of size %s, failing indices %s' %
(expected_failure_count, command_inst.failure_count,
failing_iterator.size, failing_iterator.failure_indices))
@RequiresIsolation
def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
self._TestThreadsSurviveExceptionsInFunc(1, 1)
@RequiresIsolation
def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
self._TestThreadsSurviveExceptionsInFunc(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
self._TestThreadsSurviveExceptionsInFunc(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
self._TestThreadsSurviveExceptionsInFunc(3, 3)
@Timeout
def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
command_inst = self.command_class(True)
args = ([()] * 5)
self._RunApply(_FailureFunc, args, process_count, thread_count,
command_inst=command_inst, shared_attrs=['failure_count'],
thr_exc_handler=_FailingExceptionHandler)
self.assertEqual(len(args), command_inst.failure_count)
@RequiresIsolation
def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
self._TestThreadsSurviveExceptionsInHandler(1, 1)
@RequiresIsolation
def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
self._TestThreadsSurviveExceptionsInHandler(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
self._TestThreadsSurviveExceptionsInHandler(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
self._TestThreadsSurviveExceptionsInHandler(3, 3)
@Timeout
def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
command_inst = self.command_class(True)
args = ([()] * 5)
self._RunApply(_FailureFunc, args, process_count, thread_count,
command_inst=command_inst, shared_attrs=['failure_count'],
thr_exc_handler=_FailingExceptionHandler)
self.assertEqual(len(args), command_inst.failure_count)
@RequiresIsolation
@Timeout
def testFailOnErrorFlag(self):
"""Tests that fail_on_error produces the correct exception on failure."""
    def _ExpectCustomException(test_func):
      try:
        test_func()
        self.fail(
            'Setting fail_on_error should raise any exception encountered.')
      except CustomException:
        pass
      except Exception as e:
        self.fail('Got unexpected error: ' + str(e))
def _RunFailureFunc():
command_inst = self.command_class(True)
args = ([()] * 5)
self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
shared_attrs=['failure_count'], fail_on_error=True)
_ExpectCustomException(_RunFailureFunc)
def _RunFailingIteratorFirstPosition():
args = FailingIterator(10, [0])
results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
self.assertEqual(0, len(results))
_ExpectCustomException(_RunFailingIteratorFirstPosition)
    def _RunFailingIteratorMiddlePosition():
args = FailingIterator(10, [5])
results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
self.assertEqual(5, len(results))
    _ExpectCustomException(_RunFailingIteratorMiddlePosition)
def _RunFailingIteratorLastPosition():
args = FailingIterator(10, [9])
results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
self.assertEqual(9, len(results))
_ExpectCustomException(_RunFailingIteratorLastPosition)
def _RunFailingIteratorMultiplePositions():
args = FailingIterator(10, [1, 3, 5])
results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
self.assertEqual(1, len(results))
_ExpectCustomException(_RunFailingIteratorMultiplePositions)
@RequiresIsolation
def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
self._TestRecursiveDepthThreeDifferentFunctions(1, 1)
@RequiresIsolation
def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
self._TestRecursiveDepthThreeDifferentFunctions(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
self._TestRecursiveDepthThreeDifferentFunctions(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
self._TestRecursiveDepthThreeDifferentFunctions(3, 3)
@Timeout
def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
thread_count):
"""Tests recursive application of Apply.
Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
Apply(C).
Args:
process_count: Number of processes to use.
thread_count: Number of threads to use.
"""
base_args = [3, 1, 4, 1, 5]
args = [[process_count, thread_count, count] for count in base_args]
results = self._RunApply(_ReApplyWithReplicatedArguments, args,
process_count, thread_count)
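    # Each top-level argument [process_count, thread_count, count] contributes
    # 7 * (count + 1) values (see _ReApplyWithReplicatedArguments), so the
    # expected total is 7 * (sum(base_args) + len(base_args)) = 7 * 19 = 133.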
self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
@RequiresIsolation
def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
self._TestExceptionInProducerRaisesAndTerminates(1, 1)
@RequiresIsolation
def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
self._TestExceptionInProducerRaisesAndTerminates(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
self._TestExceptionInProducerRaisesAndTerminates(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
self._TestExceptionInProducerRaisesAndTerminates(3, 3)
@Timeout
def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
thread_count):
args = self # The ProducerThread will try and fail to iterate over this.
try:
self._RunApply(_ReturnOneValue, args, process_count, thread_count)
self.fail('Did not raise expected exception.')
except TypeError:
pass
@RequiresIsolation
def testSkippedArgumentsSingleThreadSingleProcess(self):
self._TestSkippedArguments(1, 1)
@RequiresIsolation
def testSkippedArgumentsMultiThreadSingleProcess(self):
self._TestSkippedArguments(1, 3)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsSingleThreadMultiProcess(self):
self._TestSkippedArguments(3, 1)
@RequiresIsolation
@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsMultiThreadMultiProcess(self):
self._TestSkippedArguments(3, 3)
@Timeout
def _TestSkippedArguments(self, process_count, thread_count):
# Skip a proper subset of the arguments.
n = 2 * process_count * thread_count
args = range(1, n + 1)
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
arg_checker=_SkipEvenNumbersArgChecker)
self.assertEqual(n / 2, len(results)) # We know n is even.
self.assertEqual(n / 2, sum(results))
# Skip all arguments.
args = [2 * x for x in args]
results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
arg_checker=_SkipEvenNumbersArgChecker)
self.assertEqual(0, len(results))
class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
"""Tests parallelism framework works with multiprocessing module unavailable.
Notably, this test has no way to override previous calls
to gslib.util.CheckMultiprocessingAvailableAndInit to prevent the
initialization of all of the global variables in command.py, so this still
behaves slightly differently than the behavior one would see on a machine
where the multiprocessing functionality is actually not available (in
particular, it will not catch the case where a global variable that is not
available for the sequential path is referenced before initialization).
"""
command_class = FakeCommandWithoutMultiprocessingModule