blob: 126d78233d924b68ad6a391ccba67032390e73c4 [file] [log] [blame]
import datetime
import gzip
import inspect
import io
import json
import os
import time
import uuid
import zipfile
from decimal import Decimal
from pathlib import Path
from typing import Any, Dict, List, Optional
from warnings import warn

import boto3  # type: ignore[import]
import requests
import rockset  # type: ignore[import]
# Base URL of the GitHub REST API for the pytorch/pytorch repository.
PYTORCH_REPO = "https://api.github.com/repos/pytorch/pytorch"
# Shared boto3 S3 resource, created once at import time and used by the S3 helpers in this file.
S3_RESOURCE = boto3.resource("s3")
# NB: In CI, a flaky test is usually retried 3 times, and then the test file is rerun
# 2 more times, hence 3 * 3 runs at most when not in rerun-disabled-tests mode.
MAX_RETRY_IN_NON_DISABLED_MODE = 3 * 3
def _get_request_headers() -> Dict[str, str]:
    """Build the HTTP headers for authenticated GitHub API requests.

    Returns:
        A dict with the GitHub v3 JSON `Accept` header and a token-based
        `Authorization` header.

    Raises:
        KeyError: if the GITHUB_TOKEN environment variable is not set.
    """
    github_token = os.environ["GITHUB_TOKEN"]
    headers = {"Accept": "application/vnd.github.v3+json"}
    headers["Authorization"] = "token " + github_token
    return headers
def _get_artifact_urls(prefix: str, workflow_run_id: int) -> Dict[Path, str]:
    """Map artifact names to download URLs for one workflow run.

    Lists the artifacts of `workflow_run_id` through the GitHub API, following
    the "next" pagination links, and keeps only the artifacts whose name
    starts with `prefix`.

    Returns:
        A dict from artifact name (as a Path) to its `archive_download_url`.
    """
    first_page_url = (
        f"{PYTORCH_REPO}/actions/runs/{workflow_run_id}/artifacts?per_page=100"
    )
    # NOTE(review): the first page is requested without auth headers, unlike
    # the follow-up pages below -- confirm whether that is intentional.
    response = requests.get(first_page_url)
    artifacts = response.json()["artifacts"]

    while "next" in response.links:
        next_page_url = response.links["next"]["url"]
        response = requests.get(next_page_url, headers=_get_request_headers())
        artifacts.extend(response.json()["artifacts"])

    return {
        Path(artifact["name"]): artifact["archive_download_url"]
        for artifact in artifacts
        if artifact["name"].startswith(prefix)
    }
def _download_artifact(
    artifact_name: Path, artifact_url: str, workflow_run_attempt: int
) -> Optional[Path]:
    """Download one GitHub Actions artifact into the current working directory.

    Args:
        artifact_name: Name of the artifact; also used as the local file path.
        artifact_url: URL the artifact archive is downloaded from.
        workflow_run_attempt: The run attempt whose artifacts are wanted.

    Returns:
        The path the artifact was written to, or None if the artifact was
        skipped because its name carries a different run attempt number.

    Raises:
        ValueError: if a `runattempt` atom in the name has no integer suffix.
    """
    # [Artifact run attempt]
    # All artifacts on a workflow share a single namespace. However, we can
    # re-run a workflow and produce a new set of artifacts. To avoid name
    # collisions, we add `-runattempt<run #>-` somewhere in the artifact name.
    #
    # This code parses out the run attempt number from the artifact name. If it
    # doesn't match the one specified on the command line, skip it.
    for atom in str(artifact_name).split("-"):
        if not atom.startswith("runattempt"):
            continue
        found_run_attempt = int(atom[len("runattempt") :])
        if workflow_run_attempt != found_run_attempt:
            print(
                f"Skipping {artifact_name} as it is an invalid run attempt. "
                f"Expected {workflow_run_attempt}, found {found_run_attempt}."
            )
            # Actually skip: previously the message was printed but the
            # artifact was downloaded anyway.
            return None

    print(f"Downloading {artifact_name}")
    response = requests.get(artifact_url, headers=_get_request_headers())
    with open(artifact_name, "wb") as f:
        f.write(response.content)
    return artifact_name


def download_s3_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download the S3 artifacts of one workflow run attempt.

    Fetches every object in the "gha-artifacts" bucket whose key starts with
    `pytorch/pytorch/<run id>/<run attempt>/artifact/<prefix>` and writes each
    one to the current working directory under the object's base name.

    Returns:
        The local paths of the downloaded files (empty if nothing matched, in
        which case a GitHub Actions warning annotation is printed).
    """
    bucket = S3_RESOURCE.Bucket("gha-artifacts")
    objs = bucket.objects.filter(
        Prefix=f"pytorch/pytorch/{workflow_run_id}/{workflow_run_attempt}/artifact/{prefix}"
    )

    paths = []
    for obj in objs:
        # Keep only the final path component so files land in the cwd.
        p = Path(Path(obj.key).name)
        print(f"Downloading {p}")
        with open(p, "wb") as f:
            f.write(obj.get()["Body"].read())
        paths.append(p)

    if not paths:
        print(
            "::warning title=s3 artifacts not found::"
            "Didn't find any test reports in s3, there might be a bug!"
        )
    return paths


def download_gha_artifacts(
    prefix: str, workflow_run_id: int, workflow_run_attempt: int
) -> List[Path]:
    """Download the GitHub Actions artifacts of one workflow run attempt.

    Only artifacts whose name starts with `prefix` are considered, and
    artifacts tagged with a different run attempt are left out of the result.

    Returns:
        The local paths of the artifacts that were downloaded.
    """
    artifact_urls = _get_artifact_urls(prefix, workflow_run_id)
    paths = []
    for name, url in artifact_urls.items():
        downloaded = _download_artifact(Path(name), url, workflow_run_attempt)
        if downloaded is not None:
            paths.append(downloaded)
    return paths
def upload_to_rockset(
    collection: str, docs: List[Any], workspace: str = "commons"
) -> None:
    """Add `docs` to a Rockset collection.

    Args:
        collection: Name of the target Rockset collection.
        docs: Documents to add.
        workspace: Rockset workspace holding the collection.

    Raises:
        KeyError: if the ROCKSET_API_KEY environment variable is not set.
    """
    doc_count = len(docs)
    print(f"Writing {doc_count} documents to Rockset")

    api_key = os.environ["ROCKSET_API_KEY"]
    rockset_client = rockset.RocksetClient(
        host="api.usw2a1.rockset.com", api_key=api_key
    )
    rockset_client.Documents.add_documents(
        collection=collection, data=docs, workspace=workspace
    )
    print("Done!")
def upload_to_s3(
    bucket_name: str,
    key: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload `docs` to S3 as gzip-compressed, newline-delimited JSON.

    Args:
        bucket_name: Destination bucket.
        key: Destination object key.
        docs: Documents to serialize, one JSON document per line.
    """
    print(f"Writing {len(docs)} documents to S3")

    # Every document is followed by a newline, including the last one.
    payload = "".join(json.dumps(doc) + "\n" for doc in docs)
    s3_object = S3_RESOURCE.Object(f"{bucket_name}", f"{key}")
    s3_object.put(
        Body=gzip.compress(payload.encode()),
        ContentEncoding="gzip",
        ContentType="application/json",
    )
    print("Done!")
def read_from_s3(
    bucket_name: str,
    key: str,
) -> List[Dict[str, Any]]:
    """Read gzip-compressed, newline-delimited JSON documents from S3.

    Args:
        bucket_name: Bucket to read from.
        key: Key of the object to read.

    Returns:
        One parsed JSON document per non-empty line of the object.
    """
    print(f"Reading from s3://{bucket_name}/{key}")

    s3_object = S3_RESOURCE.Object(f"{bucket_name}", f"{key}")
    compressed = s3_object.get()["Body"].read()
    text = gzip.decompress(compressed).decode()

    docs = []
    for line in text.split("\n"):
        # Skip blank lines, e.g. the one after the trailing newline.
        if line:
            docs.append(json.loads(line))
    return docs
def upload_workflow_stats_to_s3(
    workflow_run_id: int,
    workflow_run_attempt: int,
    collection: str,
    docs: List[Dict[str, Any]],
) -> None:
    """Upload workflow stats documents to the "ossci-raw-job-status" bucket.

    The object key is `<collection>/<workflow_run_id>/<workflow_run_attempt>`.
    """
    upload_to_s3(
        "ossci-raw-job-status",
        f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
        docs,
    )
def upload_file_to_s3(
    file_name: str,
    bucket: str,
    key: str,
) -> None:
    """Upload the local file `file_name` to `key` in the S3 bucket `bucket`.

    A new boto3 S3 client is created for each call.
    """
    print(f"Upload {file_name} to s3://{bucket}/{key}")
    s3_client = boto3.client("s3")
    s3_client.upload_file(file_name, bucket, key)
def unzip(p: Path) -> None:
    """Extract the zip archive `p` into a sibling "unzipped-<stem>" directory.

    Looks like: /tmp/test-reports.zip -> /tmp/unzipped-test-reports/

    Raises:
        AssertionError: if `p` is not an existing regular file.
        Errors raised by `zipfile.ZipFile` (e.g. for a file that is not a zip
        archive) propagate to the caller.
    """
    assert p.is_file()
    destination = p.with_name("unzipped-" + p.stem)
    print(f"Extracting {p} to {destination}")
    with zipfile.ZipFile(p, "r") as archive:
        archive.extractall(destination)
def is_rerun_disabled_tests(tests: Dict[str, Dict[str, int]]) -> bool:
    """
    Tell whether a test report comes from the rerun_disabled_tests workflow,
    where each test is run multiple times.

    Returns True only if every test's "num_green" + "num_red" count (missing
    counts are treated as 0) exceeds MAX_RETRY_IN_NON_DISABLED_MODE. An empty
    `tests` dict therefore yields True.
    """
    for counts in tests.values():
        total_runs = counts.get("num_green", 0) + counts.get("num_red", 0)
        if total_runs <= MAX_RETRY_IN_NON_DISABLED_MODE:
            return False
    return True
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `data` with its top-level float values as Decimals.

    Only values that are directly floats are converted; every other value,
    including nested containers, is carried over unchanged.
    """
    converted: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, float):
            # Go through str() so the Decimal matches the float's printed form.
            converted[key] = Decimal(str(value))
        else:
            converted[key] = value
    return converted
class EnvVarMetric:
    """A metric whose value is read from an environment variable."""

    # Name under which the metric is reported.
    name: str
    # Environment variable the value is read from.
    env_var: str
    # Whether a missing environment variable is an error.
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name = name
        self.env_var = env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        """Return the metric's value from the environment.

        Returns:
            The environment variable's value, passed through
            `type_conversion_fn` when one is set, or None when the variable
            is unset and the metric is not required.

        Raises:
            ValueError: if the variable is unset and the metric is required.
        """
        value = os.environ.get(self.env_var)
        if value is None:
            if self.required:
                raise ValueError(
                    f"Missing {self.name}. Please set the {self.env_var} "
                    "environment variable to pass in this value."
                )
            # Optional and unset: there is nothing to convert, so don't hand
            # None to the conversion function (e.g. int(None) would raise).
            return None
        if self.type_conversion_fn:
            return self.type_conversion_fn(value)
        return value
def emit_metric(
    metric_name: str,
    metrics: Dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If they're not set, this function warns and becomes
    a noop.)

    Raises:
        ValueError: if `metrics` is None, or if it contains a key that is reserved
            for the automatically populated values.
    """
    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT"),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    try:
        # Timezone-aware replacement for the deprecated utcnow(); the format
        # has no timezone field, so the resulting string is the same.
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        reserved_metrics = {
            "metric_name": metric_name,
            "calling_file": calling_file,
            "calling_module": calling_module,
            "calling_function": calling_function,
            "timestamp": utc_now.strftime("%Y-%m-%d %H:%M:%S.%f"),
            **{m.name: m.value() for m in env_var_metrics},
        }
    except ValueError as e:
        # Raised when a required env var is missing or fails its conversion.
        warn(f"Not emitting metrics. {e}")
        return

    # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys
    used_reserved_keys = [k for k in metrics if k in reserved_metrics]
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    try:
        session = boto3.Session(region_name="us-east-1")
        session.resource("dynamodb").Table("torchci-metrics").put_item(
            Item={
                **reserved_metrics,
                **metrics,
            }
        )
    except Exception as e:
        # We don't want to fail the job if we can't upload the metric.
        # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
        warn(f"Error uploading metric to DynamoDB: {e}")
        return