import sys
import trace

from .runtests import RunTests
from .result import State, TestResult, TestStats, Location
from .utils import (
    StrPath, TestName, TestTuple, TestList, FilterDict,
    printlist, count, format_duration)


# Process exit codes returned by TestResults.get_exitcode().
# Python uses exit code 1 when an exception is not caught
# argparse.ArgumentParser.error() uses exit code 2
EXITCODE_BAD_TEST = 2
EXITCODE_ENV_CHANGED = 3
EXITCODE_NO_TESTS_RAN = 4
EXITCODE_RERUN_FAIL = 5
EXITCODE_INTERRUPTED = 130  # 128 + signal.SIGINT=2


| class TestResults: |
| def __init__(self): |
| self.bad: TestList = [] |
| self.good: TestList = [] |
| self.rerun_bad: TestList = [] |
| self.skipped: TestList = [] |
| self.resource_denied: TestList = [] |
| self.env_changed: TestList = [] |
| self.run_no_tests: TestList = [] |
| self.rerun: TestList = [] |
| self.rerun_results: list[TestResult] = [] |
| |
| self.interrupted: bool = False |
| self.worker_bug: bool = False |
| self.test_times: list[tuple[float, TestName]] = [] |
| self.stats = TestStats() |
| # used by --junit-xml |
| self.testsuite_xml: list = [] |
| # used by -T with -j |
| self.covered_lines: set[Location] = set() |
| |
| def is_all_good(self): |
| return (not self.bad |
| and not self.skipped |
| and not self.interrupted |
| and not self.worker_bug) |
| |
| def get_executed(self): |
| return (set(self.good) | set(self.bad) | set(self.skipped) |
| | set(self.resource_denied) | set(self.env_changed) |
| | set(self.run_no_tests)) |
| |
| def no_tests_run(self): |
| return not any((self.good, self.bad, self.skipped, self.interrupted, |
| self.env_changed)) |
| |
| def get_state(self, fail_env_changed): |
| state = [] |
| if self.bad: |
| state.append("FAILURE") |
| elif fail_env_changed and self.env_changed: |
| state.append("ENV CHANGED") |
| elif self.no_tests_run(): |
| state.append("NO TESTS RAN") |
| |
| if self.interrupted: |
| state.append("INTERRUPTED") |
| if self.worker_bug: |
| state.append("WORKER BUG") |
| if not state: |
| state.append("SUCCESS") |
| |
| return ', '.join(state) |
| |
| def get_exitcode(self, fail_env_changed, fail_rerun): |
| exitcode = 0 |
| if self.bad: |
| exitcode = EXITCODE_BAD_TEST |
| elif self.interrupted: |
| exitcode = EXITCODE_INTERRUPTED |
| elif fail_env_changed and self.env_changed: |
| exitcode = EXITCODE_ENV_CHANGED |
| elif self.no_tests_run(): |
| exitcode = EXITCODE_NO_TESTS_RAN |
| elif fail_rerun and self.rerun: |
| exitcode = EXITCODE_RERUN_FAIL |
| elif self.worker_bug: |
| exitcode = EXITCODE_BAD_TEST |
| return exitcode |
| |
| def accumulate_result(self, result: TestResult, runtests: RunTests): |
| test_name = result.test_name |
| rerun = runtests.rerun |
| fail_env_changed = runtests.fail_env_changed |
| |
| match result.state: |
| case State.PASSED: |
| self.good.append(test_name) |
| case State.ENV_CHANGED: |
| self.env_changed.append(test_name) |
| self.rerun_results.append(result) |
| case State.SKIPPED: |
| self.skipped.append(test_name) |
| case State.RESOURCE_DENIED: |
| self.resource_denied.append(test_name) |
| case State.INTERRUPTED: |
| self.interrupted = True |
| case State.DID_NOT_RUN: |
| self.run_no_tests.append(test_name) |
| case _: |
| if result.is_failed(fail_env_changed): |
| self.bad.append(test_name) |
| self.rerun_results.append(result) |
| else: |
| raise ValueError(f"invalid test state: {result.state!r}") |
| |
| if result.state == State.WORKER_BUG: |
| self.worker_bug = True |
| |
| if result.has_meaningful_duration() and not rerun: |
| if result.duration is None: |
| raise ValueError("result.duration is None") |
| self.test_times.append((result.duration, test_name)) |
| if result.stats is not None: |
| self.stats.accumulate(result.stats) |
| if rerun: |
| self.rerun.append(test_name) |
| if result.covered_lines: |
| # we don't care about trace counts so we don't have to sum them up |
| self.covered_lines.update(result.covered_lines) |
| xml_data = result.xml_data |
| if xml_data: |
| self.add_junit(xml_data) |
| |
| def get_coverage_results(self) -> trace.CoverageResults: |
| counts = {loc: 1 for loc in self.covered_lines} |
| return trace.CoverageResults(counts=counts) |
| |
| def need_rerun(self): |
| return bool(self.rerun_results) |
| |
| def prepare_rerun(self, *, clear: bool = True) -> tuple[TestTuple, FilterDict]: |
| tests: TestList = [] |
| match_tests_dict = {} |
| for result in self.rerun_results: |
| tests.append(result.test_name) |
| |
| match_tests = result.get_rerun_match_tests() |
| # ignore empty match list |
| if match_tests: |
| match_tests_dict[result.test_name] = match_tests |
| |
| if clear: |
| # Clear previously failed tests |
| self.rerun_bad.extend(self.bad) |
| self.bad.clear() |
| self.env_changed.clear() |
| self.rerun_results.clear() |
| |
| return (tuple(tests), match_tests_dict) |
| |
| def add_junit(self, xml_data: list[str]): |
| import xml.etree.ElementTree as ET |
| for e in xml_data: |
| try: |
| self.testsuite_xml.append(ET.fromstring(e)) |
| except ET.ParseError: |
| print(xml_data, file=sys.__stderr__) |
| raise |
| |
| def write_junit(self, filename: StrPath): |
| if not self.testsuite_xml: |
| # Don't create empty XML file |
| return |
| |
| import xml.etree.ElementTree as ET |
| root = ET.Element("testsuites") |
| |
| # Manually count the totals for the overall summary |
| totals = {'tests': 0, 'errors': 0, 'failures': 0} |
| for suite in self.testsuite_xml: |
| root.append(suite) |
| for k in totals: |
| try: |
| totals[k] += int(suite.get(k, 0)) |
| except ValueError: |
| pass |
| |
| for k, v in totals.items(): |
| root.set(k, str(v)) |
| |
| with open(filename, 'wb') as f: |
| for s in ET.tostringlist(root): |
| f.write(s) |
| |
| def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool): |
| if print_slowest: |
| self.test_times.sort(reverse=True) |
| print() |
| print("10 slowest tests:") |
| for test_time, test in self.test_times[:10]: |
| print("- %s: %s" % (test, format_duration(test_time))) |
| |
| all_tests = [] |
| omitted = set(tests) - self.get_executed() |
| |
| # less important |
| all_tests.append((omitted, "test", "{} omitted:")) |
| if not quiet: |
| all_tests.append((self.skipped, "test", "{} skipped:")) |
| all_tests.append((self.resource_denied, "test", "{} skipped (resource denied):")) |
| all_tests.append((self.run_no_tests, "test", "{} run no tests:")) |
| |
| # more important |
| all_tests.append((self.env_changed, "test", "{} altered the execution environment (env changed):")) |
| all_tests.append((self.rerun, "re-run test", "{}:")) |
| all_tests.append((self.bad, "test", "{} failed:")) |
| |
| for tests_list, count_text, title_format in all_tests: |
| if tests_list: |
| print() |
| count_text = count(len(tests_list), count_text) |
| print(title_format.format(count_text)) |
| printlist(tests_list) |
| |
| if self.good and not quiet: |
| print() |
| text = count(len(self.good), "test") |
| text = f"{text} OK." |
| if (self.is_all_good() and len(self.good) > 1): |
| text = f"All {text}" |
| print(text) |
| |
| if self.interrupted: |
| print() |
| print("Test suite interrupted by signal SIGINT.") |
| |
| def display_summary(self, first_runtests: RunTests, filtered: bool): |
| # Total tests |
| stats = self.stats |
| text = f'run={stats.tests_run:,}' |
| if filtered: |
| text = f"{text} (filtered)" |
| report = [text] |
| if stats.failures: |
| report.append(f'failures={stats.failures:,}') |
| if stats.skipped: |
| report.append(f'skipped={stats.skipped:,}') |
| print(f"Total tests: {' '.join(report)}") |
| |
| # Total test files |
| all_tests = [self.good, self.bad, self.rerun, |
| self.skipped, |
| self.env_changed, self.run_no_tests] |
| run = sum(map(len, all_tests)) |
| text = f'run={run}' |
| if not first_runtests.forever: |
| ntest = len(first_runtests.tests) |
| text = f"{text}/{ntest}" |
| if filtered: |
| text = f"{text} (filtered)" |
| report = [text] |
| for name, tests in ( |
| ('failed', self.bad), |
| ('env_changed', self.env_changed), |
| ('skipped', self.skipped), |
| ('resource_denied', self.resource_denied), |
| ('rerun', self.rerun), |
| ('run_no_tests', self.run_no_tests), |
| ): |
| if tests: |
| report.append(f'{name}={len(tests)}') |
| print(f"Total test files: {' '.join(report)}") |