import gzip
import io
import json
import os
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional
import boto3 # type: ignore[import]
import requests
import rockset # type: ignore[import]

PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
S3_RESOURCE = boto3.resource("s3")

# NB: In CI, a flaky test is usually retried 3 times within a test file run, and
# the whole file is then rerun up to 2 more times, so a test can run at most
# 3 * 3 = 9 times in the normal (non-disabled) mode
MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3

# NB: Rockset has an upper limit of 5000 documents in one request
BATCH_SIZE = 5000


def _get_request_headers() -> Dict[str, str]:
return {
"Accept": "application/vnd.github.v3+json",
"Authorization": "token " + os.environ["GITHUB_TOKEN"],
}


def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Get the download URLs of all workflow artifacts matching the prefix."""
    response = requests.get(
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100",
        headers=_get_request_headers(),
    )
artifacts = response.json()["artifacts"]
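    # The artifacts endpoint is paginated; requests parses the RFC 5988 Link
    # headers into response.links, so follow the "next" link until it is gone.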
while "next" in response.links.keys():
response = requests.get(
response.links["next"]["url"], headers=_get_request_headers()
)
artifacts.extend(response.json()["artifacts"])
artifact_urls = {}
for artifact in artifacts:
if artifact["name"].startswith(prefix):
artifact_urls[Path(artifact["name"])] = artifact["archive_download_url"]
return artifact_urls


def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Optional[Path]:
    # [Artifact run attempt]
    # All artifacts on a workflow share a single namespace. However, we can
    # re-run a workflow and produce a new set of artifacts. To avoid name
    # collisions, we add `-runattempt<run #>-` somewhere in the artifact name.
    #
    # This code parses out the run attempt number from the artifact name. If it
    # doesn't match the one specified on the command line, skip it.
    atoms = str(artifact_name).split("-")
    for atom in atoms:
        if atom.startswith("runattempt"):
            found_run_attempt = int(atom[len("runattempt") :])
            if workflow_run_attempt != found_run_attempt:
                print(
                    f"Skipping {artifact_name} as it is from a different run attempt. "
                    f"Expected {workflow_run_attempt}, found {found_run_attempt}."
                )
                return None
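    # For example (hypothetical name), "test-reports-runattempt2-linux" parses
    # to run attempt 2 and is only downloaded when workflow_run_attempt == 2.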
print(f"Downloading {artifact_name}")
response = requests.get(artifact_url, headers=_get_request_headers())
with open(artifact_name, "wb") as f:
f.write(response.content)
return artifact_name


def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download all artifacts under the given prefix for a workflow run from S3."""
bucket = S3_RESOURCE.Bucket("gha-artifacts")
objs = bucket.objects.filter(
Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
)
found_one = False
paths = []
for obj in objs:
found_one = True
p = Path(Path(obj.key).name)
print(f"Downloading {p}")
with open(p, "wb") as f:
f.write(obj.get()["Body"].read())
paths.append(p)
if not found_one:
print(
"::warning title=s3 artifacts not found::"
"Didn't find any test reports in s3, there might be a bug!"
)
return paths
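# A minimal usage sketch (hypothetical run IDs): fetch every test-report archive
# for attempt 1 of workflow run 123456789, then extract each one.
#
#   for path in download_s3_artifacts("test-report", 123456789, 1):
#       unzip(path)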


def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download all matching artifacts for a workflow run from GitHub Actions."""
    artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
    paths = []
    for name, url in artifact_urls.items():
        # _download_artifact returns None for artifacts from other run attempts
        path = _download_artifact(Path(name), url, workflow_run_attempt)
        if path is not None:
            paths.append(path)
    return paths


def upload_to_rockset(
    collection: str,
    docs: List[Any],
    workspace: str = "commons",
    client: Any = None,
) -> None:
    """Upload documents to a Rockset collection in batches of BATCH_SIZE."""
    if client is None:
        client = rockset.RocksetClient(
            host="api.usw2a1.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
        )
    for from_index in range(0, len(docs), BATCH_SIZE):
        to_index = min(from_index + BATCH_SIZE, len(docs))
        print(f"Writing {to_index - from_index} documents to Rockset")
        client.Documents.add_documents(
            collection=collection,
            data=docs[from_index:to_index],
            workspace=workspace,
        )
    print("Done!")


def upload_to_s3(
    bucket_name: str,
    key: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload documents to S3 as a single gzip-compressed JSON Lines object."""
    print(f"Writing {len(docs)} documents to S3")
    body = io.StringIO()
    for doc in docs:
        json.dump(doc, body)
        body.write("\n")
    S3_RESOURCE.Object(
        bucket_name,
        key,
    ).put(
        Body=gzip.compress(body.getvalue().encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")


def read_from_s3(
    bucket_name: str,
    key: str,
) -> List[Dict[str, Any]]:
    """Read back a gzip-compressed JSON Lines object written by upload_to_s3."""
    print(f"Reading from s3://{bucket_name}/{key}")
    body = (
        S3_RESOURCE.Object(
            bucket_name,
            key,
        )
        .get()["Body"]
        .read()
    )
    results = gzip.decompress(body).decode().split("\n")
    return [json.loads(result) for result in results if result]
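# upload_to_s3 and read_from_s3 round-trip; for example (hypothetical bucket
# and key, assuming write access):
#
#   upload_to_s3("my-bucket", "stats/run1", [{"a": 1}, {"a": 2}])
#   assert read_from_s3("my-bucket", "stats/run1") == [{"a": 1}, {"a": 2}]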


def upload_workflow_stats_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload per-workflow stats to the ossci-raw-job-status bucket."""
    bucket_name = "ossci-raw-job-status"
    key = f"{collection}/{workflow_run_id}/{workflow_run_attempt}"
    upload_to_s3(bucket_name, key, docs)
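# For example (hypothetical IDs), upload_workflow_stats_to_s3(123456789, 1,
# "test_run", docs) writes docs to s3://ossci-raw-job-status/test_run/123456789/1.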


def upload_file_to_s3(
    file_name: str,
    bucket: str,
    key: str,
) -> None:
    """Upload a local file to S3."""
print(f"Upload {file_name} to s3://{bucket}/{key}")
boto3.client("s3").upload_file(
file_name,
bucket,
key,
)


def unzip(p: Path) -> None:
    """Unzip the provided zipfile to a similarly-named directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/
    """
    assert p.is_file()
    unzipped_dir = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {unzipped_dir}")
    with zipfile.ZipFile(p, "r") as zip_file:
        zip_file.extractall(unzipped_dir)


def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool:
    """
    Check if the test report comes from the rerun_disabled_tests workflow, in
    which each test is run many more times than the usual retry limit allows
    """
    return all(
        t.get("num_green", 0) + t.get("num_red", 0) > MAX_RETRY_IN_NON_DISABLED_MODE
        for t in tests.values()
    )
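# A minimal sketch of the expected report shape (hypothetical counts): every
# test must have run more than MAX_RETRY_IN_NON_DISABLED_MODE (9) times for the
# report to be classified as rerun_disabled_tests output.
#
#   assert is_rerun_disabled_tests({"test_foo": {"num_green": 48, "num_red": 2}})
#   assert not is_rerun_disabled_tests({"test_bar": {"num_green": 1, "num_red": 0}})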