| from __future__ import annotations |
| |
| import argparse |
| import csv |
| import hashlib |
| import json |
| import os |
| import re |
| from pathlib import Path |
| from tempfile import TemporaryDirectory |
| from typing import Any, Dict |
| |
| from tools.stats.upload_stats_lib import ( |
| download_s3_artifacts, |
| unzip, |
| upload_to_dynamodb, |
| upload_to_rockset, |
| ) |
| |
| |
# S3 artifact name prefixes to download for each workflow run
ARTIFACTS = [
    "test-reports",
]
# Extracts metadata from a test-report zip name: the test config <name>, the
# <runner> label, and the GitHub <job> id. The two \d+ groups in the middle
# are presumably shard number / total shards — TODO confirm against the
# artifact-naming code that produces these zips.
ARTIFACT_REGEX = re.compile(
    r"test-reports-test-(?P<name>[\w\-]+)-\d+-\d+-(?P<runner>[\w\.-]+)_(?P<job>\d+).zip"
)
| |
| |
def upload_dynamo_perf_stats_to_rockset(
    repo: str,
    workflow_run_id: int,
    workflow_run_attempt: int,
    head_branch: str,
    match_filename: str,
) -> list[dict[str, Any]]:
    """Download S3 test-report artifacts and collect dynamo perf-stats records.

    Every artifact zip whose name matches ARTIFACT_REGEX is unzipped; each CSV
    inside whose basename (without extension) matches ``match_filename`` is
    parsed, and every row is annotated with workflow/job metadata.

    Args:
        repo: GitHub repo the workflow run belongs to. Unused in this
            function; kept for signature parity with the CLI caller.
        workflow_run_id: id of the workflow run to pull artifacts from.
        workflow_run_attempt: which retry of the workflow this is.
        head_branch: head branch of the workflow, recorded on every row.
        match_filename: regex used to select which CSV files to process.

    Returns:
        A list of dicts, one per CSV row, augmented with workflow_id,
        run_attempt, test_name, runner, job_id, filename, and head_branch.
    """
    match_filename_regex = re.compile(match_filename)
    perf_stats: list[dict[str, Any]] = []
    with TemporaryDirectory() as temp_dir:
        print("Using temporary directory:", temp_dir)
        # NOTE(review): the original cwd is never restored, so the process is
        # left in a deleted directory once the TemporaryDirectory is cleaned
        # up. Harmless for a one-shot CLI, but worth knowing.
        os.chdir(temp_dir)

        for artifact in ARTIFACTS:
            artifact_paths = download_s3_artifacts(
                artifact, workflow_run_id, workflow_run_attempt
            )

            # Unzip to get perf stats csv files
            for path in artifact_paths:
                m = ARTIFACT_REGEX.match(str(path))
                if not m:
                    print(f"Test report {path} has an invalid name. Skipping")
                    continue

                test_name = m.group("name")
                runner = m.group("runner")
                job_id = m.group("job")

                # Extract all files
                unzip(path)

                for csv_file in Path(".").glob("**/*.csv"):
                    filename = os.path.splitext(os.path.basename(csv_file))[0]
                    if not match_filename_regex.match(filename):
                        continue
                    # Bug fix: this log line previously printed a literal
                    # "(unknown)" placeholder instead of the CSV filename
                    print(f"Processing {filename} from {path}")

                    with open(csv_file) as csvfile:
                        reader = csv.DictReader(csvfile, delimiter=",")

                        for row in reader:
                            # Attach the metadata needed to identify this
                            # record downstream (and to build its DynamoDB
                            # partition key)
                            row.update(
                                {
                                    "workflow_id": workflow_run_id,  # type: ignore[dict-item]
                                    "run_attempt": workflow_run_attempt,  # type: ignore[dict-item]
                                    "test_name": test_name,
                                    "runner": runner,
                                    "job_id": job_id,
                                    "filename": filename,
                                    "head_branch": head_branch,
                                }
                            )
                            perf_stats.append(row)

                    # Done processing the file, removing it
                    os.remove(csv_file)

    return perf_stats
| |
| |
def generate_partition_key(repo: str, doc: dict[str, Any]) -> str:
    """
    Generate a unique partition key for the document on DynamoDB.

    The key concatenates the identifying metadata fields carried by each
    record with an MD5 digest of the whole document, so distinct rows from
    the same workflow/job/file never collide.
    """
    workflow_id = doc["workflow_id"]
    job_id = doc["job_id"]
    test_name = doc["test_name"]
    filename = doc["filename"]

    # MD5 is used purely as a content fingerprint here, not for security
    hash_content = hashlib.md5(json.dumps(doc).encode("utf-8")).hexdigest()
    # Bug fix: the key previously embedded a literal "(unknown)" placeholder
    # instead of the filename, leaving the extracted `filename` unused and
    # making keys from different CSV files indistinguishable in that segment
    return f"{repo}/{workflow_id}/{job_id}/{test_name}/{filename}/{hash_content}"
| |
| |
if __name__ == "__main__":
    # Command-line entry point: gather perf stats from S3, then mirror them
    # to both Rockset and DynamoDB.
    arg_parser = argparse.ArgumentParser(
        description="Upload dynamo perf stats from S3 to Rockset"
    )
    arg_parser.add_argument(
        "--workflow-run-id",
        type=int,
        required=True,
        help="id of the workflow to get perf stats from",
    )
    arg_parser.add_argument(
        "--workflow-run-attempt",
        type=int,
        required=True,
        help="which retry of the workflow this is",
    )
    arg_parser.add_argument(
        "--repo",
        type=str,
        required=True,
        help="which GitHub repo this workflow run belongs to",
    )
    arg_parser.add_argument(
        "--head-branch",
        type=str,
        required=True,
        help="head branch of the workflow",
    )
    arg_parser.add_argument(
        "--rockset-collection",
        type=str,
        required=True,
        help="the name of the Rockset collection to store the stats",
    )
    arg_parser.add_argument(
        "--rockset-workspace",
        type=str,
        default="commons",
        help="the name of the Rockset workspace to store the stats",
    )
    arg_parser.add_argument(
        "--dynamodb-table",
        type=str,
        required=True,
        help="the name of the DynamoDB table to store the stats",
    )
    arg_parser.add_argument(
        "--match-filename",
        type=str,
        default="",
        help="the regex to filter the list of CSV files containing the records to upload",
    )
    cli_args = arg_parser.parse_args()

    stats = upload_dynamo_perf_stats_to_rockset(
        cli_args.repo,
        cli_args.workflow_run_id,
        cli_args.workflow_run_attempt,
        cli_args.head_branch,
        cli_args.match_filename,
    )
    # TODO (huydhn): Write to both Rockset and DynamoDB, an one-off script to copy
    # data from Rockset to DynamoDB is the next step before uploading to Rockset
    # can be removed
    upload_to_rockset(
        collection=cli_args.rockset_collection,
        docs=stats,
        workspace=cli_args.rockset_workspace,
    )
    upload_to_dynamodb(
        dynamodb_table=cli_args.dynamodb_table,
        repo=cli_args.repo,
        docs=stats,
        generate_partition_key=generate_partition_key,
    )