| import gzip |
| import io |
| import json |
| import os |
| import zipfile |
| from pathlib import Path |
from typing import Any, Dict, List, Optional
| |
| import boto3 # type: ignore[import] |
| import requests |
| import rockset # type: ignore[import] |
| |
| PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch" |
| S3_RESOURCE = boto3.resource("s3") |
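# Note: boto3 resolves AWS credentials from the usual environment variables /
# config files, so no keys are passed explicitly here.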
| |
| |
| def _get_request_headers() -> Dict[str, str]: |
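    """Headers for the GitHub REST API; expects GITHUB_TOKEN in the environment."""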
| return { |
| "Accept": "application/vnd.github.v3+json", |
| "Authorization": "token " + os.environ["GITHUB_TOKEN"], |
| } |
| |
| |
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Get the download urls of all workflow artifacts whose name starts with `prefix`."""
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        headers=_get_request_headers(),
    )
| artifacts = response.json()["artifacts"] |
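    # The artifacts endpoint is paginated; follow the `Link: rel="next"`
    # headers until every page has been collected.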
| while "next" in response.links.keys(): |
| response = requests.get( |
| response.links["next"]["url"], headers=_get_request_headers() |
| ) |
| artifacts.extend(response.json()["artifacts"]) |
| |
| artifact_urls = {} |
| for artifact in artifacts: |
| if artifact["name"].startswith(prefix): |
| artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"] |
| return artifact_urls |
| |
| |
def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Optional[Path]:
| # [Artifact run attempt] |
| # All artifacts on a workflow share a single namespace. However, we can |
| # re-run a workflow and produce a new set of artifacts. To avoid name |
    # collisions, we add `-runattempt<run #>-` somewhere in the artifact name.
| # |
| # This code parses out the run attempt number from the artifact name. If it |
| # doesn't match the one specified on the command line, skip it. |
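    # For example, a (hypothetical) artifact named
    # "test-reports-runattempt2-linux-test.zip" parses to run attempt 2.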
| atoms = str(artifact_name).split("-") |
| for atom in atoms: |
| if atom.startswith("runattempt"): |
| found_run_attempt = int(atom[len("runattempt") :]) |
            if workflow_run_attempt != found_run_attempt:
                print(
                    f"Skipping {artifact_name}: wrong run attempt. "
                    f"Expected {workflow_run_attempt}, found {found_run_attempt}."
                )
                return None
| |
| print(f"Downloading {artifact_name}") |
| |
| response = requests.get(artifact_url, headers=_get_request_headers()) |
| with open(artifact_name, "wb") as f: |
| f.write(response.content) |
| return artifact_name |
| |
| |
| def download_s3_artifacts( |
| prefix: str, workflow_run_id: int, workflow_run_attempt: int |
| ) -> List[Path]: |
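    # Artifacts are laid out in S3 as
    # pytorch/pytorch/<workflow run id>/<run attempt>/artifact/<name>.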
| bucket = S3_RESOURCE.Bucket("gha-artifacts") |
| objs = bucket.objects.filter( |
| Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}" |
| ) |
| |
| found_one = False |
| paths = [] |
| for obj in objs: |
| found_one = True |
| p = Path(Path(obj.key).name) |
| print(f"Downloading {p}") |
| with open(p, "wb") as f: |
| f.write(obj.get()["Body"].read()) |
| paths.append(p) |
| |
| if not found_one: |
| print( |
| "::warning title=s3 artifacts not found::" |
| "Didn't find any test reports in s3, there might be a bug!" |
| ) |
| return paths |
| |
| |
| def download_gha_artifacts( |
| prefix: str, workflow_run_id: int, workflow_run_attempt: int |
| ) -> List[Path]: |
| artifact_urls = _get_artifact_urls(prefix, workflow_run_id) |
    paths = []
    for name, url in artifact_urls.items():
        path = _download_artifact(Path(name), url, workflow_run_attempt)
        if path is not None:
            paths.append(path)
    return paths
| |
| |
| def upload_to_rockset(collection: str, docs: List[Any]) -> None: |
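    """Add docs to an existing Rockset collection; expects ROCKSET_API_KEY in the environment."""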
| print(f"Writing {len(docs)} documents to Rockset") |
| client = rockset.Client( |
| api_server="api.rs2.usw2.rockset.com", api_key=os.environ["ROCKSET_API_KEY"] |
| ) |
| client.Collection.retrieve(collection).add_docs(docs) |
| print("Done!") |
| |
| |
| def upload_to_s3( |
| workflow_run_id: int, |
| workflow_run_attempt: int, |
| collection: str, |
| docs: List[Dict[str, Any]], |
| ) -> None: |
| print(f"Writing {len(docs)} documents to S3") |
| body = io.StringIO() |
| for doc in docs: |
| json.dump(doc, body) |
| body.write("\n") |
| |
| S3_RESOURCE.Object( |
| "ossci-raw-job-status", |
| f"{collection}/{workflow_run_id}/{workflow_run_attempt}", |
| ).put( |
| Body=gzip.compress(body.getvalue().encode()), |
| ContentEncoding="gzip", |
| ContentType="application/json", |
| ) |
| print("Done!") |
| |
| |
def unzip(p: Path) -> None:
    """Unzip the provided zipfile to a similarly-named directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/
    """
    assert p.is_file(), f"Not a file: {p}"
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")

    # `zf`, not `zip`, to avoid shadowing the builtin.
    with zipfile.ZipFile(p, "r") as zf:
        zf.extractall(unzipped_dir)
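

if __name__ == "__main__":
    # Minimal usage sketch, not invoked by the library itself. The workflow
    # run id and attempt below are hypothetical placeholders; AWS credentials
    # must be available, and GITHUB_TOKEN is only needed for the GitHub
    # helpers (download_gha_artifacts).
    for zip_path in download_s3_artifacts("test-report", 12345, 1):
        unzip(zip_path)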