|  | import gzip | 
|  | import io | 
|  | import json | 
|  | import os | 
|  | import zipfile | 
|  |  | 
|  | from pathlib import Path | 
|  | from typing import Any, Dict, List, Optional | 
|  |  | 
|  | import boto3  # type: ignore[import] | 
|  | import requests | 
|  | import rockset  # type: ignore[import] | 
|  |  | 
|  | PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch" | 
|  | S3_RESOURCE = boto3.resource("s3") | 
|  |  | 
|  | # NB: In CI, a flaky test is usually retried 3 times, then the test file would be rerun | 
|  | # 2 more times | 
|  | MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3 | 
|  | # NB: Rockset has an upper limit of 5000 documents in one request | 
|  | BATCH_SIZE = 5000 | 
|  |  | 
|  |  | 
|  | def _get_request_headers() -> Dict[str, str]: | 
|  | return { | 
|  | "Accept": "application/vnd.github.v3+json", | 
|  | "Authorization": "token " + os.environ["GITHUB_TOKEN"], | 
|  | } | 
|  |  | 
|  |  | 
|  | def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]: | 
|  | """Get all workflow artifacts with 'test-report' in the name.""" | 
|  | response = requests.get( | 
|  | f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100", | 
|  | headers=_get_request_headers(), | 
|  | ) | 
|  | artifacts = response.json()["artifacts"] | 
|  | while "next" in response.links.keys(): | 
|  | response = requests.get( | 
|  | response.links["next"]["url"], headers=_get_request_headers() | 
|  | ) | 
|  | artifacts.extend(response.json()["artifacts"]) | 
|  |  | 
|  | artifact_urls = {} | 
|  | for artifact in artifacts: | 
|  | if artifact["name"].startswith(prefix): | 
|  | artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"] | 
|  | return artifact_urls | 
|  |  | 
|  |  | 
|  | def _download_artifact( | 
|  | artifact_name: Path, artifact_url: str, workflow_run_attempt: int | 
|  | ) -> Path: | 
|  | # [Artifact run attempt] | 
|  | # All artifacts on a workflow share a single namespace. However, we can | 
|  | # re-run a workflow and produce a new set of artifacts. To avoid name | 
|  | # collisions, we add `-runattempt1<run #>-` somewhere in the artifact name. | 
|  | # | 
|  | # This code parses out the run attempt number from the artifact name. If it | 
|  | # doesn't match the one specified on the command line, skip it. | 
|  | atoms = str(artifact_name).split("-") | 
|  | for atom in atoms: | 
|  | if atom.startswith("runattempt"): | 
|  | found_run_attempt = int(atom[len("runattempt") :]) | 
|  | if workflow_run_attempt != found_run_attempt: | 
|  | print( | 
|  | f"Skipping {artifact_name} as it is an invalid run attempt. " | 
|  | f"Expected {workflow_run_attempt}, found {found_run_attempt}." | 
|  | ) | 
|  |  | 
|  | print(f"Downloading {artifact_name}") | 
|  |  | 
|  | response = requests.get(artifact_url, headers=_get_request_headers()) | 
|  | with open(artifact_name, "wb") as f: | 
|  | f.write(response.content) | 
|  | return artifact_name | 
|  |  | 
|  |  | 
|  | def download_s3_artifacts( | 
|  | prefix: str, workflow_run_id: int, workflow_run_attempt: int | 
|  | ) -> List[Path]: | 
|  | bucket = S3_RESOURCE.Bucket("gha-artifacts") | 
|  | objs = bucket.objects.filter( | 
|  | Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}" | 
|  | ) | 
|  |  | 
|  | found_one = False | 
|  | paths = [] | 
|  | for obj in objs: | 
|  | found_one = True | 
|  | p = Path(Path(obj.key).name) | 
|  | print(f"Downloading {p}") | 
|  | with open(p, "wb") as f: | 
|  | f.write(obj.get()["Body"].read()) | 
|  | paths.append(p) | 
|  |  | 
|  | if not found_one: | 
|  | print( | 
|  | "::warning title=s3 artifacts not found::" | 
|  | "Didn't find any test reports in s3, there might be a bug!" | 
|  | ) | 
|  | return paths | 
|  |  | 
|  |  | 
|  | def download_gha_artifacts( | 
|  | prefix: str, workflow_run_id: int, workflow_run_attempt: int | 
|  | ) -> List[Path]: | 
|  | artifact_urls = _get_artifact_urls(prefix, workflow_run_id) | 
|  | paths = [] | 
|  | for name, url in artifact_urls.items(): | 
|  | paths.append(_download_artifact(Path(name), url, workflow_run_attempt)) | 
|  | return paths | 
|  |  | 
|  |  | 
|  | def upload_to_rockset( | 
|  | collection: str, | 
|  | docs: List[Any], | 
|  | workspace: str = "commons", | 
|  | client: Any = None, | 
|  | ) -> None: | 
|  | if not client: | 
|  | client = rockset.RocksetClient( | 
|  | host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"] | 
|  | ) | 
|  |  | 
|  | index = 0 | 
|  | while index < len(docs): | 
|  | from_index = index | 
|  | to_index = min(from_index + BATCH_SIZE, len(docs)) | 
|  | print(f"Writing {to_index - from_index} documents to Rockset") | 
|  |  | 
|  | client.Documents.add_documents( | 
|  | collection=collection, | 
|  | data=docs[from_index:to_index], | 
|  | workspace=workspace, | 
|  | ) | 
|  | index += BATCH_SIZE | 
|  |  | 
|  | print("Done!") | 
|  |  | 
|  |  | 
|  | def upload_to_s3( | 
|  | bucket_name: str, | 
|  | key: str, | 
|  | docs: List[Dict[str, Any]], | 
|  | ) -> None: | 
|  | print(f"Writing {len(docs)} documents to S3") | 
|  | body = io.StringIO() | 
|  | for doc in docs: | 
|  | json.dump(doc, body) | 
|  | body.write("\n") | 
|  |  | 
|  | S3_RESOURCE.Object( | 
|  | f"{bucket_name}", | 
|  | f"{key}", | 
|  | ).put( | 
|  | Body=gzip.compress(body.getvalue().encode()), | 
|  | ContentEncoding="gzip", | 
|  | ContentType="application/json", | 
|  | ) | 
|  | print("Done!") | 
|  |  | 
|  |  | 
|  | def read_from_s3( | 
|  | bucket_name: str, | 
|  | key: str, | 
|  | ) -> List[Dict[str, Any]]: | 
|  | print(f"Reading from s3://{bucket_name}/{key}") | 
|  | body = ( | 
|  | S3_RESOURCE.Object( | 
|  | f"{bucket_name}", | 
|  | f"{key}", | 
|  | ) | 
|  | .get()["Body"] | 
|  | .read() | 
|  | ) | 
|  | results = gzip.decompress(body).decode().split("\n") | 
|  | return [json.loads(result) for result in results if result] | 
|  |  | 
|  |  | 
|  | def upload_workflow_stats_to_s3( | 
|  | workflow_run_id: int, | 
|  | workflow_run_attempt: int, | 
|  | collection: str, | 
|  | docs: List[Dict[str, Any]], | 
|  | ) -> None: | 
|  | bucket_name = "ossci-raw-job-status" | 
|  | key = f"{collection}/{workflow_run_id}/{workflow_run_attempt}" | 
|  | upload_to_s3(bucket_name, key, docs) | 
|  |  | 
|  |  | 
|  | def upload_file_to_s3( | 
|  | file_name: str, | 
|  | bucket: str, | 
|  | key: str, | 
|  | ) -> None: | 
|  | """ | 
|  | Upload a local file to S3 | 
|  | """ | 
|  | print(f"Upload {file_name} to s3://{bucket}/{key}") | 
|  | boto3.client("s3").upload_file( | 
|  | file_name, | 
|  | bucket, | 
|  | key, | 
|  | ) | 
|  |  | 
|  |  | 
|  | def unzip(p: Path) -> None: | 
|  | """Unzip the provided zipfile to a similarly-named directory. | 
|  |  | 
|  | Returns None if `p` is not a zipfile. | 
|  |  | 
|  | Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/ | 
|  | """ | 
|  | assert p.is_file() | 
|  | unzipped_dir = p.with_name("unzipped-" + p.stem) | 
|  | print(f"Extracting {p} to {unzipped_dir}") | 
|  |  | 
|  | with zipfile.ZipFile(p, "r") as zip: | 
|  | zip.extractall(unzipped_dir) | 
|  |  | 
|  |  | 
|  | def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool: | 
|  | """ | 
|  | Check if the test report is coming from rerun_disabled_tests workflow where | 
|  | each test is run multiple times | 
|  | """ | 
|  | return all( | 
|  | t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE | 
|  | for t in tests.values() | 
|  | ) | 
|  |  | 
|  |  | 
|  | def get_job_id(report: Path) -> Optional[int]: | 
|  | # [Job id in artifacts] | 
|  | # Retrieve the job id from the report path. In our GHA workflows, we append | 
|  | # the job id to the end of the report name, so `report` looks like: | 
|  | #     unzipped-test-reports-foo_5596745227/test/test-reports/foo/TEST-foo.xml | 
|  | # and we want to get `5596745227` out of it. | 
|  | try: | 
|  | return int(report.parts[0].rpartition("_")[2]) | 
|  | except ValueError: | 
|  | return None |