import json
import os
import re
import time
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, cast, Dict, List

import requests

from tools.stats.upload_stats_lib import (
_get_request_headers,
download_s3_artifacts,
get_job_id,
unzip,
upload_workflow_stats_to_s3,
)
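
# Job names look like "<build name> / ... test (<test config>, ...)";
# group 1 captures the build name and group 2 the test config.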
REGEX_JOB_INFO = r"(.*) \/ .*test \(([^,]*), .*\)"


@lru_cache(maxsize=1000)
def get_job_name(job_id: int) -> str:
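    """Look up the name of a GitHub Actions job via the GitHub API."""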
try:
return cast(
str,
requests.get(
f"https://api.github.com/repos/pytorch/pytorch/actions/jobs/{job_id}",
headers=_get_request_headers(),
).json()["name"],
)
except Exception as e:
print(f"Failed to get job name for job id {job_id}: {e}")
return "NoJobName"


@lru_cache(maxsize=1000)
def get_build_name(job_name: str) -> str:
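    """Extract the build environment name from a GitHub Actions job name."""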
try:
return re.match(REGEX_JOB_INFO, job_name).group(1) # type: ignore[union-attr]
except AttributeError:
print(f"Failed to match job name: {job_name}")
return "NoBuildEnv"


@lru_cache(maxsize=1000)
def get_test_config(job_name: str) -> str:
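    """Extract the test config from a GitHub Actions job name."""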
try:
return re.match(REGEX_JOB_INFO, job_name).group(2) # type: ignore[union-attr]
except AttributeError:
print(f"Failed to match job name: {job_name}")
return "NoTestConfig"


def get_td_exclusions(
workflow_run_id: int, workflow_run_attempt: int
) -> Dict[str, Any]:
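    """Download the td_exclusions artifacts for a workflow run and group the
    excluded test files by build name and test config."""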
with TemporaryDirectory() as temp_dir:
print("Using temporary directory:", temp_dir)
os.chdir(temp_dir)
# Download and extract all the reports (both GHA and S3)
s3_paths = download_s3_artifacts(
"test-jsons", workflow_run_id, workflow_run_attempt
)
for path in s3_paths:
unzip(path)
grouped_tests: Dict[str, Any] = defaultdict(lambda: defaultdict(set))
        for td_exclusions in Path(".").glob("**/td_exclusions*.json"):
            with open(td_exclusions) as f:
                exclusions = json.load(f)
            job_id = get_job_id(td_exclusions)
            job_name = get_job_name(job_id)
            build_name = get_build_name(job_name)
            test_config = get_test_config(job_name)
            for exclusion in exclusions["excluded"]:
                grouped_tests[build_name][test_config].add(exclusion["test_file"])
for build_name, build in grouped_tests.items():
for test_config, test_files in build.items():
grouped_tests[build_name][test_config] = sorted(test_files)
return grouped_tests


def group_test_cases(test_cases: List[Dict[str, Any]]) -> Dict[str, Any]:
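    """Group test cases by build name, test config, invoking file, class name,
    and test name. Bazel jobs are skipped."""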
start = time.time()
grouped_tests: Dict[str, Any] = defaultdict(
lambda: defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
)
)
for test_case in test_cases:
job_name = get_job_name(test_case["job_id"])
build_name = get_build_name(job_name)
if "bazel" in build_name:
continue
test_config = get_test_config(job_name)
class_name = test_case.pop("classname", "NoClass")
name = test_case.pop("name", "NoName")
invoking_file = test_case.pop("invoking_file", "NoFile")
invoking_file = invoking_file.replace(".", "/")
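        # These fields are not needed in the grouped output, so drop them.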
test_case.pop("workflow_id")
test_case.pop("workflow_run_attempt")
grouped_tests[build_name][test_config][invoking_file][class_name][name].append(
test_case
)
print(f"Time taken to group tests: {time.time() - start}")
return grouped_tests


def get_reruns(grouped_tests: Dict[str, Any]) -> Dict[str, Any]:
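    """Collect test cases that have more than one entry (reruns), skipping a
    few invoking files that are excluded from rerun reporting."""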
reruns: Dict[str, Any] = defaultdict(
lambda: defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
)
)
for build_name, build in grouped_tests.items():
for test_config, test_config_data in build.items():
for invoking_file, invoking_file_data in test_config_data.items():
for class_name, class_data in invoking_file_data.items():
for test_name, test_data in class_data.items():
if len(test_data) > 1:
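                            # These invoking files are excluded from rerun
                            # reporting; their repeated entries are not
                            # treated as reruns.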
if invoking_file in (
"distributed/test_distributed_spawn",
"onnx/test_fx_to_onnx_with_onnxruntime",
"distributed/algorithms/quantization/test_quantization",
):
continue
reruns[build_name][test_config][invoking_file][class_name][
test_name
] = test_data
return reruns


def get_invoking_file_summary(grouped_tests: Dict[str, Any]) -> Dict[str, Any]:
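    """Summarize each invoking file's test case count and total time, keyed by
    build name and test config."""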
invoking_file_summary: Dict[str, Any] = defaultdict(
lambda: defaultdict(lambda: defaultdict(lambda: {"count": 0, "time": 0.0}))
)
for build_name, build in grouped_tests.items():
for test_config, test_config_data in build.items():
for invoking_file, invoking_file_data in test_config_data.items():
for class_data in invoking_file_data.values():
for test_data in class_data.values():
invoking_file_summary[build_name][test_config][invoking_file][
"count"
] += 1
for i in test_data:
invoking_file_summary[build_name][test_config][
invoking_file
]["time"] += i["time"]
return invoking_file_summary


def upload_additional_info(
workflow_run_id: int, workflow_run_attempt: int, test_cases: List[Dict[str, Any]]
) -> None:
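    """Derive reruns, TD exclusions, and per-file summaries for a workflow run
    and upload each to S3 under additional_info/."""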
grouped_tests = group_test_cases(test_cases)
reruns = get_reruns(grouped_tests)
exclusions = get_td_exclusions(workflow_run_id, workflow_run_attempt)
invoking_file_summary = get_invoking_file_summary(grouped_tests)
upload_workflow_stats_to_s3(
workflow_run_id,
workflow_run_attempt,
"additional_info/reruns",
[reruns],
)
upload_workflow_stats_to_s3(
workflow_run_id,
workflow_run_attempt,
"additional_info/td_exclusions",
[exclusions],
)
upload_workflow_stats_to_s3(
workflow_run_id,
workflow_run_attempt,
"additional_info/invoking_file_summary",
[invoking_file_summary],
)