| import faulthandler |
| import locale |
| import os |
| import platform |
| import random |
| import re |
| import sys |
| import sysconfig |
| import tempfile |
| import time |
| import unittest |
| from test.libregrtest.cmdline import _parse_args |
| from test.libregrtest.runtest import ( |
| findtests, split_test_packages, runtest, abs_module_name, |
| PROGRESS_MIN_TIME, State, MatchTestsDict, RunTests) |
| from test.libregrtest.setup import setup_tests |
| from test.libregrtest.pgo import setup_pgo_tests |
| from test.libregrtest.utils import (strip_py_suffix, count, format_duration, |
| printlist, get_build_info) |
| from test import support |
| from test.support import TestStats |
| from test.support import os_helper |
| from test.support import threading_helper |
| |
| |
| # bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). |
| # Used to protect against threading._shutdown() hang. |
| # Must be smaller than buildbot "1200 seconds without output" limit. |
| EXIT_TIMEOUT = 120.0 |
| |
| EXITCODE_BAD_TEST = 2 |
| EXITCODE_ENV_CHANGED = 3 |
| EXITCODE_NO_TESTS_RAN = 4 |
| EXITCODE_RERUN_FAIL = 5 |
| EXITCODE_INTERRUPTED = 130 |
| |
| |
| class Regrtest: |
| """Execute a test suite. |
| |
| This also parses command-line options and modifies its behavior |
| accordingly. |
| |
| tests -- a list of strings containing test names (optional) |
| testdir -- the directory in which to look for tests (optional) |
| |
| Users other than the Python test suite will certainly want to |
| specify testdir; if it's omitted, the directory containing the |
| Python test suite is searched for. |
| |
| If the tests argument is omitted, the tests listed on the |
| command-line will be used. If that's empty, too, then all *.py |
| files beginning with test_ will be used. |
| |
| The other default arguments (verbose, quiet, exclude, |
| single, randomize, use_resources, trace, coverdir, |
| print_slow, and random_seed) allow programmers calling main() |
| directly to set the values that would normally be set by flags |
| on the command line. |
| """ |
| def __init__(self): |
| # Namespace of command line options |
| self.ns = None |
| |
| # tests |
| self.tests = [] |
| self.selected = [] |
| self.all_runtests: list[RunTests] = [] |
| |
| # test results |
| self.good: list[str] = [] |
| self.bad: list[str] = [] |
| self.rerun_bad: list[str] = [] |
| self.skipped: list[str] = [] |
| self.resource_denied: list[str] = [] |
| self.environment_changed: list[str] = [] |
| self.run_no_tests: list[str] = [] |
| self.rerun: list[str] = [] |
| |
| self.need_rerun: list[TestResult] = [] |
| self.first_state: str | None = None |
| self.interrupted = False |
| self.total_stats = TestStats() |
| |
| # used by --slow |
| self.test_times = [] |
| |
| # used by --coverage, trace.Trace instance |
| self.tracer = None |
| |
| # used to display the progress bar "[ 3/100]" |
| self.start_time = time.perf_counter() |
| self.test_count_text = '' |
| self.test_count_width = 1 |
| |
| # used by --single |
| self.next_single_test = None |
| self.next_single_filename = None |
| |
| # used by --junit-xml |
| self.testsuite_xml = None |
| |
| # misc |
| self.win_load_tracker = None |
| self.tmp_dir = None |
| |
| def get_executed(self): |
| return (set(self.good) | set(self.bad) | set(self.skipped) |
| | set(self.resource_denied) | set(self.environment_changed) |
| | set(self.run_no_tests)) |
| |
| def accumulate_result(self, result, rerun=False): |
| fail_env_changed = self.ns.fail_env_changed |
| test_name = result.test_name |
| |
| match result.state: |
| case State.PASSED: |
| self.good.append(test_name) |
| case State.ENV_CHANGED: |
| self.environment_changed.append(test_name) |
| case State.SKIPPED: |
| self.skipped.append(test_name) |
| case State.RESOURCE_DENIED: |
| self.resource_denied.append(test_name) |
| case State.INTERRUPTED: |
| self.interrupted = True |
| case State.DID_NOT_RUN: |
| self.run_no_tests.append(test_name) |
| case _: |
| if result.is_failed(fail_env_changed): |
| self.bad.append(test_name) |
| self.need_rerun.append(result) |
| else: |
| raise ValueError(f"invalid test state: {result.state!r}") |
| |
| if result.has_meaningful_duration() and not rerun: |
| self.test_times.append((result.duration, test_name)) |
| if result.stats is not None: |
| self.total_stats.accumulate(result.stats) |
| if rerun: |
| self.rerun.append(test_name) |
| |
| xml_data = result.xml_data |
| if xml_data: |
| import xml.etree.ElementTree as ET |
| for e in xml_data: |
| try: |
| self.testsuite_xml.append(ET.fromstring(e)) |
| except ET.ParseError: |
| print(xml_data, file=sys.__stderr__) |
| raise |
| |
| def log(self, line=''): |
| empty = not line |
| |
| # add the system load prefix: "load avg: 1.80 " |
| load_avg = self.getloadavg() |
| if load_avg is not None: |
| line = f"load avg: {load_avg:.2f} {line}" |
| |
| # add the timestamp prefix: "0:01:05 " |
| test_time = time.perf_counter() - self.start_time |
| |
| mins, secs = divmod(int(test_time), 60) |
| hours, mins = divmod(mins, 60) |
| test_time = "%d:%02d:%02d" % (hours, mins, secs) |
| |
| line = f"{test_time} {line}" |
| if empty: |
| line = line[:-1] |
| |
| print(line, flush=True) |
| |
| def display_progress(self, test_index, text): |
| quiet = self.ns.quiet |
| pgo = self.ns.pgo |
| if quiet: |
| return |
| |
| # "[ 51/405/1] test_tcl passed" |
| line = f"{test_index:{self.test_count_width}}{self.test_count_text}" |
| fails = len(self.bad) + len(self.environment_changed) |
| if fails and not pgo: |
| line = f"{line}/{fails}" |
| self.log(f"[{line}] {text}") |
| |
| def parse_args(self, kwargs): |
| ns = _parse_args(sys.argv[1:], **kwargs) |
| |
| if ns.xmlpath: |
| support.junit_xml_list = self.testsuite_xml = [] |
| |
| strip_py_suffix(ns.args) |
| |
| if ns.huntrleaks: |
| warmup, repetitions, _ = ns.huntrleaks |
| if warmup < 1 or repetitions < 1: |
| msg = ("Invalid values for the --huntrleaks/-R parameters. The " |
| "number of warmups and repetitions must be at least 1 " |
| "each (1:1).") |
| print(msg, file=sys.stderr, flush=True) |
| sys.exit(2) |
| |
| if ns.tempdir: |
| ns.tempdir = os.path.expanduser(ns.tempdir) |
| |
| self.ns = ns |
| |
| def find_tests(self, tests): |
| ns = self.ns |
| single = ns.single |
| fromfile = ns.fromfile |
| pgo = ns.pgo |
| exclude = ns.exclude |
| test_dir = ns.testdir |
| starting_test = ns.start |
| randomize = ns.randomize |
| |
| self.tests = tests |
| |
| if single: |
| self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') |
| try: |
| with open(self.next_single_filename, 'r') as fp: |
| next_test = fp.read().strip() |
| self.tests = [next_test] |
| except OSError: |
| pass |
| |
| if fromfile: |
| self.tests = [] |
| # regex to match 'test_builtin' in line: |
| # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' |
| regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') |
| with open(os.path.join(os_helper.SAVEDCWD, fromfile)) as fp: |
| for line in fp: |
| line = line.split('#', 1)[0] |
| line = line.strip() |
| match = regex.search(line) |
| if match is not None: |
| self.tests.append(match.group()) |
| |
| strip_py_suffix(self.tests) |
| |
| if pgo: |
| # add default PGO tests if no tests are specified |
| setup_pgo_tests(ns) |
| |
| exclude_tests = set() |
| if exclude: |
| for arg in ns.args: |
| exclude_tests.add(arg) |
| ns.args = [] |
| |
| alltests = findtests(testdir=test_dir, exclude=exclude_tests) |
| |
| if not fromfile: |
| self.selected = self.tests or ns.args |
| if self.selected: |
| self.selected = split_test_packages(self.selected) |
| else: |
| self.selected = alltests |
| else: |
| self.selected = self.tests |
| |
| if single: |
| self.selected = self.selected[:1] |
| try: |
| pos = alltests.index(self.selected[0]) |
| self.next_single_test = alltests[pos + 1] |
| except IndexError: |
| pass |
| |
| # Remove all the selected tests that precede start if it's set. |
| if starting_test: |
| try: |
| del self.selected[:self.selected.index(starting_test)] |
| except ValueError: |
| print(f"Cannot find starting test: {starting_test}") |
| sys.exit(1) |
| |
| if randomize: |
| if ns.random_seed is None: |
| ns.random_seed = random.randrange(10000000) |
| random.seed(ns.random_seed) |
| random.shuffle(self.selected) |
| |
| def list_tests(self): |
| for name in self.selected: |
| print(name) |
| |
| def _list_cases(self, suite): |
| for test in suite: |
| if isinstance(test, unittest.loader._FailedTest): |
| continue |
| if isinstance(test, unittest.TestSuite): |
| self._list_cases(test) |
| elif isinstance(test, unittest.TestCase): |
| if support.match_test(test): |
| print(test.id()) |
| |
| def list_cases(self): |
| ns = self.ns |
| test_dir = ns.testdir |
| support.verbose = False |
| support.set_match_tests(ns.match_tests, ns.ignore_tests) |
| |
| skipped = [] |
| for test_name in self.selected: |
| module_name = abs_module_name(test_name, test_dir) |
| try: |
| suite = unittest.defaultTestLoader.loadTestsFromName(module_name) |
| self._list_cases(suite) |
| except unittest.SkipTest: |
| skipped.append(test_name) |
| |
| if skipped: |
| sys.stdout.flush() |
| stderr = sys.stderr |
| print(file=stderr) |
| print(count(len(skipped), "test"), "skipped:", file=stderr) |
| printlist(skipped, file=stderr) |
| |
| def get_rerun_match(self, rerun_list) -> MatchTestsDict: |
| rerun_match_tests = {} |
| for result in rerun_list: |
| match_tests = result.get_rerun_match_tests() |
| # ignore empty match list |
| if match_tests: |
| rerun_match_tests[result.test_name] = match_tests |
| return rerun_match_tests |
| |
| def _rerun_failed_tests(self, need_rerun): |
| # Configure the runner to re-run tests |
| ns = self.ns |
| ns.verbose = True |
| ns.failfast = False |
| ns.verbose3 = False |
| ns.forever = False |
| if ns.use_mp is None: |
| ns.use_mp = 1 |
| |
| # Get tests to re-run |
| tests = [result.test_name for result in need_rerun] |
| match_tests = self.get_rerun_match(need_rerun) |
| self.set_tests(tests) |
| |
| # Clear previously failed tests |
| self.rerun_bad.extend(self.bad) |
| self.bad.clear() |
| self.need_rerun.clear() |
| |
| # Re-run failed tests |
| self.log(f"Re-running {len(tests)} failed tests in verbose mode in subprocesses") |
| runtests = RunTests(tests, match_tests=match_tests, rerun=True) |
| self.all_runtests.append(runtests) |
| self._run_tests_mp(runtests) |
| |
| def rerun_failed_tests(self, need_rerun): |
| if self.ns.python: |
| # Temp patch for https://github.com/python/cpython/issues/94052 |
| self.log( |
| "Re-running failed tests is not supported with --python " |
| "host runner option." |
| ) |
| return |
| |
| self.first_state = self.get_tests_state() |
| |
| print() |
| self._rerun_failed_tests(need_rerun) |
| |
| if self.bad: |
| print(count(len(self.bad), 'test'), "failed again:") |
| printlist(self.bad) |
| |
| self.display_result() |
| |
| def display_result(self): |
| pgo = self.ns.pgo |
| quiet = self.ns.quiet |
| print_slow = self.ns.print_slow |
| |
| # If running the test suite for PGO then no one cares about results. |
| if pgo: |
| return |
| |
| print() |
| print("== Tests result: %s ==" % self.get_tests_state()) |
| |
| if self.interrupted: |
| print("Test suite interrupted by signal SIGINT.") |
| |
| omitted = set(self.selected) - self.get_executed() |
| if omitted: |
| print() |
| print(count(len(omitted), "test"), "omitted:") |
| printlist(omitted) |
| |
| if self.good and not quiet: |
| print() |
| if (not self.bad |
| and not self.skipped |
| and not self.interrupted |
| and len(self.good) > 1): |
| print("All", end=' ') |
| print(count(len(self.good), "test"), "OK.") |
| |
| if print_slow: |
| self.test_times.sort(reverse=True) |
| print() |
| print("10 slowest tests:") |
| for test_time, test in self.test_times[:10]: |
| print("- %s: %s" % (test, format_duration(test_time))) |
| |
| if self.bad: |
| print() |
| print(count(len(self.bad), "test"), "failed:") |
| printlist(self.bad) |
| |
| if self.environment_changed: |
| print() |
| print("{} altered the execution environment:".format( |
| count(len(self.environment_changed), "test"))) |
| printlist(self.environment_changed) |
| |
| if self.skipped and not quiet: |
| print() |
| print(count(len(self.skipped), "test"), "skipped:") |
| printlist(self.skipped) |
| |
| if self.resource_denied and not quiet: |
| print() |
| print(count(len(self.resource_denied), "test"), "skipped (resource denied):") |
| printlist(self.resource_denied) |
| |
| if self.rerun: |
| print() |
| print("%s:" % count(len(self.rerun), "re-run test")) |
| printlist(self.rerun) |
| |
| if self.run_no_tests: |
| print() |
| print(count(len(self.run_no_tests), "test"), "run no tests:") |
| printlist(self.run_no_tests) |
| |
| def run_test(self, test_index, test_name, previous_test, save_modules): |
| text = test_name |
| if previous_test: |
| text = '%s -- %s' % (text, previous_test) |
| self.display_progress(test_index, text) |
| |
| if self.tracer: |
| # If we're tracing code coverage, then we don't exit with status |
| # if on a false return value from main. |
| cmd = ('result = runtest(self.ns, test_name); ' |
| 'self.accumulate_result(result)') |
| ns = dict(locals()) |
| self.tracer.runctx(cmd, globals=globals(), locals=ns) |
| result = ns['result'] |
| else: |
| result = runtest(self.ns, test_name) |
| self.accumulate_result(result) |
| |
| # Unload the newly imported modules (best effort finalization) |
| for module in sys.modules.keys(): |
| if module not in save_modules and module.startswith("test."): |
| support.unload(module) |
| |
| return result |
| |
| def run_tests_sequentially(self, runtests): |
| ns = self.ns |
| coverage = ns.trace |
| fail_fast = ns.failfast |
| fail_env_changed = ns.fail_env_changed |
| timeout = ns.timeout |
| |
| if coverage: |
| import trace |
| self.tracer = trace.Trace(trace=False, count=True) |
| |
| save_modules = sys.modules.keys() |
| |
| msg = "Run tests sequentially" |
| if timeout: |
| msg += " (timeout: %s)" % format_duration(timeout) |
| self.log(msg) |
| |
| previous_test = None |
| tests_iter = runtests.iter_tests() |
| for test_index, test_name in enumerate(tests_iter, 1): |
| start_time = time.perf_counter() |
| |
| result = self.run_test(test_index, test_name, |
| previous_test, save_modules) |
| |
| if result.must_stop(fail_fast, fail_env_changed): |
| break |
| |
| previous_test = str(result) |
| test_time = time.perf_counter() - start_time |
| if test_time >= PROGRESS_MIN_TIME: |
| previous_test = "%s in %s" % (previous_test, format_duration(test_time)) |
| elif result.state == State.PASSED: |
| # be quiet: say nothing if the test passed shortly |
| previous_test = None |
| |
| if previous_test: |
| print(previous_test) |
| |
| def display_header(self): |
| # Print basic platform information |
| print("==", platform.python_implementation(), *sys.version.split()) |
| print("==", platform.platform(aliased=True), |
| "%s-endian" % sys.byteorder) |
| print("== Python build:", ' '.join(get_build_info())) |
| print("== cwd:", os.getcwd()) |
| cpu_count = os.cpu_count() |
| if cpu_count: |
| print("== CPU count:", cpu_count) |
| print("== encodings: locale=%s, FS=%s" |
| % (locale.getencoding(), sys.getfilesystemencoding())) |
| self.display_sanitizers() |
| |
| def display_sanitizers(self): |
| # This makes it easier to remember what to set in your local |
| # environment when trying to reproduce a sanitizer failure. |
| asan = support.check_sanitizer(address=True) |
| msan = support.check_sanitizer(memory=True) |
| ubsan = support.check_sanitizer(ub=True) |
| sanitizers = [] |
| if asan: |
| sanitizers.append("address") |
| if msan: |
| sanitizers.append("memory") |
| if ubsan: |
| sanitizers.append("undefined behavior") |
| if not sanitizers: |
| return |
| |
| print(f"== sanitizers: {', '.join(sanitizers)}") |
| for sanitizer, env_var in ( |
| (asan, "ASAN_OPTIONS"), |
| (msan, "MSAN_OPTIONS"), |
| (ubsan, "UBSAN_OPTIONS"), |
| ): |
| options= os.environ.get(env_var) |
| if sanitizer and options is not None: |
| print(f"== {env_var}={options!r}") |
| |
| def no_tests_run(self): |
| return not any((self.good, self.bad, self.skipped, self.interrupted, |
| self.environment_changed)) |
| |
| def get_tests_state(self): |
| fail_env_changed = self.ns.fail_env_changed |
| |
| result = [] |
| if self.bad: |
| result.append("FAILURE") |
| elif fail_env_changed and self.environment_changed: |
| result.append("ENV CHANGED") |
| elif self.no_tests_run(): |
| result.append("NO TESTS RAN") |
| |
| if self.interrupted: |
| result.append("INTERRUPTED") |
| |
| if not result: |
| result.append("SUCCESS") |
| |
| result = ', '.join(result) |
| if self.first_state: |
| result = '%s then %s' % (self.first_state, result) |
| return result |
| |
| def _run_tests_mp(self, runtests: RunTests) -> None: |
| from test.libregrtest.runtest_mp import run_tests_multiprocess |
| # If we're on windows and this is the parent runner (not a worker), |
| # track the load average. |
| if sys.platform == 'win32': |
| from test.libregrtest.win_utils import WindowsLoadTracker |
| |
| try: |
| self.win_load_tracker = WindowsLoadTracker() |
| except PermissionError as error: |
| # Standard accounts may not have access to the performance |
| # counters. |
| print(f'Failed to create WindowsLoadTracker: {error}') |
| |
| try: |
| run_tests_multiprocess(self, runtests) |
| finally: |
| if self.win_load_tracker is not None: |
| self.win_load_tracker.close() |
| self.win_load_tracker = None |
| |
| def set_tests(self, tests): |
| self.tests = tests |
| if self.ns.forever: |
| self.test_count_text = '' |
| self.test_count_width = 3 |
| else: |
| self.test_count_text = '/{}'.format(len(self.tests)) |
| self.test_count_width = len(self.test_count_text) - 1 |
| |
| def run_tests(self): |
| # For a partial run, we do not need to clutter the output. |
| if (self.ns.header |
| or not(self.ns.pgo or self.ns.quiet or self.ns.single |
| or self.tests or self.ns.args)): |
| self.display_header() |
| |
| if self.ns.huntrleaks: |
| warmup, repetitions, _ = self.ns.huntrleaks |
| if warmup < 3: |
| msg = ("WARNING: Running tests with --huntrleaks/-R and less than " |
| "3 warmup repetitions can give false positives!") |
| print(msg, file=sys.stdout, flush=True) |
| |
| if self.ns.randomize: |
| print("Using random seed", self.ns.random_seed) |
| |
| tests = self.selected |
| self.set_tests(tests) |
| runtests = RunTests(tests, forever=self.ns.forever) |
| self.all_runtests.append(runtests) |
| if self.ns.use_mp: |
| self._run_tests_mp(runtests) |
| else: |
| self.run_tests_sequentially(runtests) |
| |
| def finalize(self): |
| if self.next_single_filename: |
| if self.next_single_test: |
| with open(self.next_single_filename, 'w') as fp: |
| fp.write(self.next_single_test + '\n') |
| else: |
| os.unlink(self.next_single_filename) |
| |
| if self.tracer: |
| r = self.tracer.results() |
| r.write_results(show_missing=True, summary=True, |
| coverdir=self.ns.coverdir) |
| |
| if self.ns.runleaks: |
| os.system("leaks %d" % os.getpid()) |
| |
| self.save_xml_result() |
| |
| def display_summary(self): |
| duration = time.perf_counter() - self.start_time |
| first_runtests = self.all_runtests[0] |
| # the second runtests (re-run failed tests) disables forever, |
| # use the first runtests |
| forever = first_runtests.forever |
| filtered = bool(self.ns.match_tests) or bool(self.ns.ignore_tests) |
| |
| # Total duration |
| print() |
| print("Total duration: %s" % format_duration(duration)) |
| |
| # Total tests |
| total = self.total_stats |
| text = f'run={total.tests_run:,}' |
| if filtered: |
| text = f"{text} (filtered)" |
| stats = [text] |
| if total.failures: |
| stats.append(f'failures={total.failures:,}') |
| if total.skipped: |
| stats.append(f'skipped={total.skipped:,}') |
| print(f"Total tests: {' '.join(stats)}") |
| |
| # Total test files |
| all_tests = [self.good, self.bad, self.rerun, |
| self.skipped, |
| self.environment_changed, self.run_no_tests] |
| run = sum(map(len, all_tests)) |
| text = f'run={run}' |
| if not forever: |
| ntest = len(first_runtests.tests) |
| text = f"{text}/{ntest}" |
| if filtered: |
| text = f"{text} (filtered)" |
| report = [text] |
| for name, tests in ( |
| ('failed', self.bad), |
| ('env_changed', self.environment_changed), |
| ('skipped', self.skipped), |
| ('resource_denied', self.resource_denied), |
| ('rerun', self.rerun), |
| ('run_no_tests', self.run_no_tests), |
| ): |
| if tests: |
| report.append(f'{name}={len(tests)}') |
| print(f"Total test files: {' '.join(report)}") |
| |
| # Result |
| result = self.get_tests_state() |
| print(f"Result: {result}") |
| |
| def save_xml_result(self): |
| if not self.ns.xmlpath and not self.testsuite_xml: |
| return |
| |
| import xml.etree.ElementTree as ET |
| root = ET.Element("testsuites") |
| |
| # Manually count the totals for the overall summary |
| totals = {'tests': 0, 'errors': 0, 'failures': 0} |
| for suite in self.testsuite_xml: |
| root.append(suite) |
| for k in totals: |
| try: |
| totals[k] += int(suite.get(k, 0)) |
| except ValueError: |
| pass |
| |
| for k, v in totals.items(): |
| root.set(k, str(v)) |
| |
| xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath) |
| with open(xmlpath, 'wb') as f: |
| for s in ET.tostringlist(root): |
| f.write(s) |
| |
| def fix_umask(self): |
| if support.is_emscripten: |
| # Emscripten has default umask 0o777, which breaks some tests. |
| # see https://github.com/emscripten-core/emscripten/issues/17269 |
| old_mask = os.umask(0) |
| if old_mask == 0o777: |
| os.umask(0o027) |
| else: |
| os.umask(old_mask) |
| |
| def set_temp_dir(self): |
| if self.ns.tempdir: |
| self.tmp_dir = self.ns.tempdir |
| |
| if not self.tmp_dir: |
| # When tests are run from the Python build directory, it is best practice |
| # to keep the test files in a subfolder. This eases the cleanup of leftover |
| # files using the "make distclean" command. |
| if sysconfig.is_python_build(): |
| self.tmp_dir = sysconfig.get_config_var('abs_builddir') |
| if self.tmp_dir is None: |
| # bpo-30284: On Windows, only srcdir is available. Using |
| # abs_builddir mostly matters on UNIX when building Python |
| # out of the source tree, especially when the source tree |
| # is read only. |
| self.tmp_dir = sysconfig.get_config_var('srcdir') |
| self.tmp_dir = os.path.join(self.tmp_dir, 'build') |
| else: |
| self.tmp_dir = tempfile.gettempdir() |
| |
| self.tmp_dir = os.path.abspath(self.tmp_dir) |
| |
| def is_worker(self): |
| return (self.ns.worker_args is not None) |
| |
| def create_temp_dir(self): |
| os.makedirs(self.tmp_dir, exist_ok=True) |
| |
| # Define a writable temp dir that will be used as cwd while running |
| # the tests. The name of the dir includes the pid to allow parallel |
| # testing (see the -j option). |
| # Emscripten and WASI have stubbed getpid(), Emscripten has only |
| # milisecond clock resolution. Use randint() instead. |
| if sys.platform in {"emscripten", "wasi"}: |
| nounce = random.randint(0, 1_000_000) |
| else: |
| nounce = os.getpid() |
| |
| if self.is_worker(): |
| test_cwd = 'test_python_worker_{}'.format(nounce) |
| else: |
| test_cwd = 'test_python_{}'.format(nounce) |
| test_cwd += os_helper.FS_NONASCII |
| test_cwd = os.path.join(self.tmp_dir, test_cwd) |
| return test_cwd |
| |
| def cleanup(self): |
| import glob |
| |
| path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*') |
| print("Cleanup %s directory" % self.tmp_dir) |
| for name in glob.glob(path): |
| if os.path.isdir(name): |
| print("Remove directory: %s" % name) |
| os_helper.rmtree(name) |
| else: |
| print("Remove file: %s" % name) |
| os_helper.unlink(name) |
| |
| def main(self, tests=None, **kwargs): |
| self.parse_args(kwargs) |
| |
| self.set_temp_dir() |
| |
| self.fix_umask() |
| |
| if self.ns.cleanup: |
| self.cleanup() |
| sys.exit(0) |
| |
| test_cwd = self.create_temp_dir() |
| |
| try: |
| # Run the tests in a context manager that temporarily changes the CWD |
| # to a temporary and writable directory. If it's not possible to |
| # create or change the CWD, the original CWD will be used. |
| # The original CWD is available from os_helper.SAVEDCWD. |
| with os_helper.temp_cwd(test_cwd, quiet=True): |
| # When using multiprocessing, worker processes will use test_cwd |
| # as their parent temporary directory. So when the main process |
| # exit, it removes also subdirectories of worker processes. |
| self.ns.tempdir = test_cwd |
| |
| self._main(tests, kwargs) |
| except SystemExit as exc: |
| # bpo-38203: Python can hang at exit in Py_Finalize(), especially |
| # on threading._shutdown() call: put a timeout |
| if threading_helper.can_start_thread: |
| faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) |
| |
| sys.exit(exc.code) |
| |
| def getloadavg(self): |
| if self.win_load_tracker is not None: |
| return self.win_load_tracker.getloadavg() |
| |
| if hasattr(os, 'getloadavg'): |
| return os.getloadavg()[0] |
| |
| return None |
| |
| def get_exitcode(self): |
| exitcode = 0 |
| if self.bad: |
| exitcode = EXITCODE_BAD_TEST |
| elif self.interrupted: |
| exitcode = EXITCODE_INTERRUPTED |
| elif self.ns.fail_env_changed and self.environment_changed: |
| exitcode = EXITCODE_ENV_CHANGED |
| elif self.no_tests_run(): |
| exitcode = EXITCODE_NO_TESTS_RAN |
| elif self.rerun and self.ns.fail_rerun: |
| exitcode = EXITCODE_RERUN_FAIL |
| return exitcode |
| |
| def action_run_tests(self): |
| self.run_tests() |
| self.display_result() |
| |
| need_rerun = self.need_rerun |
| if self.ns.rerun and need_rerun: |
| self.rerun_failed_tests(need_rerun) |
| |
| self.display_summary() |
| self.finalize() |
| |
| def _main(self, tests, kwargs): |
| if self.is_worker(): |
| from test.libregrtest.runtest_mp import run_tests_worker |
| run_tests_worker(self.ns.worker_args) |
| return |
| |
| if self.ns.wait: |
| input("Press any key to continue...") |
| |
| setup_tests(self.ns) |
| self.find_tests(tests) |
| |
| exitcode = 0 |
| if self.ns.list_tests: |
| self.list_tests() |
| elif self.ns.list_cases: |
| self.list_cases() |
| else: |
| self.action_run_tests() |
| exitcode = self.get_exitcode() |
| |
| sys.exit(exitcode) |
| |
| |
| def main(tests=None, **kwargs): |
| """Run the Python suite.""" |
| Regrtest().main(tests=tests, **kwargs) |