| # -*- coding: utf-8 -*- |
| # Copyright 2011 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Implementation of gsutil test command.""" |
| |
| from __future__ import absolute_import |
| |
| from collections import namedtuple |
| import logging |
| import os |
| import subprocess |
| import sys |
| import tempfile |
| import textwrap |
| import time |
| |
| import gslib |
| from gslib.command import Command |
| from gslib.command import ResetFailureCount |
| from gslib.exception import CommandException |
| from gslib.project_id import PopulateProjectId |
| import gslib.tests as tests |
| from gslib.util import IS_WINDOWS |
| from gslib.util import NO_MAX |
| |
| |
| # For Python 2.6, unittest2 is required to run the tests. If it's not available, |
| # display an error if the test command is run instead of breaking the whole |
| # program. |
| # pylint: disable=g-import-not-at-top |
| try: |
| from gslib.tests.util import GetTestNames |
| from gslib.tests.util import unittest |
| except ImportError as e: |
| if 'unittest2' in str(e): |
| unittest = None |
| GetTestNames = None # pylint: disable=invalid-name |
| else: |
| raise |
| |
| |
| try: |
| import coverage |
| except ImportError: |
| coverage = None |
| |
| |
# Default number of test processes to run in parallel.
_DEFAULT_TEST_PARALLEL_PROCESSES = 5
# Cap on parallel test processes when running against S3; the S3 test runs
# are limited due to S3 maximum-bucket limitations (see RunCommand).
_DEFAULT_S3_TEST_PARALLEL_PROCESSES = 50
# Private flag passed to child gsutil test processes so that a test already
# spawned in isolation does not recursively try to isolate itself again.
_SEQUENTIAL_ISOLATION_FLAG = 'sequential_only'


_SYNOPSIS = """
  gsutil test [-l] [-u] [-f] [command command...]
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The gsutil test command runs the gsutil unit tests and integration tests.
  The unit tests use an in-memory mock storage service implementation, while
  the integration tests send requests to the production service using the
  preferred API set in the boto configuration file (see "gsutil help apis" for
  details).

  To run both the unit tests and integration tests, run the command with no
  arguments:

    gsutil test

  To run the unit tests only (which run quickly):

    gsutil test -u

  Tests run in parallel regardless of whether the top-level -m flag is
  present. To limit the number of tests run in parallel to 10 at a time:

    gsutil test -p 10

  To force tests to run sequentially:

    gsutil test -p 1

  To have sequentially-run tests stop running immediately when an error occurs:

    gsutil test -f

  To run tests for one or more individual commands add those commands as
  arguments. For example, the following command will run the cp and mv command
  tests:

    gsutil test cp mv

  To list available tests, run the test command with the -l argument:

    gsutil test -l

  The tests are defined in the code under the gslib/tests module. Each test
  file is of the format test_[name].py where [name] is the test name you can
  pass to this command. For example, running "gsutil test ls" would run the
  tests in "gslib/tests/test_ls.py".

  You can also run an individual test class or function name by passing the
  test module followed by the class name and optionally a test name. For
  example, to run an entire test class by name:

    gsutil test naming.GsutilNamingTests

  or an individual test function:

    gsutil test cp.TestCp.test_streaming

  You can list the available tests under a module or class by passing arguments
  with the -l option. For example, to list all available test functions in the
  cp module:

    gsutil test -l cp

  To output test coverage:

    gsutil test -c -p 500
    coverage html

  This will output an HTML report to a directory named 'htmlcov'.


<B>OPTIONS</B>
  -c          Output coverage information.

  -f          Exit on first sequential test failure.

  -l          List available tests.

  -p N        Run at most N tests in parallel. The default value is %d.

  -s          Run tests against S3 instead of GS.

  -u          Only run unit tests.
""" % _DEFAULT_TEST_PARALLEL_PROCESSES)
| |
| |
# Record of one finished test subprocess, used to report per-test results
# after the parallel test run completes.
TestProcessData = namedtuple(
    'TestProcessData', ['name', 'return_code', 'stdout', 'stderr'])
| |
| |
def MakeCustomTestResultClass(total_tests):
  """Creates a closure of CustomTestResult.

  Args:
    total_tests: The total number of tests being run.

  Returns:
    An instance of CustomTestResult.
  """

  class CustomTestResult(unittest.TextTestResult):
    """A subclass of unittest.TextTestResult that prints a progress report."""

    def startTest(self, test):
      super(CustomTestResult, self).startTest(test)
      if not self.dots:
        return
      # Abbreviate the test id to its last two components (class.method).
      short_id = '.'.join(test.id().split('.')[-2:])
      progress = ('\r%d/%d finished - E[%d] F[%d] s[%d] - %s' % (
          self.testsRun, total_tests, len(self.errors),
          len(self.failures), len(self.skipped), short_id))
      # Clamp and pad to a fixed width so successive '\r' reports overwrite
      # each other cleanly on the terminal.
      self.stream.write('%s - ' % progress[:73].ljust(73))

  return CustomTestResult
| |
| |
def GetTestNamesFromSuites(test_suite):
  """Takes a list of test suites and returns a list of contained test names."""
  pending = [test_suite]
  names = []
  # Depth-first walk: nested suites go back on the stack, leaf tests yield
  # their id with the 'gslib.tests.test_' module prefix stripped.
  while pending:
    current = pending.pop()
    for member in current:
      if isinstance(member, unittest.TestSuite):
        pending.append(member)
      else:
        names.append(member.id()[len('gslib.tests.test_'):])
  return names
| |
| |
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def TestCaseToName(test_case):
  """Converts a python.unittest to its gsutil test-callable name."""
  # str(cls) looks like "<class 'module.Class'>"; the quoted segment is the
  # fully-qualified class path.
  class_path = str(test_case.__class__).split('\'')[1]
  return '.'.join([class_path, test_case._testMethodName])
| |
| |
# pylint: disable=protected-access
# Need to get into the guts of unittest to evaluate test cases for parallelism.
def SplitParallelizableTestSuite(test_suite):
  """Splits a test suite into groups with different running properties.

  Args:
    test_suite: A python unittest test suite.

  Returns:
    4-part tuple of lists of test names:
    (tests that must be run sequentially,
     tests that must be isolated in a separate process but can be run either
     sequentially or in parallel,
     unit tests that can be run in parallel,
     integration tests that can run in parallel)
  """
  # pylint: disable=import-not-at-top
  # Need to import this after test globals are set so that skip functions work.
  from gslib.tests.testcase.unit_testcase import GsUtilUnitTestCase
  sequential = []
  isolated = []
  parallel_unit = []
  parallel_integration = []

  # First flatten the (possibly nested) suite tree into a flat list of
  # individual test cases.
  flattened_cases = []
  work_stack = [test_suite]
  while work_stack:
    node = work_stack.pop()
    if isinstance(node, unittest.suite.TestSuite):
      work_stack.extend(node._tests)
    elif isinstance(node, unittest.TestCase):
      flattened_cases.append(node)

  # Then bucket each case by the decorator-set attributes on its test method.
  for case in flattened_cases:
    method = getattr(case, case._testMethodName, None)
    if getattr(method, 'requires_isolation', False):
      # Test must be isolated to a separate process, even if it is being
      # run sequentially.
      isolated.append(TestCaseToName(case))
    elif not getattr(method, 'is_parallelizable', True):
      sequential.append(TestCaseToName(case))
    elif isinstance(case, GsUtilUnitTestCase):
      parallel_unit.append(TestCaseToName(case))
    else:
      parallel_integration.append(TestCaseToName(case))

  return (sorted(sequential),
          sorted(isolated),
          sorted(parallel_unit),
          sorted(parallel_integration))
| |
| |
def CountFalseInList(input_list):
  """Counts number of falses in the input list.

  Args:
    input_list: Iterable of values to test for falsiness.

  Returns:
    int count of falsy items in input_list.
  """
  # Idiomatic replacement for a manual counter loop; falsy covers False, 0,
  # '', None, empty containers, etc., matching the original `if not item`.
  return sum(1 for item in input_list if not item)
| |
| |
def CreateTestProcesses(parallel_tests, test_index, process_list, process_done,
                        max_parallel_tests, root_coverage_file=None):
  """Creates test processes to run tests in parallel.

  Args:
    parallel_tests: List of all parallel tests.
    test_index: List index of last created test before this function call.
    process_list: List of running subprocesses. Created processes are appended
                  to this list.
    process_done: List of booleans indicating process completion. One 'False'
                  will be added per process created.
    max_parallel_tests: Maximum number of tests to run in parallel.
    root_coverage_file: The root .coverage filename if coverage is requested.

  Returns:
    Index of last created test.
  """
  first_new_index = test_index
  # On Windows, gsutil must be launched via the Python interpreter explicitly.
  interpreter_args = [sys.executable] if sys.executable and IS_WINDOWS else []
  s3_args = ['-s'] if tests.util.RUN_S3_TESTS else []

  creation_start_time = time.time()
  last_log_time = creation_start_time
  # Keep spawning children until the parallelism cap is reached or there are
  # no tests left to start.
  while (CountFalseInList(process_done) < max_parallel_tests and
         test_index < len(parallel_tests)):
    child_env = os.environ.copy()
    if root_coverage_file:
      child_env['GSUTIL_COVERAGE_OUTPUT_FILE'] = root_coverage_file
    # Each child runs a single test via 'gsutil test --sequential_only <name>'.
    test_name = parallel_tests[test_index][len('gslib.tests.test_'):]
    command_line = (
        interpreter_args + [gslib.GSUTIL_PATH] +
        ['-o', 'GSUtil:default_project_id=' + PopulateProjectId()] +
        ['test'] + s3_args +
        ['--' + _SEQUENTIAL_ISOLATION_FLAG] +
        [test_name])
    process_list.append(subprocess.Popen(
        command_line, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        env=child_env))
    test_index += 1
    process_done.append(False)
    if time.time() - last_log_time > 5:
      print ('Created %d new processes (total %d/%d created)' %
             (test_index - first_new_index, len(process_list),
              len(parallel_tests)))
      last_log_time = time.time()
  if test_index == len(parallel_tests):
    print ('Test process creation finished (%d/%d created)' %
           (len(process_list), len(parallel_tests)))
  return test_index
| |
| |
class TestCommand(Command):
  """Implementation of gsutil test command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'test',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=0,
      max_args=NO_MAX,
      supported_sub_args='uflp:sc',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      supported_private_args=[_SEQUENTIAL_ISOLATION_FLAG]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='test',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Run gsutil tests',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  def RunParallelTests(self, parallel_integration_tests,
                       max_parallel_tests, coverage_filename):
    """Executes the parallel/isolated portion of the test suite.

    Args:
      parallel_integration_tests: List of tests to execute.
      max_parallel_tests: Maximum number of parallel tests to run at once.
      coverage_filename: If not None, filename for coverage output.

    Returns:
      (int number of test failures, float elapsed time)
    """
    process_list = []
    process_done = []
    process_results = []  # Tuples of (name, return code, stdout, stderr)
    num_parallel_failures = 0
    # Number of logging cycles we ran with no progress.
    progress_less_logging_cycles = 0
    completed_as_of_last_log = 0
    num_parallel_tests = len(parallel_integration_tests)
    parallel_start_time = last_log_time = time.time()
    # Spawn the first batch of child processes (up to max_parallel_tests).
    test_index = CreateTestProcesses(
        parallel_integration_tests, 0, process_list, process_done,
        max_parallel_tests, root_coverage_file=coverage_filename)
    # Poll until a result has been collected for every parallel test.
    while len(process_results) < num_parallel_tests:
      for proc_num in xrange(len(process_list)):
        # Skip processes already collected or still running (poll() is None).
        if process_done[proc_num] or process_list[proc_num].poll() is None:
          continue
        process_done[proc_num] = True
        stdout, stderr = process_list[proc_num].communicate()
        process_list[proc_num].stdout.close()
        process_list[proc_num].stderr.close()
        # TODO: Differentiate test failures from errors.
        if process_list[proc_num].returncode != 0:
          num_parallel_failures += 1
        # process_list is created in the same order as
        # parallel_integration_tests, so proc_num indexes both lists.
        process_results.append(TestProcessData(
            name=parallel_integration_tests[proc_num],
            return_code=process_list[proc_num].returncode,
            stdout=stdout, stderr=stderr))
      # Start more child processes as slots free up.
      if len(process_list) < num_parallel_tests:
        test_index = CreateTestProcesses(
            parallel_integration_tests, test_index, process_list,
            process_done, max_parallel_tests,
            root_coverage_file=coverage_filename)
      if len(process_results) < num_parallel_tests:
        # Log progress at most once every 5 seconds.
        if time.time() - last_log_time > 5:
          print '%d/%d finished - %d failures' % (
              len(process_results), num_parallel_tests, num_parallel_failures)
          if len(process_results) == completed_as_of_last_log:
            progress_less_logging_cycles += 1
          else:
            completed_as_of_last_log = len(process_results)
            # A process completed, so we made progress.
            progress_less_logging_cycles = 0
          if progress_less_logging_cycles > 4:
            # Ran 5 or more logging cycles with no progress, let the user
            # know which tests are running slowly or hanging.
            still_running = []
            for proc_num in xrange(len(process_list)):
              if not process_done[proc_num]:
                still_running.append(parallel_integration_tests[proc_num])
            print 'Still running: %s' % still_running
            # TODO: Terminate still-running processes if they
            # hang for a long time.
          last_log_time = time.time()
        time.sleep(1)
    process_run_finish_time = time.time()
    # Echo the stderr of each failed test so the failure details are visible
    # in the parent process's output.
    if num_parallel_failures:
      for result in process_results:
        if result.return_code != 0:
          new_stderr = result.stderr.split('\n')
          print 'Results for failed test %s:' % result.name
          for line in new_stderr:
            print line

    return (num_parallel_failures,
            (process_run_finish_time - parallel_start_time))

  def PrintTestResults(self, num_sequential_tests, sequential_success,
                       sequential_time_elapsed,
                       num_parallel_tests, num_parallel_failures,
                       parallel_time_elapsed):
    """Prints test results for parallel and sequential tests."""
    # TODO: Properly track test skips.
    print 'Parallel tests complete. Success: %s Fail: %s' % (
        num_parallel_tests - num_parallel_failures, num_parallel_failures)
    print (
        'Ran %d tests in %.3fs (%d sequential in %.3fs, %d parallel in %.3fs)'
        % (num_parallel_tests + num_sequential_tests,
           float(sequential_time_elapsed + parallel_time_elapsed),
           num_sequential_tests,
           float(sequential_time_elapsed),
           num_parallel_tests,
           float(parallel_time_elapsed)))
    print

    # Mirror unittest's summary style: 'OK' on success, 'FAILED' otherwise,
    # distinguishing which phase (parallel/sequential) failed.
    if not num_parallel_failures and sequential_success:
      print 'OK'
    else:
      if num_parallel_failures:
        print 'FAILED (parallel tests)'
      if not sequential_success:
        print 'FAILED (sequential tests)'

  def RunCommand(self):
    """Command entry point for the test command.

    Returns:
      0 if all selected tests passed (or a test listing was requested),
      1 otherwise.
    """
    # unittest is None when the conditional import at the top of this module
    # failed due to a missing unittest2 on Python 2.6.
    if not unittest:
      raise CommandException('On Python 2.6, the unittest2 module is required '
                             'to run the gsutil tests.')

    failfast = False
    list_tests = False
    max_parallel_tests = _DEFAULT_TEST_PARALLEL_PROCESSES
    perform_coverage = False
    sequential_only = False
    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          perform_coverage = True
        elif o == '-f':
          failfast = True
        elif o == '-l':
          list_tests = True
        elif o == ('--' + _SEQUENTIAL_ISOLATION_FLAG):
          # Called to isolate a single test in a separate process.
          # Don't try to isolate it again (would lead to an infinite loop).
          sequential_only = True
        elif o == '-p':
          max_parallel_tests = long(a)
        elif o == '-s':
          if not tests.util.HAS_S3_CREDS:
            raise CommandException('S3 tests require S3 credentials. Please '
                                   'add appropriate credentials to your .boto '
                                   'file and re-run.')
          tests.util.RUN_S3_TESTS = True
        elif o == '-u':
          tests.util.RUN_INTEGRATION_TESTS = False

    if perform_coverage and not coverage:
      raise CommandException(
          'Coverage has been requested but the coverage module was not found. '
          'You can install it with "pip install coverage".')

    # S3 limits the number of buckets; cap parallelism accordingly.
    if (tests.util.RUN_S3_TESTS and
        max_parallel_tests > _DEFAULT_S3_TEST_PARALLEL_PROCESSES):
      self.logger.warn(
          'Reducing parallel tests to %d due to S3 maximum bucket '
          'limitations.', _DEFAULT_S3_TEST_PARALLEL_PROCESSES)
      max_parallel_tests = _DEFAULT_S3_TEST_PARALLEL_PROCESSES

    test_names = sorted(GetTestNames())
    # With -l and no args, just list the test module names and exit.
    if list_tests and not self.args:
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    # Set list of commands to test if supplied.
    if self.args:
      commands_to_test = []
      for name in self.args:
        if name in test_names or name.split('.')[0] in test_names:
          commands_to_test.append('gslib.tests.test_%s' % name)
        else:
          # Assume the user passed a fully-qualified dotted name.
          commands_to_test.append(name)
    else:
      commands_to_test = ['gslib.tests.test_%s' % name for name in test_names]

    # Installs a ctrl-c handler that tries to cleanly tear down tests.
    unittest.installHandler()

    loader = unittest.TestLoader()

    if commands_to_test:
      try:
        suite = loader.loadTestsFromNames(commands_to_test)
      except (ImportError, AttributeError) as e:
        raise CommandException('Invalid test argument name: %s' % e)

    # With -l and args, list the individual test names within the given
    # modules/classes and exit.
    if list_tests:
      test_names = GetTestNamesFromSuites(suite)
      print 'Found %d test names:' % len(test_names)
      print ' ', '\n  '.join(sorted(test_names))
      return 0

    if logging.getLogger().getEffectiveLevel() <= logging.INFO:
      verbosity = 1
    else:
      verbosity = 2
      logging.disable(logging.ERROR)

    if perform_coverage:
      # We want to run coverage over the gslib module, but filter out the test
      # modules and any third-party code. We also filter out anything under the
      # temporary directory. Otherwise, the gsutil update test (which copies
      # code to the temporary directory) gets included in the output.
      coverage_controller = coverage.coverage(
          source=['gslib'], omit=['gslib/third_party/*', 'gslib/tests/*',
                                  tempfile.gettempdir() + '*'])
      coverage_controller.erase()
      coverage_controller.start()

    num_parallel_failures = 0
    sequential_success = False

    (sequential_tests, isolated_tests,
     parallel_unit_tests, parallel_integration_tests) = (
         SplitParallelizableTestSuite(suite))

    # Since parallel integration tests are run in a separate process, they
    # won't get the override to tests.util, so skip them here.
    if not tests.util.RUN_INTEGRATION_TESTS:
      parallel_integration_tests = []

    logging.debug('Sequential tests to run: %s', sequential_tests)
    logging.debug('Isolated tests to run: %s', isolated_tests)
    logging.debug('Parallel unit tests to run: %s', parallel_unit_tests)
    logging.debug('Parallel integration tests to run: %s',
                  parallel_integration_tests)

    # If we're running an already-isolated test (spawned in isolation by a
    # previous test process), or we have no parallel tests to run,
    # just run sequentially. For now, unit tests are always run sequentially.
    run_tests_sequentially = (sequential_only or
                              (len(parallel_integration_tests) <= 1
                               and not isolated_tests))

    if run_tests_sequentially:
      total_tests = suite.countTestCases()
      resultclass = MakeCustomTestResultClass(total_tests)

      runner = unittest.TextTestRunner(verbosity=verbosity,
                                       resultclass=resultclass,
                                       failfast=failfast)
      ret = runner.run(suite)
      sequential_success = ret.wasSuccessful()
    else:
      if max_parallel_tests == 1:
        # We can't take advantage of parallelism, though we may have tests that
        # need isolation.
        sequential_tests += parallel_integration_tests
        parallel_integration_tests = []

      sequential_start_time = time.time()
      # TODO: For now, run unit tests sequentially because they are fast.
      # We could potentially shave off several seconds of execution time
      # by executing them in parallel with the integration tests.
      if len(sequential_tests) + len(parallel_unit_tests):
        print 'Running %d tests sequentially.' % (len(sequential_tests) +
                                                  len(parallel_unit_tests))
        sequential_tests_to_run = sequential_tests + parallel_unit_tests
        suite = loader.loadTestsFromNames(
            sorted([test_name for test_name in sequential_tests_to_run]))
        num_sequential_tests = suite.countTestCases()
        resultclass = MakeCustomTestResultClass(num_sequential_tests)
        runner = unittest.TextTestRunner(verbosity=verbosity,
                                         resultclass=resultclass,
                                         failfast=failfast)

        ret = runner.run(suite)
        sequential_success = ret.wasSuccessful()
      else:
        num_sequential_tests = 0
        sequential_success = True
      sequential_time_elapsed = time.time() - sequential_start_time

      # At this point, all tests get their own process so just treat the
      # isolated tests as parallel tests.
      parallel_integration_tests += isolated_tests
      num_parallel_tests = len(parallel_integration_tests)

      if not num_parallel_tests:
        # Nothing left to run in parallel; summary was printed sequentially.
        pass
      else:
        num_processes = min(max_parallel_tests, num_parallel_tests)
        if num_parallel_tests > 1 and max_parallel_tests > 1:
          message = 'Running %d tests in parallel mode (%d processes).'
          if num_processes > _DEFAULT_TEST_PARALLEL_PROCESSES:
            message += (
                ' Please be patient while your CPU is incinerated. '
                'If your machine becomes unresponsive, consider reducing '
                'the amount of parallel test processes by running '
                '\'gsutil test -p <num_processes>\'.')
          print ('\n'.join(textwrap.wrap(
              message % (num_parallel_tests, num_processes))))
        else:
          print ('Running %d tests sequentially in isolated processes.' %
                 num_parallel_tests)
        (num_parallel_failures, parallel_time_elapsed) = self.RunParallelTests(
            parallel_integration_tests, max_parallel_tests,
            coverage_controller.data.filename if perform_coverage else None)
        self.PrintTestResults(
            num_sequential_tests, sequential_success,
            sequential_time_elapsed,
            num_parallel_tests, num_parallel_failures,
            parallel_time_elapsed)

    if perform_coverage:
      coverage_controller.stop()
      coverage_controller.combine()
      coverage_controller.save()
      print ('Coverage information was saved to: %s' %
             coverage_controller.data.filename)

    # Reset the global failure count so a successful test run exits cleanly.
    if sequential_success and not num_parallel_failures:
      ResetFailureCount()
      return 0
    return 1