| #!/usr/bin/env python3 |
| |
| import subprocess |
| import sys |
| import os |
| import argparse |
| import yaml |
| import asyncio |
| import shutil |
| import re |
| from typing import List, Dict, Any, Optional |
| |
| |
| REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| |
| class col: |
| HEADER = "\033[95m" |
| BLUE = "\033[94m" |
| GREEN = "\033[92m" |
| YELLOW = "\033[93m" |
| RED = "\033[91m" |
| RESET = "\033[0m" |
| BOLD = "\033[1m" |
| UNDERLINE = "\033[4m" |
| |
| |
| def color(the_color: str, text: str) -> str: |
| return col.BOLD + the_color + str(text) + col.RESET |
| |
| |
| def cprint(the_color: str, text: str) -> None: |
| if hasattr(sys.stdout, "isatty") and sys.stdout.isatty(): |
| print(color(the_color, text)) |
| else: |
| print(text) |
| |
| |
| def git(args: List[str]) -> List[str]: |
| p = subprocess.run( |
| ["git"] + args, |
| cwd=REPO_ROOT, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| check=True, |
| ) |
| lines = p.stdout.decode().strip().split("\n") |
| return [line.strip() for line in lines] |
| |
| |
| def find_changed_files() -> List[str]: |
| untracked = [] |
| |
| for line in git(["status", "--porcelain"]): |
| # Untracked files start with ??, so grab all of those |
| if line.startswith("?? "): |
| untracked.append(line.replace("?? ", "")) |
| |
| # Modified, unstaged |
| modified = git(["diff", "--name-only"]) |
| |
| # Modified, staged |
| cached = git(["diff", "--cached", "--name-only"]) |
| |
| # Committed |
| merge_base = git(["merge-base", "origin/master", "HEAD"])[0] |
| diff_with_origin = git(["diff", "--name-only", merge_base, "HEAD"]) |
| |
| # De-duplicate |
| all_files = set(untracked + cached + modified + diff_with_origin) |
| return [x.strip() for x in all_files if x.strip() != ""] |
| |
| |
| async def run_step(step: Dict[str, Any], job_name: str, files: Optional[List[str]], quiet: bool) -> bool: |
| env = os.environ.copy() |
| env["GITHUB_WORKSPACE"] = "/tmp" |
| if files is None: |
| env["LOCAL_FILES"] = "" |
| else: |
| env["LOCAL_FILES"] = " ".join(files) |
| script = step["run"] |
| |
| PASS = color(col.GREEN, '\N{check mark}') |
| FAIL = color(col.RED, 'x') |
| |
| if quiet: |
| # TODO: Either lint that GHA scripts only use 'set -eux' or make this more |
| # resilient |
| script = script.replace("set -eux", "set -eu") |
| script = re.sub(r"^time ", "", script, flags=re.MULTILINE) |
| name = f'{job_name}: {step["name"]}' |
| |
| def header(passed: bool) -> None: |
| icon = PASS if passed else FAIL |
| print(f"{icon} {color(col.BLUE, name)}") |
| |
| try: |
| proc = await asyncio.create_subprocess_shell( |
| script, |
| shell=True, |
| cwd=REPO_ROOT, |
| env=env, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| executable=shutil.which("bash"), |
| ) |
| |
| stdout_bytes, stderr_bytes = await proc.communicate() |
| |
| header(passed=proc.returncode == 0) |
| except Exception as e: |
| header(passed=False) |
| print(e) |
| return False |
| |
| stdout = stdout_bytes.decode().strip() |
| stderr = stderr_bytes.decode().strip() |
| |
| if stderr != "": |
| print(stderr) |
| if stdout != "": |
| print(stdout) |
| |
| return proc.returncode == 0 |
| |
| |
| async def run_steps(steps: List[Dict[str, Any]], job_name: str, files: Optional[List[str]], quiet: bool) -> None: |
| coros = [run_step(step, job_name, files, quiet) for step in steps] |
| await asyncio.gather(*coros) |
| |
| |
| def grab_specific_steps(steps_to_grab: List[str], job: Dict[str, Any]) -> List[Dict[str, Any]]: |
| relevant_steps = [] |
| for step in steps_to_grab: |
| for actual_step in job["steps"]: |
| if actual_step["name"].lower().strip() == step.lower().strip(): |
| relevant_steps.append(actual_step) |
| break |
| |
| if len(relevant_steps) != len(steps_to_grab): |
| raise RuntimeError("Missing steps") |
| |
| return relevant_steps |
| |
| |
| def grab_all_steps_after(last_step: str, job: Dict[str, Any]) -> List[Dict[str, Any]]: |
| relevant_steps = [] |
| |
| found = False |
| for step in job["steps"]: |
| if found: |
| relevant_steps.append(step) |
| if step["name"].lower().strip() == last_step.lower().strip(): |
| found = True |
| |
| return relevant_steps |
| |
| |
| def main() -> None: |
| parser = argparse.ArgumentParser( |
| description="Pull shell scripts out of GitHub actions and run them" |
| ) |
| parser.add_argument("--file", help="YAML file with actions", required=True) |
| parser.add_argument("--file-filter", help="only pass through files with this extension", default='') |
| parser.add_argument("--changed-only", help="only run on changed files", action='store_true', default=False) |
| parser.add_argument("--job", help="job name", required=True) |
| parser.add_argument("--no-quiet", help="output commands", action='store_true', default=False) |
| parser.add_argument("--step", action="append", help="steps to run (in order)") |
| parser.add_argument( |
| "--all-steps-after", help="include every step after this one (non inclusive)" |
| ) |
| args = parser.parse_args() |
| |
| relevant_files = None |
| quiet = not args.no_quiet |
| |
| if args.changed_only: |
| changed_files: Optional[List[str]] = None |
| try: |
| changed_files = find_changed_files() |
| except Exception: |
| # If the git commands failed for some reason, bail out and use the whole list |
| print( |
| "Could not query git for changed files, falling back to testing all files instead", |
| file=sys.stderr |
| ) |
| if changed_files is not None: |
| relevant_files = [] |
| for f in changed_files: |
| for file_filter in args.file_filter: |
| if f.endswith(file_filter): |
| relevant_files.append(f) |
| break |
| |
| if args.step is None and args.all_steps_after is None: |
| raise RuntimeError("1+ --steps or --all-steps-after must be provided") |
| |
| if args.step is not None and args.all_steps_after is not None: |
| raise RuntimeError("Only one of --step and --all-steps-after can be used") |
| |
| action = yaml.safe_load(open(args.file, "r")) |
| if "jobs" not in action: |
| raise RuntimeError(f"top level key 'jobs' not found in {args.file}") |
| jobs = action["jobs"] |
| |
| if args.job not in jobs: |
| raise RuntimeError(f"job '{args.job}' not found in {args.file}") |
| |
| job = jobs[args.job] |
| |
| if args.step is not None: |
| relevant_steps = grab_specific_steps(args.step, job) |
| else: |
| relevant_steps = grab_all_steps_after(args.all_steps_after, job) |
| |
| if sys.version_info > (3, 7): |
| loop = asyncio.get_event_loop() |
| loop.run_until_complete(run_steps(relevant_steps, args.job, relevant_files, quiet)) |
| else: |
| raise RuntimeError("Only Python >3.7 is supported") |
| |
| |
| if __name__ == "__main__": |
| try: |
| main() |
| except KeyboardInterrupt: |
| pass |