| import datetime |
| import inspect |
| import os |
| import time |
| import uuid |
| |
| from decimal import Decimal |
| from typing import Any, Dict |
| from warnings import warn |
| |
| # boto3 is an optional dependency. If it's not installed, |
| # we'll just not emit the metrics. |
| # Keeping this logic here so that callers don't have to |
| # worry about it. |
| EMIT_METRICS = False |
| try: |
| import boto3 # type: ignore[import] |
| |
| EMIT_METRICS = True |
| except ImportError as e: |
| print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}") |
| |
| |
| class EnvVarMetric: |
| name: str |
| env_var: str |
| required: bool = True |
| # Used to cast the value of the env_var to the correct type (defaults to str) |
| type_conversion_fn: Any = None |
| |
| def __init__( |
| self, |
| name: str, |
| env_var: str, |
| required: bool = True, |
| type_conversion_fn: Any = None, |
| ) -> None: |
| self.name = name |
| self.env_var = env_var |
| self.required = required |
| self.type_conversion_fn = type_conversion_fn |
| |
| def value(self) -> Any: |
| value = os.environ.get(self.env_var) |
| |
| # Github CI will set some env vars to an empty string |
| DEFAULT_ENVVAR_VALUES = [None, ""] |
| if value in DEFAULT_ENVVAR_VALUES: |
| if not self.required: |
| return None |
| |
| raise ValueError( |
| f"Missing {self.name}. Please set the {self.env_var} " |
| "environment variable to pass in this value." |
| ) |
| |
| if self.type_conversion_fn: |
| return self.type_conversion_fn(value) |
| return value |
| |
| |
| global_metrics: Dict[str, Any] = {} |
| |
| |
| def add_global_metric(metric_name: str, metric_value: Any) -> None: |
| """ |
| Adds stats that should be emitted with every metric by the current process. |
| If the emit_metrics method specifies a metric with the same name, it will |
| overwrite this value. |
| """ |
| global_metrics[metric_name] = metric_value |
| |
| |
| def emit_metric( |
| metric_name: str, |
| metrics: Dict[str, Any], |
| ) -> None: |
| """ |
| Upload a metric to DynamoDB (and from there, Rockset). |
| |
| Even if EMIT_METRICS is set to False, this function will still run the code to |
| validate and shape the metrics, skipping just the upload. |
| |
| Parameters: |
| metric_name: |
| Name of the metric. Every unique metric should have a different name |
| and be emitted just once per run attempt. |
| Metrics are namespaced by their module and the function that emitted them. |
| metrics: The actual data to record. |
| |
| Some default values are populated from environment variables, which must be set |
| for metrics to be emitted. (If they're not set, this function becomes a noop): |
| """ |
| |
| if metrics is None: |
| raise ValueError("You didn't ask to upload any metrics!") |
| |
| # Merge the given metrics with the global metrics, overwriting any duplicates |
| # with the given metrics. |
| metrics = {**global_metrics, **metrics} |
| |
| # We use these env vars that to determine basic info about the workflow run. |
| # By using env vars, we don't have to pass this info around to every function. |
| # It also helps ensure that we only emit metrics during CI |
| env_var_metrics = [ |
| EnvVarMetric("repo", "GITHUB_REPOSITORY"), |
| EnvVarMetric("workflow", "GITHUB_WORKFLOW"), |
| EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False), |
| EnvVarMetric("job", "GITHUB_JOB"), |
| EnvVarMetric("test_config", "TEST_CONFIG", required=False), |
| EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int), |
| EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int), |
| EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int), |
| EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int), |
| EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int), |
| EnvVarMetric("job_name", "JOB_NAME"), |
| ] |
| |
| # Use info about the function that invoked this one as a namespace and a way to filter metrics. |
| calling_frame = inspect.currentframe().f_back # type: ignore[union-attr] |
| calling_frame_info = inspect.getframeinfo(calling_frame) # type: ignore[arg-type] |
| calling_file = os.path.basename(calling_frame_info.filename) |
| calling_module = inspect.getmodule(calling_frame).__name__ # type: ignore[union-attr] |
| calling_function = calling_frame_info.function |
| |
| try: |
| reserved_metrics = { |
| "metric_name": metric_name, |
| "calling_file": calling_file, |
| "calling_module": calling_module, |
| "calling_function": calling_function, |
| "timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"), |
| **{m.name: m.value() for m in env_var_metrics if m.value()}, |
| } |
| except ValueError as e: |
| warn(f"Not emitting metrics for {metric_name}. {e}") |
| return |
| |
| # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision |
| reserved_metrics[ |
| "dynamo_key" |
| ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}" |
| |
| # Ensure the metrics dict doesn't contain any reserved keys |
| for key in reserved_metrics.keys(): |
| used_reserved_keys = [k for k in metrics.keys() if k == key] |
| if used_reserved_keys: |
| raise ValueError(f"Metrics dict contains reserved keys: [{', '.join(key)}]") |
| |
| # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals. |
| metrics = _convert_float_values_to_decimals(metrics) |
| |
| if EMIT_METRICS: |
| try: |
| session = boto3.Session(region_name="us-east-1") |
| session.resource("dynamodb").Table("torchci-metrics").put_item( |
| Item={ |
| **reserved_metrics, |
| **metrics, |
| } |
| ) |
| except Exception as e: |
| # We don't want to fail the job if we can't upload the metric. |
| # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics |
| warn(f"Error uploading metric {metric_name} to DynamoDB: {e}") |
| return |
| else: |
| print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.") |
| |
| |
| def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]: |
| # Attempt to recurse |
| def _helper(o: Any) -> Any: |
| if isinstance(o, float): |
| return Decimal(str(o)) |
| if isinstance(o, list): |
| return [_helper(v) for v in o] |
| if isinstance(o, dict): |
| return {_helper(k): _helper(v) for k, v in o.items()} |
| if isinstance(o, tuple): |
| return tuple(_helper(v) for v in o) |
| return o |
| |
| return {k: _helper(v) for k, v in data.items()} |