import argparse
import os
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
from tempfile import TemporaryDirectory

from tools.stats.upload_stats_lib import (
    download_gha_artifacts,
    download_s3_artifacts,
    upload_to_rockset,
    unzip,
)


def parse_xml_report(
    tag: str,
    report: Path,
    workflow_id: int,
    workflow_run_attempt: int,
    skip_tag: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Convert a test report xml file into a JSON-serializable list of the
    elements matching ``tag`` (test cases or test suites)."""
    print(f"Parsing {tag}s for test report: {report}")

    # [Job id in artifacts]
    # Retrieve the job id from the report path. In our GHA workflows, we append
    # the job id to the end of the report name, so `report` looks like:
    # unzipped-test-reports-foo_5596745227/test/test-reports/foo/TEST-foo.xml
    # and we want to get `5596745227` out of it.
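    # Concretely, `report.parts[0]` is "unzipped-test-reports-foo_5596745227",
    # and `rpartition("_")[2]` picks out everything after the final underscore.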
    job_id = int(report.parts[0].rpartition("_")[2])
    print(f"Found job id: {job_id}")

    root = ET.parse(report)

    test_cases = []
    for test_case in root.iter(tag):
        case = process_xml_element(test_case, skip_tag)
        case["workflow_id"] = workflow_id
        case["workflow_run_attempt"] = workflow_run_attempt
        case["job_id"] = job_id
        test_cases.append(case)

    return test_cases
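

# A sketch of the shape of one record returned by parse_xml_report; the values
# here are hypothetical, not taken from a real report:
#
#     {"name": "test_foo", "classname": "TestFoo", "time": 0.001,
#      "workflow_id": 123456789, "workflow_run_attempt": 1, "job_id": 5596745227}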


def process_xml_element(element: ET.Element, skip_tag: Optional[str]) -> Dict[str, Any]:
    """Convert a test suite or test case element into a JSON-serializable dict."""
    ret: Dict[str, Any] = {}

    # Convert attributes directly into dict elements.
    # e.g.
    # <testcase name="test_foo" classname="test_bar"></testcase>
    # becomes:
    # {"name": "test_foo", "classname": "test_bar"}
    ret.update(element.attrib)

    # The XML format encodes all values as strings. Convert to ints/floats where
    # possible to make aggregation possible in Rockset. Try int first and only
    # fall back to float, so that integer-valued attributes stay ints.
    for k, v in ret.items():
        try:
            ret[k] = int(v)
        except ValueError:
            try:
                ret[k] = float(v)
            except ValueError:
                pass

    # Convert inner and outer text into special dict elements.
    # e.g.
    # <testcase>my_inner_text</testcase> my_tail
    # becomes:
    # {"text": "my_inner_text", "tail": " my_tail"}
    if element.text and element.text.strip():
        ret["text"] = element.text
    if element.tail and element.tail.strip():
        ret["tail"] = element.tail

    # Convert child elements recursively, placing them at a key:
    # e.g.
    # <testcase>
    #     <foo>hello</foo>
    #     <foo>world</foo>
    #     <bar>another</bar>
    # </testcase>
    # becomes:
    # {
    #     "foo": [{"text": "hello"}, {"text": "world"}],
    #     "bar": {"text": "another"}
    # }
    for child in element:
        if child.tag == skip_tag:
            continue
        if child.tag not in ret:
            ret[child.tag] = process_xml_element(child, skip_tag)
        else:
            # If there are multiple tags with the same name, they should be
            # coalesced into a list.
            if not isinstance(ret[child.tag], list):
                ret[child.tag] = [ret[child.tag]]
            ret[child.tag].append(process_xml_element(child, skip_tag))
    return ret
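

# A sketch of how process_xml_element behaves on a hypothetical suite element:
#
#     <testsuite name="foo" tests="2" time="0.1">
#         <testcase name="test_a" time="0.05"/>
#         <testcase name="test_b" time="0.05"/>
#     </testsuite>
#
# with skip_tag=None becomes roughly:
#
#     {"name": "foo", "tests": 2, "time": 0.1,
#      "testcase": [{"name": "test_a", "time": 0.05},
#                   {"name": "test_b", "time": 0.05}]}
#
# while skip_tag="testcase" drops the nested test cases. get_tests below relies
# on this so that test suites and test cases can be uploaded separately.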


def get_tests(
    workflow_run_id: int, workflow_run_attempt: int
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    with TemporaryDirectory() as temp_dir:
        print("Using temporary directory:", temp_dir)
        os.chdir(temp_dir)

        # Download and extract all the reports (both GHA and S3)
        s3_paths = download_s3_artifacts(
            "test-report", workflow_run_id, workflow_run_attempt
        )
        for path in s3_paths:
            unzip(path)

        artifact_paths = download_gha_artifacts(
            "test-report", workflow_run_id, workflow_run_attempt
        )
        for path in artifact_paths:
            unzip(path)

        # Parse the reports and transform them to JSON
        test_cases = []
        test_suites = []
        for xml_report in Path(".").glob("**/*.xml"):
            test_cases.extend(
                parse_xml_report(
                    "testcase",
                    xml_report,
                    workflow_run_id,
                    workflow_run_attempt,
                )
            )
            test_suites.extend(
                parse_xml_report(
                    "testsuite",
                    xml_report,
                    workflow_run_id,
                    workflow_run_attempt,
                    skip_tag="testcase",
                )
            )

        return test_cases, test_suites


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Upload test stats to Rockset")
    parser.add_argument(
        "--workflow-run-id",
        type=int,
        required=True,
        help="id of the workflow to get artifacts from",
    )
    parser.add_argument(
        "--workflow-run-attempt",
        type=int,
        required=True,
        help="which retry of the workflow this is",
    )
    args = parser.parse_args()

    test_cases, test_suites = get_tests(args.workflow_run_id, args.workflow_run_attempt)
    upload_to_rockset("test_run", test_cases)
    upload_to_rockset("test_suite", test_suites)
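

# Example invocation (hypothetical run id; the module path is assumed), run from
# the repository root so that the `tools.stats` imports resolve:
#
#     python3 -m tools.stats.upload_test_stats \
#         --workflow-run-id 2141323241 \
#         --workflow-run-attempt 1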