| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| |
import argparse
import bz2
import datetime
import json
import math
import os
import re
import statistics
import subprocess
import sys
import time
| from collections import defaultdict |
| from pathlib import Path |
| from typing import ( |
| Any, |
| cast, |
| DefaultDict, |
| Dict, |
| Iterable, |
| Iterator, |
| List, |
| Optional, |
| Set, |
| Tuple, |
| ) |
| from xml.dom import minidom |
| |
| from typing_extensions import TypedDict |
| |
| from tools.stats.s3_stat_parser import ( |
| Commit, |
| get_S3_object_from_bucket, |
| get_test_stats_summaries_for_job, |
| HAVE_BOTO3, |
| newify_case, |
| Report, |
| ReportMetaMeta, |
| Status, |
| Version1Report, |
| Version2Case, |
| Version2Report, |
| VersionedReport, |
| ) |
| from tools.stats.scribe import send_to_scribe |
| |
| |
| SimplerSuite = Dict[str, Version2Case] |
| SimplerFile = Dict[str, SimplerSuite] |
| SimplerReport = Dict[str, SimplerFile] |
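# Shape sketch of a SimplerReport, with hypothetical names:
# {"test_nn.py": {"TestNN": {"test_relu": {"seconds": 1.2, "status": None}}}}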
| |
| |
| class Stat(TypedDict): |
| center: float |
| spread: Optional[float] |
| |
| |
| class CaseDiff(TypedDict): |
| margin: str |
| name: str |
| was: Optional[Tuple[Stat, Status]] |
| now: Optional[Version2Case] |
| |
| |
| class SuiteDiff(TypedDict): |
| margin: str |
| name: str |
| was: Optional[Stat] |
| now: Optional[float] |
| cases: List[CaseDiff] |
| |
| |
| # TODO: consolidate this with the get_cases function from |
| # tools/stats/test_history.py |
| |
| # Here we translate to a three-layer format (file -> suite -> case) |
| # rather than a two-layer format (suite -> case) because as mentioned in |
| # a comment in the body of this function, if we consolidate suites that |
| # share a name, there will be test case name collisions, and once we |
| # have those, there's no clean way to deal with it in the diffing logic. |
| # It's not great to have to add a dummy empty string for the filename |
| # for version 1 reports, but it's better than either losing cases that |
| # share a name (for version 2 reports) or using a list of cases rather |
| # than a dict. |
| def simplify(report: Report) -> SimplerReport: |
| if "format_version" not in report: # version 1 implicitly |
| v1report = cast(Version1Report, report) |
| return { |
            # version 1 reports carry no test filename information, so we
            # use a single fake filename: the empty string
| "": { |
| suite_name: { |
                    # This clobbers some cases with duplicate names:
                    # version 1 merged all suites sharing a name (even
                    # across files), so two cases within a merged suite
                    # could end up with the same name. Warning about the
                    # silently discarded cases would be better, but since
                    # we now upload only the new format (where everything
                    # is also keyed by filename), it shouldn't matter much.
| case["name"]: newify_case(case) |
| for case in suite["cases"] |
| } |
| for suite_name, suite in v1report["suites"].items() |
| } |
| } |
| else: |
| v_report = cast(VersionedReport, report) |
| version = v_report["format_version"] |
| if version == 2: |
| v2report = cast(Version2Report, v_report) |
| return { |
| filename: { |
| suite_name: suite["cases"] |
| for suite_name, suite in file_data["suites"].items() |
| } |
| for filename, file_data in v2report["files"].items() |
| } |
| else: |
| raise RuntimeError(f"Unknown format version: {version}") |
| |
| |
| def plural(n: int) -> str: |
| return "" if n == 1 else "s" |
| |
| |
| def get_base_commit(sha1: str) -> str: |
| default_branch = os.environ.get("GIT_DEFAULT_BRANCH") |
    # covers both unset (None) and empty-string cases
| if not default_branch: |
| default_branch = "master" |
| |
| default_remote = f"origin/{default_branch}" |
| return subprocess.check_output( |
| ["git", "merge-base", sha1, default_remote], |
| encoding="ascii", |
| ).strip() |
| |
| |
| def display_stat( |
| x: Stat, |
| format: Tuple[Tuple[int, int], Tuple[int, int]], |
| ) -> str: |
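    """
    Render a Stat using fixed-width float formats; `format` is a pair of
    (digits_before_point, digits_after_point) specs, the first for the
    center and the second for the spread.

    A sketch with hypothetical values:

    >>> display_stat({"center": 12.34, "spread": 0.56}, ((3, 2), (2, 2)))
    ' 12.34s ±  0.56s'
    """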
| spread_len = format[1][0] + 1 + format[1][1] |
| spread = x["spread"] |
| if spread is not None: |
| spread_str = f" ± {spread:{spread_len}.{format[1][1]}f}s" |
| else: |
| spread_str = " " * (3 + spread_len + 1) |
| mean_len = format[0][0] + 1 + format[0][1] |
| return f'{x["center"]:{mean_len}.{format[0][1]}f}s{spread_str}' |
| |
| |
| def list_stat(l: List[float]) -> Stat: |
| return { |
| "center": statistics.mean(l), |
| "spread": statistics.stdev(l) if len(l) > 1 else None, |
| } |
| |
| |
| def zero_stat() -> Stat: |
| return {"center": 0, "spread": None} |
| |
| |
| def recenter(was: Stat, now: float) -> Stat: |
| return {"center": now - was["center"], "spread": was["spread"]} |
| |
| |
| def sum_normals(stats: Iterable[Stat]) -> Stat: |
| """ |
| Returns a stat corresponding to the sum of the given stats. |
| |
| Assumes that the center and spread for each of the given stats are |
| mean and stdev, respectively. |
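
    Sketch: two independent stats with spreads 3 and 4 sum to a spread
    of 5, since sqrt(3**2 + 4**2) == 5:

    >>> sum_normals([
    ...     {"center": 1.0, "spread": 3.0},
    ...     {"center": 2.0, "spread": 4.0},
    ... ])
    {'center': 3.0, 'spread': 5.0}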
| """ |
| l = list(stats) |
| spread: Optional[float] |
| if any(stat["spread"] is not None for stat in l): |
| spread = math.sqrt(sum((stat["spread"] or 0) ** 2 for stat in l)) |
| else: |
| spread = None |
| return { |
| "center": sum(stat["center"] for stat in l), |
| "spread": spread, |
| } |
| |
| |
| def format_seconds(seconds: List[float]) -> str: |
| if len(seconds) > 0: |
| x = list_stat(seconds) |
| return f"total time {display_stat(x, ((5, 2), (4, 2)))}".strip() |
| return "" |
| |
| |
| def show_ancestors(num_commits: int) -> str: |
| return f" | : ({num_commits} commit{plural(num_commits)})" |
| |
| |
| def unlines(lines: List[str]) -> str: |
| return "".join(f"{line}\n" for line in lines) |
| |
| |
| def matching_test_times( |
| *, |
| base_reports: Dict[Commit, List[SimplerReport]], |
| filename: str, |
| suite_name: str, |
| case_name: str, |
| status: Status, |
| ) -> List[float]: |
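    """Collect, across all base reports, the durations of every run of
    the given test case that finished with the given status."""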
| times: List[float] = [] |
| for reports in base_reports.values(): |
| for report in reports: |
| file_data = report.get(filename) |
| if file_data: |
| suite = file_data.get(suite_name) |
| if suite: |
| case = suite.get(case_name) |
| if case: |
| t = case["seconds"] |
| s = case["status"] |
| if s == status: |
| times.append(t) |
| return times |
| |
| |
| def analyze( |
| *, |
| head_report: SimplerReport, |
| base_reports: Dict[Commit, List[SimplerReport]], |
| ) -> List[SuiteDiff]: |
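    """
    Diff the head report against the base reports, returning removed,
    modified, and added suites (in that order), each carrying its
    per-case diffs.
    """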
| nonempty_shas = [sha for sha, reports in base_reports.items() if reports] |
| # most recent main ancestor with at least one S3 report, |
| # or empty list if there are none (will show all tests as added) |
| base_report = base_reports[nonempty_shas[0]] if nonempty_shas else [] |
| |
| # find all relevant suites (those in either base or head or both) |
| all_reports = [head_report] + base_report |
| all_suites: Set[Tuple[str, str]] = { |
| (filename, suite_name) |
| for r in all_reports |
| for filename, file_data in r.items() |
| for suite_name in file_data.keys() |
| } |
| |
| removed_suites: List[SuiteDiff] = [] |
| modified_suites: List[SuiteDiff] = [] |
| added_suites: List[SuiteDiff] = [] |
| |
| for filename, suite_name in sorted(all_suites): |
| case_diffs: List[CaseDiff] = [] |
| head_suite = head_report.get(filename, {}).get(suite_name) |
| base_cases: Dict[str, Status] = dict( |
| sorted( |
| set.intersection( |
| *[ |
| { |
| (n, case["status"]) |
| for n, case in report.get(filename, {}) |
| .get(suite_name, {}) |
| .items() |
| } |
| for report in base_report |
| ] |
| or [set()] |
| ) |
| ) |
| ) |
| case_stats: Dict[str, Stat] = {} |
| if head_suite: |
| now = sum(case["seconds"] for case in head_suite.values()) |
| if any( |
| filename in report and suite_name in report[filename] |
| for report in base_report |
| ): |
| removed_cases: List[CaseDiff] = [] |
| for case_name, case_status in base_cases.items(): |
| case_stats[case_name] = list_stat( |
| matching_test_times( |
| base_reports=base_reports, |
| filename=filename, |
| suite_name=suite_name, |
| case_name=case_name, |
| status=case_status, |
| ) |
| ) |
| if case_name not in head_suite: |
| removed_cases.append( |
| { |
| "margin": "-", |
| "name": case_name, |
| "was": (case_stats[case_name], case_status), |
| "now": None, |
| } |
| ) |
| modified_cases: List[CaseDiff] = [] |
| added_cases: List[CaseDiff] = [] |
| for head_case_name in sorted(head_suite): |
| head_case = head_suite[head_case_name] |
| if head_case_name in base_cases: |
| stat = case_stats[head_case_name] |
| base_status = base_cases[head_case_name] |
| if head_case["status"] != base_status: |
| modified_cases.append( |
| { |
| "margin": "!", |
| "name": head_case_name, |
| "was": (stat, base_status), |
| "now": head_case, |
| } |
| ) |
| else: |
| added_cases.append( |
| { |
| "margin": "+", |
| "name": head_case_name, |
| "was": None, |
| "now": head_case, |
| } |
| ) |
| # there might be a bug calculating this stdev, not sure |
| was = sum_normals(case_stats.values()) |
| case_diffs = removed_cases + modified_cases + added_cases |
| if case_diffs: |
| modified_suites.append( |
| { |
| "margin": " ", |
| "name": suite_name, |
| "was": was, |
| "now": now, |
| "cases": case_diffs, |
| } |
| ) |
| else: |
| for head_case_name in sorted(head_suite): |
| head_case = head_suite[head_case_name] |
| case_diffs.append( |
| { |
| "margin": " ", |
| "name": head_case_name, |
| "was": None, |
| "now": head_case, |
| } |
| ) |
| added_suites.append( |
| { |
| "margin": "+", |
| "name": suite_name, |
| "was": None, |
| "now": now, |
| "cases": case_diffs, |
| } |
| ) |
| else: |
| for case_name, case_status in base_cases.items(): |
| case_stats[case_name] = list_stat( |
| matching_test_times( |
| base_reports=base_reports, |
| filename=filename, |
| suite_name=suite_name, |
| case_name=case_name, |
| status=case_status, |
| ) |
| ) |
| case_diffs.append( |
| { |
| "margin": " ", |
| "name": case_name, |
| "was": (case_stats[case_name], case_status), |
| "now": None, |
| } |
| ) |
| removed_suites.append( |
| { |
| "margin": "-", |
| "name": suite_name, |
| # there might be a bug calculating this stdev, not sure |
| "was": sum_normals(case_stats.values()), |
| "now": None, |
| "cases": case_diffs, |
| } |
| ) |
| |
| return removed_suites + modified_suites + added_suites |
| |
| |
| def case_diff_lines(diff: CaseDiff) -> List[str]: |
| lines = [f'def {diff["name"]}: ...'] |
| |
| case_fmt = ((3, 3), (2, 3)) |
| |
| was = diff["was"] |
| if was: |
| was_line = f" # was {display_stat(was[0], case_fmt)}" |
| was_status = was[1] |
| if was_status: |
| was_line += f" ({was_status})" |
| lines.append(was_line) |
| |
| now = diff["now"] |
| if now: |
| now_stat: Stat = {"center": now["seconds"], "spread": None} |
| now_line = f" # now {display_stat(now_stat, case_fmt)}" |
| now_status = now["status"] |
| if now_status: |
| now_line += f" ({now_status})" |
| lines.append(now_line) |
| |
| return [""] + [f'{diff["margin"]} {l}' for l in lines] |
| |
| |
| def display_suite_diff(diff: SuiteDiff) -> str: |
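    """
    Render one SuiteDiff as a pseudo-Python snippet. Rough shape for a
    removed suite (hypothetical names and times):

    - class TestFoo:
    -     # was   12.34s ±   0.56s
    -
    -     - def test_bar: ...
    -     -     # was   1.234s (skipped)
    """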
| lines = [f'class {diff["name"]}:'] |
| |
| suite_fmt = ((4, 2), (3, 2)) |
| |
| was = diff["was"] |
| if was: |
| lines.append(f" # was {display_stat(was, suite_fmt)}") |
| |
| now = diff["now"] |
| if now is not None: |
| now_stat: Stat = {"center": now, "spread": None} |
| lines.append(f" # now {display_stat(now_stat, suite_fmt)}") |
| |
| for case_diff in diff["cases"]: |
| lines.extend([f" {l}" for l in case_diff_lines(case_diff)]) |
| |
| return unlines([""] + [f'{diff["margin"]} {l}'.rstrip() for l in lines] + [""]) |
| |
| |
| def anomalies(diffs: List[SuiteDiff]) -> str: |
| return "".join(map(display_suite_diff, diffs)) |
| |
| |
| def graph( |
| *, |
| head_sha: Commit, |
| head_seconds: float, |
| base_seconds: Dict[Commit, List[float]], |
| on_master: bool, |
| ancestry_path: int = 0, |
| other_ancestors: int = 0, |
| ) -> str: |
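    """
    Draw an ASCII graph of the head commit relative to its default-branch
    ancestors, annotating each base commit with its report count and total
    time. Rough shape of the output (hypothetical SHAs and times):

        Commit graph (base is most recent master ancestor with at least one S3 report):

            : (master)
            |
            | * deadbeef01 (HEAD)   total time   50.00s
            |/
            * cafebabe02 (base)   1 report,  total time   49.50s
            |
            :
    """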
| lines = [ |
| "Commit graph (base is most recent master ancestor with at least one S3 report):", |
| "", |
| " : (master)", |
| " |", |
| ] |
| |
| head_time_str = f" {format_seconds([head_seconds])}" |
| if on_master: |
| lines.append(f" * {head_sha[:10]} (HEAD) {head_time_str}") |
| else: |
| lines.append(f" | * {head_sha[:10]} (HEAD) {head_time_str}") |
| |
| if ancestry_path > 0: |
| lines += [ |
| " | |", |
| show_ancestors(ancestry_path), |
| ] |
| |
| if other_ancestors > 0: |
| lines += [ |
| " |/|", |
| show_ancestors(other_ancestors), |
| " |", |
| ] |
| else: |
| lines.append(" |/") |
| |
| is_first = True |
| for sha, seconds in base_seconds.items(): |
| num_runs = len(seconds) |
| prefix = str(num_runs).rjust(3) |
| base = "(base)" if is_first and num_runs > 0 else " " |
| if num_runs > 0: |
| is_first = False |
| t = format_seconds(seconds) |
| p = plural(num_runs) |
| if t: |
| p = f"{p}, ".ljust(3) |
| lines.append(f" * {sha[:10]} {base} {prefix} report{p}{t}") |
| |
| lines.extend([" |", " :"]) |
| |
| return unlines(lines) |
| |
| |
| def case_delta(case: CaseDiff) -> Stat: |
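    """Return the change in a case's runtime (now minus was) as a Stat."""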
| was = case["was"] |
| now = case["now"] |
| return recenter( |
| was[0] if was else zero_stat(), |
| now["seconds"] if now else 0, |
| ) |
| |
| |
| def display_final_stat(stat: Stat) -> str: |
| center = stat["center"] |
| spread = stat["spread"] |
| displayed = display_stat( |
| {"center": abs(center), "spread": spread}, |
| ((4, 2), (3, 2)), |
| ) |
| if center < 0: |
| sign = "-" |
| elif center > 0: |
| sign = "+" |
| else: |
| sign = " " |
| return f"{sign}{displayed}".rstrip() |
| |
| |
| def summary_line(message: str, d: DefaultDict[str, List[CaseDiff]]) -> str: |
| all_cases = [c for cs in d.values() for c in cs] |
| tests = len(all_cases) |
| suites = len(d) |
| sp = f"{plural(suites)})".ljust(2) |
| tp = f"{plural(tests)},".ljust(2) |
| # there might be a bug calculating this stdev, not sure |
| stat = sum_normals(case_delta(c) for c in all_cases) |
| return "".join( |
| [ |
| f"{message} (across {suites:>4} suite{sp}", |
| f"{tests:>6} test{tp}", |
| f" totaling {display_final_stat(stat)}", |
| ] |
| ) |
| |
| |
| def summary(analysis: List[SuiteDiff]) -> str: |
| removed_tests: DefaultDict[str, List[CaseDiff]] = defaultdict(list) |
| modified_tests: DefaultDict[str, List[CaseDiff]] = defaultdict(list) |
| added_tests: DefaultDict[str, List[CaseDiff]] = defaultdict(list) |
| |
| for diff in analysis: |
| # the use of 'margin' here is not the most elegant |
| name = diff["name"] |
| margin = diff["margin"] |
| cases = diff["cases"] |
| if margin == "-": |
| removed_tests[name] += cases |
| elif margin == "+": |
| added_tests[name] += cases |
| else: |
| removed = list(filter(lambda c: c["margin"] == "-", cases)) |
| added = list(filter(lambda c: c["margin"] == "+", cases)) |
| modified = list(filter(lambda c: c["margin"] == "!", cases)) |
| if removed: |
| removed_tests[name] += removed |
| if added: |
| added_tests[name] += added |
| if modified: |
| modified_tests[name] += modified |
| |
| return unlines( |
| [ |
| summary_line("Removed ", removed_tests), |
| summary_line("Modified", modified_tests), |
| summary_line("Added ", added_tests), |
| ] |
| ) |
| |
| |
| def regression_info( |
| *, |
| head_sha: Commit, |
| head_report: Report, |
| base_reports: Dict[Commit, List[Report]], |
| job_name: str, |
| on_master: bool, |
| ancestry_path: int, |
| other_ancestors: int, |
| ) -> str: |
| """ |
| Return a human-readable report describing any test time regressions. |
| |
    The head_sha and head_report args describe the current commit and
    its test times. Since Python dicts preserve insertion order
    (guaranteed by the language spec since 3.7), the base_reports
    argument must map the head's most recent default-branch ancestors
    from newest to oldest (so the merge-base is list(base_reports)[0]).
| """ |
| simpler_head = simplify(head_report) |
| simpler_base: Dict[Commit, List[SimplerReport]] = {} |
| for commit, reports in base_reports.items(): |
| simpler_base[commit] = [simplify(r) for r in reports] |
| analysis = analyze( |
| head_report=simpler_head, |
| base_reports=simpler_base, |
| ) |
| |
| return "\n".join( |
| [ |
| unlines( |
| [ |
| "----- Historic stats comparison result ------", |
| "", |
| f" job: {job_name}", |
| f" commit: {head_sha}", |
| ] |
| ), |
| # don't print anomalies, because sometimes due to sharding, the |
| # output from this would be very long and obscure better signal |
| # anomalies(analysis), |
| graph( |
| head_sha=head_sha, |
| head_seconds=head_report["total_seconds"], |
| base_seconds={ |
| c: [r["total_seconds"] for r in rs] |
| for c, rs in base_reports.items() |
| }, |
| on_master=on_master, |
| ancestry_path=ancestry_path, |
| other_ancestors=other_ancestors, |
| ), |
| summary(analysis), |
| ] |
| ) |
| |
| |
| class TestCase: |
| def __init__(self, dom: Any) -> None: |
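        """
        Wrap one <testcase> element from a JUnit-style XML report, e.g.
        (hypothetical):

            <testcase classname="TestNN" name="test_relu" time="0.002">
              <skipped type="XFAIL" message="expected failure"/>
            </testcase>
        """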
| self.class_name = str(dom.attributes["classname"].value) |
| self.name = str(dom.attributes["name"].value) |
| self.time = float(dom.attributes["time"].value) |
| error_elements = dom.getElementsByTagName("error") |
| # DISCLAIMER: unexpected successes and expected failures are currently not reported in assemble_s3_object |
| self.expected_failure = False |
| self.skipped = False |
| self.errored = False |
| self.unexpected_success = False |
| if len(error_elements) > 0: |
| # We are only expecting 1 element here |
| error_element = error_elements[0] |
| self.unexpected_success = ( |
| error_element.hasAttribute("type") |
| and error_element.attributes["type"].value == "UnexpectedSuccess" |
| ) |
| self.errored = not self.unexpected_success |
| skipped_elements = dom.getElementsByTagName("skipped") |
| if len(skipped_elements) > 0: |
| # We are only expecting 1 element here |
| skipped_element = skipped_elements[0] |
| self.expected_failure = ( |
| skipped_element.hasAttribute("type") |
| and skipped_element.attributes["type"].value == "XFAIL" |
| ) |
| self.skipped = not self.expected_failure |
| self.failed = len(dom.getElementsByTagName("failure")) > 0 |
| |
| def __repr__(self) -> str: |
| return self.__str__() |
| |
| def __str__(self) -> str: |
| return ( |
| f"[TestCase name: {self.name} | class_name: {self.class_name} | time: {self.time} | " |
| f"expected_failure: {self.expected_failure} | skipped: {self.skipped} | errored: {self.errored} | " |
| f"unexpected_success: {self.unexpected_success} | failed: {self.failed}]\n" |
| ) |
| |
| |
| class TestSuite: |
| def __init__(self, name: str) -> None: |
| self.name = name |
| self.test_cases: Dict[str, TestCase] = {} |
| self.failed_count = 0 |
| self.skipped_count = 0 |
| self.errored_count = 0 |
| self.total_time = 0.0 |
| # The below are currently not included in test reports |
| self.unexpected_success_count = 0 |
| self.expected_failure_count = 0 |
| |
| def __repr__(self) -> str: |
| rc = ( |
| f"{self.name} run_time: {self.total_time:.2f} tests: {len(self.test_cases)}" |
| ) |
| if self.skipped_count > 0: |
| rc += f" skipped: {self.skipped_count}" |
| return f"TestSuite({rc})" |
| |
| def append(self, test_case: TestCase) -> None: |
| self.test_cases[test_case.name] = test_case |
| self.total_time += test_case.time |
| self.failed_count += 1 if test_case.failed else 0 |
| self.skipped_count += 1 if test_case.skipped else 0 |
| self.errored_count += 1 if test_case.errored else 0 |
| self.unexpected_success_count += 1 if test_case.unexpected_success else 0 |
| self.expected_failure_count += 1 if test_case.expected_failure else 0 |
| |
| def update(self, test_case: TestCase) -> None: |
| name = test_case.name |
| assert ( |
| name in self.test_cases |
| ), f"Error: attempting to replace nonexistent test case {name}" |
        # Note: times for unexpected successes and expected failures are reported as 0s
| self.test_cases[name].time += test_case.time |
| self.test_cases[name].failed |= test_case.failed |
| self.test_cases[name].errored |= test_case.errored |
| self.test_cases[name].skipped |= test_case.skipped |
| self.test_cases[name].unexpected_success |= test_case.unexpected_success |
| self.test_cases[name].expected_failure |= test_case.expected_failure |
| |
| |
| # Tests that spawn duplicates (usually only twice) intentionally |
| MULTITESTS = [ |
| "test_cpp_extensions_aot", |
| "distributed/test_distributed_spawn", |
| "distributed\\test_distributed_spawn", # for windows |
| "distributed/test_c10d_gloo", |
| "distributed\\test_c10d_gloo", # for windows |
| "cpp", # The caffe2 cpp tests spawn duplicate test cases as well. |
| ] |
| |
| |
| class TestFile: |
| def __init__(self, name: str) -> None: |
| self.name = name |
| self.total_time = 0.0 |
| self.test_suites: Dict[str, TestSuite] = {} |
| |
| def append(self, test_case: TestCase) -> None: |
| suite_name = test_case.class_name |
| if suite_name not in self.test_suites: |
| self.test_suites[suite_name] = TestSuite(suite_name) |
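        # Duplicate case names are merged for files known to spawn them
        # (see MULTITESTS) and silently dropped for any other file.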
| if test_case.name in self.test_suites[suite_name].test_cases: |
| if self.name in MULTITESTS: |
| self.test_suites[suite_name].update(test_case) |
| self.total_time += test_case.time |
| else: |
| self.test_suites[suite_name].append(test_case) |
| self.total_time += test_case.time |
| |
| |
| def parse_report(path: str) -> Iterator[TestCase]: |
| try: |
| dom = minidom.parse(path) |
| except Exception as e: |
| print(f"Error occurred when parsing {path}: {e}") |
| return |
| for test_case in dom.getElementsByTagName("testcase"): |
| yield TestCase(test_case) |
| |
| |
| def get_recursive_files(folder: str, extension: str) -> Iterable[str]: |
| """ |
    Recursively list files under `folder` with the given extension.

    Use this instead of glob(os.path.join(folder, '**', f'*{extension}'))
    when folder or file names can start with `.`, which glob treats as
    hidden on Unix platforms.
| """ |
| assert extension.startswith(".") |
| for root, _, files in os.walk(folder): |
| for fname in files: |
| if os.path.splitext(fname)[1] == extension: |
| yield os.path.join(root, fname) |
| |
| |
| def parse_reports(folder: str) -> Dict[str, TestFile]: |
| tests_by_file = {} |
| for report in get_recursive_files(folder, ".xml"): |
| report_path = Path(report) |
        # the basename of the report's directory is the dotted test module
        # name; turn it back into a path, e.g.
        # "distributed.test_distributed_spawn" -> "distributed/test_distributed_spawn"
        test_filename = re.sub(r"\.", "/", report_path.parent.name)
| if test_filename not in tests_by_file: |
| tests_by_file[test_filename] = TestFile(test_filename) |
| for test_case in parse_report(report): |
| tests_by_file[test_filename].append(test_case) |
| return tests_by_file |
| |
| |
| def build_info() -> ReportMetaMeta: |
| return { |
| "build_pr": os.environ.get("PR_NUMBER", os.environ.get("CIRCLE_PR_NUMBER", "")), |
| "build_tag": os.environ.get("TAG", os.environ.get("CIRCLE_TAG", "")), |
| "build_sha1": os.environ.get("SHA1", os.environ.get("CIRCLE_SHA1", "")), |
| "build_base_commit": get_base_commit( |
| os.environ.get("SHA1", os.environ.get("CIRCLE_SHA1", "HEAD")) |
| ), |
| "build_branch": os.environ.get("BRANCH", os.environ.get("CIRCLE_BRANCH", "")), |
| "build_job": os.environ.get( |
| "BUILD_ENVIRONMENT", os.environ.get("CIRCLE_JOB", "") |
| ), |
| "build_workflow_id": os.environ.get( |
| "WORKFLOW_ID", os.environ.get("CIRCLE_WORKFLOW_ID", "") |
| ), |
| "build_start_time_epoch": str( |
| int(os.path.getmtime(os.path.realpath(__file__))) |
| ), |
| } |
| |
| |
| def build_message( |
| test_file: TestFile, |
| test_suite: TestSuite, |
| test_case: TestCase, |
| meta_info: ReportMetaMeta, |
| ) -> Dict[str, Dict[str, Any]]: |
| return { |
| "normal": { |
| **meta_info, |
| "test_filename": test_file.name, |
| "test_suite_name": test_suite.name, |
| "test_case_name": test_case.name, |
| }, |
| "int": { |
| "time": int(time.time()), |
| "test_total_count": 1, |
| "test_total_time": int(test_case.time * 1000), |
| "test_failed_count": 1 if test_case.failed > 0 else 0, |
| "test_skipped_count": 1 if test_case.skipped > 0 else 0, |
| "test_errored_count": 1 if test_case.errored > 0 else 0, |
| }, |
| } |
| |
| |
| def send_report_to_scribe(reports: Dict[str, TestFile]) -> None: |
| meta_info = build_info() |
| logs = json.dumps( |
| [ |
| { |
| "category": "perfpipe_pytorch_test_times", |
| "message": json.dumps( |
| build_message(test_file, test_suite, test_case, meta_info) |
| ), |
| "line_escape": False, |
| } |
| for test_file in reports.values() |
| for test_suite in test_file.test_suites.values() |
| for test_case in test_suite.test_cases.values() |
| ] |
| ) |
    # no need to print the send result; exceptions are caught and printed later.
| send_to_scribe(logs) |
| |
| |
| def assemble_s3_object( |
| reports: Dict[str, TestFile], |
| *, |
| total_seconds: float, |
| ) -> Version2Report: |
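    """Build the Version2Report (file -> suite -> case) that we upload to S3."""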
| return { |
| **build_info(), # type: ignore[misc] |
| "total_seconds": total_seconds, |
| "format_version": 2, |
| "files": { |
| name: { |
| "total_seconds": test_file.total_time, |
| "suites": { |
| name: { |
| "total_seconds": suite.total_time, |
| "cases": { |
| name: { |
| "seconds": case.time, |
| "status": "errored" |
| if case.errored |
| else "failed" |
| if case.failed |
| else "skipped" |
| if case.skipped |
| else None, |
| } |
| for name, case in suite.test_cases.items() |
| }, |
| } |
| for name, suite in test_file.test_suites.items() |
| }, |
| } |
| for name, test_file in reports.items() |
| }, |
| } |
| |
| |
| def send_report_to_s3(head_report: Version2Report) -> None: |
| job = os.getenv("BUILD_ENVIRONMENT", os.environ.get("CIRCLE_JOB")) |
| sha1 = os.environ.get("SHA1", os.environ.get("CIRCLE_SHA1", "")) |
| now = datetime.datetime.utcnow().isoformat() |
| |
| # SHARD_NUMBER and TEST_CONFIG are specific to GHA, as these details would be included in CIRCLE_JOB already |
| shard = os.environ.get("SHARD_NUMBER", "") |
| test_config = os.environ.get("TEST_CONFIG") |
| |
| job_report_dirname = ( |
| f'{job}{f"-{test_config}" if test_config is not None else ""}{shard}' |
| ) |
| key = f"test_time/{sha1}/{job_report_dirname}/{now}Z.json.bz2" # Z meaning UTC |
| obj = get_S3_object_from_bucket("ossci-metrics", key) |
    # Use bz2 rather than gzip: the results are smaller, the compression-time
    # penalty is only about half a second for JSON inputs of a few megabytes
    # like these, and Python's bz2 output works with the bzip2 CLI, whereas
    # zlib output doesn't seem to play nice with gunzip.
| obj.put(Body=bz2.compress(json.dumps(head_report).encode())) |
| |
| |
| def print_regressions(head_report: Report, *, num_prev_commits: int) -> None: |
| sha1 = os.environ.get("SHA1", os.environ.get("CIRCLE_SHA1", "HEAD")) |
| |
| base = get_base_commit(sha1) |
| |
| count_spec = f"{base}..{sha1}" |
| intermediate_commits = int( |
| subprocess.check_output( |
| ["git", "rev-list", "--count", count_spec], encoding="ascii" |
| ) |
| ) |
| ancestry_path = int( |
| subprocess.check_output( |
| ["git", "rev-list", "--ancestry-path", "--count", count_spec], |
| encoding="ascii", |
| ) |
| ) |
| |
| # if current commit is already on main, we need to exclude it from |
| # this history; otherwise we include the merge-base |
| commits = subprocess.check_output( |
| ["git", "rev-list", f"--max-count={num_prev_commits+1}", base], |
| encoding="ascii", |
| ).splitlines() |
| on_master = False |
| if base == sha1: |
| on_master = True |
| commits = commits[1:] |
| else: |
| commits = commits[:-1] |
| |
| job = os.environ.get("BUILD_ENVIRONMENT", "") |
| objects: Dict[Commit, List[Report]] = defaultdict(list) |
| |
| for commit in commits: |
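        # touch the defaultdict so this commit appears even if it has no reports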
| objects[commit] |
| summaries = get_test_stats_summaries_for_job(sha=commit, job_prefix=job) |
| for _, summary in summaries.items(): |
| objects[commit].extend(summary) |
| |
| print() |
| print( |
| regression_info( |
| head_sha=sha1, |
| head_report=head_report, |
| base_reports=objects, |
| job_name=job, |
| on_master=on_master, |
| ancestry_path=ancestry_path - 1, |
| other_ancestors=intermediate_commits - ancestry_path, |
| ), |
| end="", |
| ) |
| |
| |
def positive_integer(value: str) -> int:
    parsed = int(value)
    if parsed < 1:
        raise argparse.ArgumentTypeError(f"{value} is not a positive integer")
    return parsed
| |
| |
| def positive_float(value: str) -> float: |
| parsed = float(value) |
| if parsed <= 0.0: |
        raise argparse.ArgumentTypeError(f"{value} is not a positive number")
| return parsed |
| |
| |
| def reports_has_no_tests(reports: Dict[str, TestFile]) -> bool: |
| for test_file in reports.values(): |
| for test_suite in test_file.test_suites.values(): |
| if len(test_suite.test_cases) > 0: |
| return False |
| return True |
| |
| |
| if __name__ == "__main__": |
    parser = argparse.ArgumentParser(
        description="Print statistics from test XML output.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
| parser.add_argument( |
| "--longest-of-class", |
| type=positive_integer, |
| default=3, |
| metavar="N", |
| help="how many longest tests to show for each class", |
| ) |
| parser.add_argument( |
| "--class-print-threshold", |
| type=positive_float, |
| default=1.0, |
| metavar="N", |
| help="Minimal total time to warrant class report", |
| ) |
| parser.add_argument( |
| "--longest-of-run", |
| type=positive_integer, |
| default=10, |
| metavar="N", |
| help="how many longest tests to show from the entire run", |
| ) |
| if HAVE_BOTO3: |
| parser.add_argument( |
| "--upload-to-s3", |
| action="store_true", |
| help="upload test time to S3 bucket", |
| ) |
| parser.add_argument( |
| "--compare-with-s3", |
| action="store_true", |
| help="download test times for base commits and compare", |
| ) |
| parser.add_argument( |
| "--num-prev-commits", |
| type=positive_integer, |
| default=10, |
| metavar="N", |
| help="how many previous commits to compare test times with", |
| ) |
| parser.add_argument( |
| "--use-json", |
| metavar="FILE.json", |
| help="compare S3 with JSON file, instead of the test report folder", |
| ) |
| parser.add_argument( |
| "folder", |
| help="test report folder", |
| ) |
| args = parser.parse_args() |
| |
| reports_by_file = parse_reports(args.folder) |
| |
| if reports_has_no_tests(reports_by_file): |
| print(f"No tests in reports found in {args.folder}") |
| sys.exit(0) |
| |
| try: |
| send_report_to_scribe(reports_by_file) |
| except Exception as e: |
| print(f"ERROR ENCOUNTERED WHEN UPLOADING TO SCRIBE: {e}") |
| |
    total_time = 0.0
    for test_file in reports_by_file.values():
        for test_suite in test_file.test_suites.values():
            total_time += test_suite.total_time
| |
| obj = assemble_s3_object(reports_by_file, total_seconds=total_time) |
| |
| if args.upload_to_s3: |
| try: |
| send_report_to_s3(obj) |
| except Exception as e: |
| print(f"ERROR ENCOUNTERED WHEN UPLOADING TO S3: {e}") |
| |
| if args.compare_with_s3: |
| head_json = obj |
| if args.use_json: |
| head_json = json.loads(Path(args.use_json).read_text()) |
| try: |
| print_regressions(head_json, num_prev_commits=args.num_prev_commits) |
| except Exception as e: |
| print(f"ERROR ENCOUNTERED WHEN COMPARING AGAINST S3: {e}") |