blob: 066e6545d67cc0c2db2ad87f21eacf98a90134ff [file] [log] [blame]
# Copyright 2025 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Wraps a command to run it at most once per week successfully.
This script is intended to be run daily (e.g., via a cron job).
It checks a state file to determine if the wrapped command has already
succeeded within the current week. If so, it exits successfully. Otherwise, it
runs the command.
If the command fails more times than the `--max-flakes` flag specifies, it
exits unsuccessfully. Otherwise, it assumes the failure is a flake, logs a
warning, and exits successfully. As mentioned above, subsequent invocations will
re-execute the command.
For the purposes of this script, week boundaries are defined by the ISO
calendar, and 'now' is determined by the current UNIX timestamp. Moreover, weeks
begin every Monday at 00:00UTC.
"""
import argparse
import dataclasses
import datetime
import json
import logging
from pathlib import Path
import shlex
import subprocess
import time
from typing import Any, Dict, List, Optional, Tuple
@dataclasses.dataclass(frozen=True)
class WeeklyJobState:
"""State for the weekly job runner."""
# Timestamp per `time.time()`
last_success_timestamp: Optional[float] = None
consecutive_failures: int = 0
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "WeeklyJobState":
return cls(
last_success_timestamp=data.get("last_success_timestamp"),
consecutive_failures=data.get("consecutive_failures", 0),
)
def to_json(self) -> Dict[str, Any]:
return dataclasses.asdict(self)
def load_state(state_file_path: Path) -> WeeklyJobState:
"""Loads the state from the JSON file."""
try:
with open(state_file_path, "r", encoding="utf-8") as f:
return WeeklyJobState.from_json(json.load(f))
except FileNotFoundError:
logging.info(
"State file %s not found, using default state.", state_file_path
)
return WeeklyJobState()
def save_state(state_file_path: Path, state: WeeklyJobState) -> None:
"""Saves the state to the JSON file."""
with state_file_path.open("w", encoding="utf-8") as f:
json.dump(state.to_json(), f)
def get_iso_week_info(timestamp: float) -> Tuple[int, int]:
"""Converts a Unix timestamp to (ISO year, ISO week number)."""
# `isocalendar` note: weeks are uniquely identified by (iso_year, iso_week),
# and all weeks are 7 days. The year that an ISO week belongs to is the
# Gregorian year of the Thursday of that week.
#
# Hence, there's no need to be concerned about cases like "1-Jan falls on
# Wednesday, so we see 31-Dec-24 as (iso_year=2024, iso_week=52) and
# 1-Jan-25 as (iso_year=2025, iso_week=1)." 30-Dec-25 through 5-Jan-25 is
# (iso_year=2025, iso_week=1).
date_obj = datetime.date.fromtimestamp(timestamp)
iso_year, iso_week, _ = date_obj.isocalendar()
return iso_year, iso_week
def run_wrapped_command(command_to_run: List[str]) -> int:
"""Runs the wrapped command and returns its exit code."""
logging.info("Running command: %s", shlex.join(command_to_run))
# Do not capture stdout/stderr unless necessary for debugging,
# let the wrapped command print directly.
process = subprocess.run(
command_to_run,
check=False,
stdin=subprocess.DEVNULL,
)
return process.returncode
def parse_args(argv: List[str]) -> argparse.Namespace:
"""Parses command line arguments."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--state-file",
required=True,
type=Path,
help="Path to the JSON file for storing run state.",
)
parser.add_argument(
"--max-flakes",
required=True,
type=int,
help="Max number of consecutive failures before reporting an error.",
)
parser.add_argument(
"--debug", action="store_true", help="Enable debug logging."
)
parser.add_argument(
"command_to_run",
nargs=argparse.REMAINDER,
help="The command to run, optionally preceded by '--'.",
)
opts = parser.parse_args(argv)
# Some commandline utils use `--` to separate remainder args from flags.
# e.g., `./run_weekly_job.py "${flags[@]}" -- ./foo`
# In this case, Python will have `command_to_run == ["--", "./foo"]`;
# massage to `["./foo"]`
if opts.command_to_run and opts.command_to_run[0] == "--":
opts.command_to_run = opts.command_to_run[1:]
if not opts.command_to_run:
parser.error("No wrapped command provided.")
return opts
def main(argv: List[str]) -> int:
"""Main entry point for the script."""
opts = parse_args(argv)
logging.basicConfig(
format=">> %(asctime)s: %(levelname)s: %(filename)s:%(lineno)d: "
"%(message)s",
level=logging.DEBUG if opts.debug else logging.INFO,
)
command_to_run: List[str] = opts.command_to_run
state_file: Path = opts.state_file
state = load_state(state_file)
now = time.time()
if state.last_success_timestamp:
current_week = get_iso_week_info(now)
last_success_week = get_iso_week_info(state.last_success_timestamp)
if current_week == last_success_week:
logging.info("Had success earlier this week; not running command")
return 0
if state.consecutive_failures:
logging.info(
"Had %d consecutive failures before; trying again",
state.consecutive_failures,
)
else:
logging.info("Last success was one or more weeks ago; trying again")
command_success = run_wrapped_command(command_to_run) == 0
if command_success:
logging.info("Command succeeded.")
new_state = WeeklyJobState(
last_success_timestamp=now,
consecutive_failures=0,
)
exit_code = 0
else:
new_state = WeeklyJobState(
last_success_timestamp=state.last_success_timestamp,
consecutive_failures=state.consecutive_failures + 1,
)
if new_state.consecutive_failures >= opts.max_flakes:
logging.warning(
"Command failed, total failures = %d. Bubbling error up.",
new_state.consecutive_failures,
)
exit_code = 1
else:
logging.warning(
"Command failed, total failures = %d, which is below the "
"failure threshold. Exiting cleanly.",
new_state.consecutive_failures,
)
exit_code = 0
save_state(state_file, new_state)
return exit_code