|  | import argparse | 
|  | import os | 
|  | import sys | 
|  | import xml.etree.ElementTree as ET | 
|  | from multiprocessing import cpu_count, Pool | 
|  | from pathlib import Path | 
|  | from tempfile import TemporaryDirectory | 
|  | from typing import Any, Dict, List | 
|  |  | 
|  | from tools.stats.test_dashboard import upload_additional_info | 
|  | from tools.stats.upload_stats_lib import ( | 
|  | download_gha_artifacts, | 
|  | download_s3_artifacts, | 
|  | get_job_id, | 
|  | unzip, | 
|  | upload_workflow_stats_to_s3, | 
|  | ) | 
|  |  | 
|  |  | 
|  | def parse_xml_report( | 
|  | tag: str, | 
|  | report: Path, | 
|  | workflow_id: int, | 
|  | workflow_run_attempt: int, | 
|  | ) -> List[Dict[str, Any]]: | 
|  | """Convert a test report xml file into a JSON-serializable list of test cases.""" | 
|  | print(f"Parsing {tag}s for test report: {report}") | 
|  |  | 
|  | job_id = get_job_id(report) | 
|  | print(f"Found job id: {job_id}") | 
|  |  | 
|  | test_cases: List[Dict[str, Any]] = [] | 
|  |  | 
|  | root = ET.parse(report) | 
|  | for test_case in root.iter(tag): | 
|  | case = process_xml_element(test_case) | 
|  | case["workflow_id"] = workflow_id | 
|  | case["workflow_run_attempt"] = workflow_run_attempt | 
|  | case["job_id"] = job_id | 
|  |  | 
|  | # [invoking file] | 
|  | # The name of the file that the test is located in is not necessarily | 
|  | # the same as the name of the file that invoked the test. | 
|  | # For example, `test_jit.py` calls into multiple other test files (e.g. | 
|  | # jit/test_dce.py). For sharding/test selection purposes, we want to | 
|  | # record the file that invoked the test. | 
|  | # | 
|  | # To do this, we leverage an implementation detail of how we write out | 
|  | # tests (https://bit.ly/3ajEV1M), which is that reports are created | 
|  | # under a folder with the same name as the invoking file. | 
|  | case["invoking_file"] = report.parent.name | 
|  | test_cases.append(case) | 
|  |  | 
|  | return test_cases | 
|  |  | 
|  |  | 
|  | def process_xml_element(element: ET.Element) -> Dict[str, Any]: | 
|  | """Convert a test suite element into a JSON-serializable dict.""" | 
|  | ret: Dict[str, Any] = {} | 
|  |  | 
|  | # Convert attributes directly into dict elements. | 
|  | # e.g. | 
|  | #     <testcase name="test_foo" classname="test_bar"></testcase> | 
|  | # becomes: | 
|  | #     {"name": "test_foo", "classname": "test_bar"} | 
|  | ret.update(element.attrib) | 
|  |  | 
|  | # The XML format encodes all values as strings. Convert to ints/floats if | 
|  | # possible to make aggregation possible in Rockset. | 
|  | for k, v in ret.items(): | 
|  | try: | 
|  | ret[k] = int(v) | 
|  | except ValueError: | 
|  | pass | 
|  | try: | 
|  | ret[k] = float(v) | 
|  | except ValueError: | 
|  | pass | 
|  |  | 
|  | # Convert inner and outer text into special dict elements. | 
|  | # e.g. | 
|  | #     <testcase>my_inner_text</testcase> my_tail | 
|  | # becomes: | 
|  | #     {"text": "my_inner_text", "tail": " my_tail"} | 
|  | if element.text and element.text.strip(): | 
|  | ret["text"] = element.text | 
|  | if element.tail and element.tail.strip(): | 
|  | ret["tail"] = element.tail | 
|  |  | 
|  | # Convert child elements recursively, placing them at a key: | 
|  | # e.g. | 
|  | #     <testcase> | 
|  | #       <foo>hello</foo> | 
|  | #       <foo>world</foo> | 
|  | #       <bar>another</bar> | 
|  | #     </testcase> | 
|  | # becomes | 
|  | #    { | 
|  | #       "foo": [{"text": "hello"}, {"text": "world"}], | 
|  | #       "bar": {"text": "another"} | 
|  | #    } | 
|  | for child in element: | 
|  | if child.tag not in ret: | 
|  | ret[child.tag] = process_xml_element(child) | 
|  | else: | 
|  | # If there are multiple tags with the same name, they should be | 
|  | # coalesced into a list. | 
|  | if not isinstance(ret[child.tag], list): | 
|  | ret[child.tag] = [ret[child.tag]] | 
|  | ret[child.tag].append(process_xml_element(child)) | 
|  | return ret | 
|  |  | 
|  |  | 
|  | def get_tests(workflow_run_id: int, workflow_run_attempt: int) -> List[Dict[str, Any]]: | 
|  | with TemporaryDirectory() as temp_dir: | 
|  | print("Using temporary directory:", temp_dir) | 
|  | os.chdir(temp_dir) | 
|  |  | 
|  | # Download and extract all the reports (both GHA and S3) | 
|  | s3_paths = download_s3_artifacts( | 
|  | "test-report", workflow_run_id, workflow_run_attempt | 
|  | ) | 
|  | for path in s3_paths: | 
|  | unzip(path) | 
|  |  | 
|  | artifact_paths = download_gha_artifacts( | 
|  | "test-report", workflow_run_id, workflow_run_attempt | 
|  | ) | 
|  | for path in artifact_paths: | 
|  | unzip(path) | 
|  |  | 
|  | # Parse the reports and transform them to JSON | 
|  | test_cases = [] | 
|  | mp = Pool(cpu_count()) | 
|  | for xml_report in Path(".").glob("**/*.xml"): | 
|  | test_cases.append( | 
|  | mp.apply_async( | 
|  | parse_xml_report, | 
|  | args=( | 
|  | "testcase", | 
|  | xml_report, | 
|  | workflow_run_id, | 
|  | workflow_run_attempt, | 
|  | ), | 
|  | ) | 
|  | ) | 
|  | mp.close() | 
|  | mp.join() | 
|  | test_cases = [tc.get() for tc in test_cases] | 
|  | flattened = [item for sublist in test_cases for item in sublist] | 
|  | return flattened | 
|  |  | 
|  |  | 
|  | def get_tests_for_circleci( | 
|  | workflow_run_id: int, workflow_run_attempt: int | 
|  | ) -> List[Dict[str, Any]]: | 
|  | # Parse the reports and transform them to JSON | 
|  | test_cases = [] | 
|  | for xml_report in Path(".").glob("**/test/test-reports/**/*.xml"): | 
|  | test_cases.extend( | 
|  | parse_xml_report( | 
|  | "testcase", xml_report, workflow_run_id, workflow_run_attempt | 
|  | ) | 
|  | ) | 
|  |  | 
|  | return test_cases | 
|  |  | 
|  |  | 
|  | def summarize_test_cases(test_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | 
|  | """Group test cases by classname, file, and job_id. We perform the aggregation | 
|  | manually instead of using the `test-suite` XML tag because xmlrunner does | 
|  | not produce reliable output for it. | 
|  | """ | 
|  |  | 
|  | def get_key(test_case: Dict[str, Any]) -> Any: | 
|  | return ( | 
|  | test_case.get("file"), | 
|  | test_case.get("classname"), | 
|  | test_case["job_id"], | 
|  | test_case["workflow_id"], | 
|  | test_case["workflow_run_attempt"], | 
|  | # [see: invoking file] | 
|  | test_case["invoking_file"], | 
|  | ) | 
|  |  | 
|  | def init_value(test_case: Dict[str, Any]) -> Dict[str, Any]: | 
|  | return { | 
|  | "file": test_case.get("file"), | 
|  | "classname": test_case.get("classname"), | 
|  | "job_id": test_case["job_id"], | 
|  | "workflow_id": test_case["workflow_id"], | 
|  | "workflow_run_attempt": test_case["workflow_run_attempt"], | 
|  | # [see: invoking file] | 
|  | "invoking_file": test_case["invoking_file"], | 
|  | "tests": 0, | 
|  | "failures": 0, | 
|  | "errors": 0, | 
|  | "skipped": 0, | 
|  | "successes": 0, | 
|  | "time": 0.0, | 
|  | } | 
|  |  | 
|  | ret = {} | 
|  | for test_case in test_cases: | 
|  | key = get_key(test_case) | 
|  | if key not in ret: | 
|  | ret[key] = init_value(test_case) | 
|  |  | 
|  | ret[key]["tests"] += 1 | 
|  |  | 
|  | if "failure" in test_case: | 
|  | ret[key]["failures"] += 1 | 
|  | elif "error" in test_case: | 
|  | ret[key]["errors"] += 1 | 
|  | elif "skipped" in test_case: | 
|  | ret[key]["skipped"] += 1 | 
|  | else: | 
|  | ret[key]["successes"] += 1 | 
|  |  | 
|  | ret[key]["time"] += test_case["time"] | 
|  | return list(ret.values()) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | parser = argparse.ArgumentParser(description="Upload test stats to Rockset") | 
|  | parser.add_argument( | 
|  | "--workflow-run-id", | 
|  | required=True, | 
|  | help="id of the workflow to get artifacts from", | 
|  | ) | 
|  | parser.add_argument( | 
|  | "--workflow-run-attempt", | 
|  | type=int, | 
|  | required=True, | 
|  | help="which retry of the workflow this is", | 
|  | ) | 
|  | parser.add_argument( | 
|  | "--head-branch", | 
|  | required=True, | 
|  | help="Head branch of the workflow", | 
|  | ) | 
|  | parser.add_argument( | 
|  | "--head-repository", | 
|  | required=True, | 
|  | help="Head repository of the workflow", | 
|  | ) | 
|  | parser.add_argument( | 
|  | "--circleci", | 
|  | action="store_true", | 
|  | help="If this is being run through circleci", | 
|  | ) | 
|  | args = parser.parse_args() | 
|  |  | 
|  | print(f"Workflow id is: {args.workflow_run_id}") | 
|  |  | 
|  | if args.circleci: | 
|  | test_cases = get_tests_for_circleci( | 
|  | args.workflow_run_id, args.workflow_run_attempt | 
|  | ) | 
|  | else: | 
|  | test_cases = get_tests(args.workflow_run_id, args.workflow_run_attempt) | 
|  |  | 
|  | # Flush stdout so that any errors in Rockset upload show up last in the logs. | 
|  | sys.stdout.flush() | 
|  |  | 
|  | # For PRs, only upload a summary of test_runs. This helps lower the | 
|  | # volume of writes we do to Rockset. | 
|  | test_case_summary = summarize_test_cases(test_cases) | 
|  |  | 
|  | upload_workflow_stats_to_s3( | 
|  | args.workflow_run_id, | 
|  | args.workflow_run_attempt, | 
|  | "test_run_summary", | 
|  | test_case_summary, | 
|  | ) | 
|  |  | 
|  | # Separate out the failed test cases. | 
|  | # Uploading everything is too data intensive most of the time, | 
|  | # but these will be just a tiny fraction. | 
|  | failed_tests_cases = [] | 
|  | for test_case in test_cases: | 
|  | if "rerun" in test_case or "failure" in test_case or "error" in test_case: | 
|  | failed_tests_cases.append(test_case) | 
|  |  | 
|  | upload_workflow_stats_to_s3( | 
|  | args.workflow_run_id, | 
|  | args.workflow_run_attempt, | 
|  | "failed_test_runs", | 
|  | failed_tests_cases, | 
|  | ) | 
|  |  | 
|  | if args.head_branch == "main" and args.head_repository == "pytorch/pytorch": | 
|  | # For jobs on main branch, upload everything. | 
|  | upload_workflow_stats_to_s3( | 
|  | args.workflow_run_id, args.workflow_run_attempt, "test_run", test_cases | 
|  | ) | 
|  |  | 
|  | upload_additional_info(args.workflow_run_id, args.workflow_run_attempt, test_cases) |