| #!/usr/bin/env python3 |
| # Copyright 2022 The ChromiumOS Authors |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """ |
| Provides helpers for writing shell-like scripts in Python. |
| |
| It provides tools to execute commands with similar flexibility to shell scripts and simplifies |
| command line arguments using `argh` and provides common flags (e.g. -v and -vv) for all of |
| our command line tools. |
| |
| Refer to the scripts in ./tools for example usage. |
| """ |
| |
| # Import preamble before anything else |
| from . import preamble # type: ignore |
| |
| import argparse |
| import contextlib |
| import datetime |
| import functools |
| import getpass |
| import json |
| import os |
| import re |
| import shlex |
| import shutil |
| import subprocess |
| import sys |
| import traceback |
| import urllib |
| import urllib.request |
| import urllib.error |
| from copy import deepcopy |
| from math import ceil |
| from multiprocessing.pool import ThreadPool |
| from pathlib import Path |
| from subprocess import DEVNULL, PIPE, STDOUT # type: ignore |
| from tempfile import gettempdir |
| from typing import ( |
| Any, |
| Callable, |
| Dict, |
| Iterable, |
| List, |
| NamedTuple, |
| Optional, |
| Tuple, |
| TypeVar, |
| Union, |
| cast, |
| ) |
| |
| import argh # type: ignore |
| import rich |
| import rich.console |
| import rich.live |
| import rich.spinner |
| import rich.text |
| |
| # Hack: argh does not support type annotations. This prevents type errors. |
| argh: Any # type: ignore |
| |
| # File where to store http headers for gcloud authentication |
| AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}" |
| |
| PathLike = Union[Path, str] |
| |
| |
| def find_crosvm_root(): |
| "Walk up from CWD until we find the crosvm root dir." |
| path = Path("").resolve() |
| while True: |
| if (path / "tools/impl/common.py").is_file(): |
| return path |
| if path.parent: |
| path = path.parent |
| else: |
| raise Exception("Cannot find crosvm root dir.") |
| |
| |
| "Root directory of crosvm derived from CWD." |
| CROSVM_ROOT = find_crosvm_root() |
| |
| "Cargo.toml file of crosvm" |
| CROSVM_TOML = CROSVM_ROOT / "Cargo.toml" |
| |
| """ |
| Root directory of crosvm devtools. |
| |
| May be different from `CROSVM_ROOT/tools`, which is allows you to run the crosvm dev |
| tools from this directory on another crosvm repo. |
| |
| Use this if you want to call crosvm dev tools, which will use the scripts relative |
| to this file. |
| """ |
| TOOLS_ROOT = Path(__file__).parent.parent.resolve() |
| |
| "Cache directory that is preserved between builds in CI." |
| CACHE_DIR = Path(os.environ.get("CROSVM_CACHE_DIR", os.environ.get("TMPDIR", "/tmp"))) |
| |
| "Url of crosvm's gerrit review host" |
| GERRIT_URL = "https://chromium-review.googlesource.com" |
| |
| # Ensure that we really found the crosvm root directory |
| assert 'name = "crosvm"' in CROSVM_TOML.read_text() |
| |
| # List of times recorded by `record_time` which will be printed if --timing-info is provided. |
| global_time_records: List[Tuple[str, datetime.timedelta]] = [] |
| |
| # Regex that matches ANSI escape sequences |
| ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") |
| |
| |
| def crosvm_target_dir(): |
| crosvm_target = os.environ.get("CROSVM_TARGET_DIR") |
| cargo_target = os.environ.get("CARGO_TARGET_DIR") |
| if crosvm_target: |
| return Path(crosvm_target) |
| elif cargo_target: |
| return Path(cargo_target) / "crosvm" |
| else: |
| return CROSVM_ROOT / "target/crosvm" |
| |
| |
| class CommandResult(NamedTuple): |
| """Results of a command execution as returned by Command.run()""" |
| |
| stdout: str |
| stderr: str |
| returncode: int |
| |
| |
| class Command(object): |
| """ |
| Simplified subprocess handling for shell-like scripts. |
| |
| ## Example Usage |
| |
| To run a program on behalf of the user: |
| |
| >> cmd("cargo build").fg() |
| |
| This will run the program with stdio passed to the user. Developer tools usually run a set of |
| actions on behalf of the user. These should be executed with fg(). |
| |
| To make calls in the background to gather information use success/stdout/lines: |
| |
| >> cmd("git branch").lines() |
| >> cmd("git rev-parse foo").success() |
| |
| These will capture all program output. Try to avoid using these to run mutating commands, |
| as they will remain hidden to the user even when using --verbose. |
| |
| ## Arguments |
| |
| Arguments are provided as a list similar to subprocess.run(): |
| |
| >>> Command('cargo', 'build', '--workspace') |
| Command('cargo', 'build', '--workspace') |
| |
| In contrast to subprocess.run, all strings are split by whitespaces similar to bash: |
| |
| >>> Command('cargo build --workspace', '--features foo') |
| Command('cargo', 'build', '--workspace', '--features', 'foo') |
| |
| In contrast to bash, globs are *not* evaluated, but can easily be provided using Path: |
| |
| >>> Command('ls -l', *Path(CROSVM_ROOT).glob('*.toml')) |
| Command('ls', '-l', ...) |
| |
| None or False are ignored to make it easy to include conditional arguments: |
| |
| >>> all = False |
| >>> Command('cargo build', '--workspace' if all else None) |
| Command('cargo', 'build') |
| |
| ## Nesting |
| |
| Commands can be nested, similar to $() subshells in bash. The sub-commands will be executed |
| right away and their output will undergo the usual splitting: |
| |
| >>> Command('printf "(%s)"', Command('echo foo bar')).stdout() |
| '(foo)(bar)' |
| |
| Arguments can be explicitly quoted to prevent splitting, it applies to both sub-commands |
| as well as strings: |
| |
| >>> Command('printf "(%s)"', quoted(Command('echo foo bar'))).stdout() |
| '(foo bar)' |
| |
| Commands can also be piped into one another: |
| |
| >>> wc = Command('wc') |
| >>> Command('echo "abcd"').pipe(wc('-c')).stdout() |
| '5' |
| |
| ## Verbosity |
| |
| The --verbose flag is intended for users and will show all command lines executed in the |
| foreground with fg(), it'll also include output of programs run with fg(quiet=True). Commands |
| executed in the background are not shown. |
| |
| For script developers, the --very-verbose flag will print full details and output of all |
| executed command lines, including those run hidden from the user. |
| """ |
| |
| def __init__( |
| self, |
| *args: Any, |
| stdin_cmd: Optional["Command"] = None, |
| env_vars: Dict[str, str] = {}, |
| cwd: Optional[Path] = None, |
| ): |
| self.args = Command.__parse_cmd(args) |
| self.stdin_cmd = stdin_cmd |
| self.env_vars = env_vars |
| self.cwd = cwd |
| |
| ### Builder API to construct commands |
| |
| def with_args(self, *args: Any): |
| """Returns a new Command with added arguments. |
| |
| >>> cargo = Command('cargo') |
| >>> cargo.with_args('clippy') |
| Command('cargo', 'clippy') |
| """ |
| cmd = deepcopy(self) |
| cmd.args = [*self.args, *Command.__parse_cmd(args)] |
| return cmd |
| |
| def with_cwd(self, cwd: Optional[Path]): |
| """Changes the working directory the command is executed in. |
| |
| >>> cargo = Command('pwd') |
| >>> cargo.with_cwd('/tmp').stdout() |
| '/tmp' |
| """ |
| cmd = deepcopy(self) |
| cmd.cwd = cwd |
| return cmd |
| |
| def __call__(self, *args: Any): |
| """Shorthand for Command.with_args""" |
| return self.with_args(*args) |
| |
| def with_env(self, key: str, value: Optional[str]): |
| """ |
| Returns a command with an added env variable. |
| |
| The variable is removed if value is None. |
| """ |
| return self.with_envs({key: value}) |
| |
| def with_envs(self, envs: Union[Dict[str, Optional[str]], Dict[str, str]]): |
| """ |
| Returns a command with an added env variable. |
| |
| The variable is removed if value is None. |
| """ |
| cmd = deepcopy(self) |
| for key, value in envs.items(): |
| if value is not None: |
| cmd.env_vars[key] = value |
| else: |
| if key in cmd.env_vars: |
| del cmd.env_vars[key] |
| return cmd |
| |
| def with_path_env(self, new_path: str): |
| """Returns a command with a path added to the PATH variable.""" |
| path_var = self.env_vars.get("PATH", os.environ.get("PATH", "")) |
| return self.with_env("PATH", f"{path_var}:{new_path}") |
| |
| def with_color_arg( |
| self, |
| always: Optional[str] = None, |
| never: Optional[str] = None, |
| ): |
| """Returns a command with an argument added to pass through enabled/disabled colors.""" |
| new_cmd = self |
| if color_enabled(): |
| if always: |
| new_cmd = new_cmd(always) |
| else: |
| if never: |
| new_cmd = new_cmd(never) |
| return new_cmd |
| |
| def with_color_env(self, var_name: str): |
| """Returns a command with an env var added to pass through enabled/disabled colors.""" |
| return self.with_env(var_name, "1" if color_enabled() else "0") |
| |
| def with_color_flag(self, flag: str = "--color"): |
| """Returns a command with an added --color=always/never/auto flag.""" |
| return self.with_color_arg(always=f"{flag}=always", never=f"{flag}=never") |
| |
| def foreach(self, arguments: Iterable[Any], batch_size: int = 1): |
| """ |
| Yields a new command for each entry in `arguments`. |
| |
| The argument is appended to each command and is intended to be used in |
| conjunction with `parallel()` to execute a command on a list of arguments in |
| parallel. |
| |
| >>> parallel(*cmd('echo').foreach((1, 2, 3))).stdout() |
| ['1', '2', '3'] |
| |
| Arguments can also be batched by setting batch_size > 1, which will append multiple |
| arguments to each command. |
| |
| >>> parallel(*cmd('echo').foreach((1, 2, 3), batch_size=2)).stdout() |
| ['1 2', '3'] |
| |
| """ |
| for batch in batched(arguments, batch_size): |
| yield self(*batch) |
| |
| def pipe(self, *args: Any): |
| """ |
| Pipes the output of this command into another process. |
| |
| The target can either be another Command or the argument list to build a new command. |
| """ |
| if len(args) == 1 and isinstance(args[0], Command): |
| cmd = Command(stdin_cmd=self) |
| cmd.args = args[0].args |
| cmd.env_vars = self.env_vars.copy() |
| return cmd |
| else: |
| return Command(*args, stdin_cmd=self, env_vars=self.env_vars) |
| |
| ### Executing programs in the foreground |
| |
| def run_foreground( |
| self, |
| quiet: bool = False, |
| check: bool = True, |
| dry_run: bool = False, |
| style: Optional[Callable[["subprocess.Popen[str]"], None]] = None, |
| ): |
| """ |
| Runs a program in the foreground with output streamed to the user. |
| |
| >>> Command('true').fg() |
| 0 |
| |
| Non-zero exit codes will trigger an Exception |
| |
| >>> Command('false').fg() |
| Traceback (most recent call last): |
| ... |
| subprocess.CalledProcessError... |
| |
| But can be disabled: |
| |
| >>> Command('false').fg(check=False) |
| 1 |
| |
| Output can be hidden by setting quiet=True: |
| |
| >>> Command("echo foo").fg(quiet=True) |
| 0 |
| |
| This will hide the programs stdout and stderr unless the program fails. |
| |
| More sophisticated means of outputting stdout/err are available via `Styles`: |
| |
| >>> Command("echo foo").fg(style=Styles.live_truncated()) |
| … |
| foo |
| 0 |
| |
| Will output the results of the command but truncate output after a few lines. See `Styles` |
| for more options. |
| |
| Arguments: |
| quiet: Do not show stdout/stderr unless the program failed. |
| check: Raise an exception if the program returned an error code. |
| style: A function to present the output of the program. See `Styles` |
| |
| Returns: The return code of the program. |
| """ |
| if dry_run: |
| print(f"Not running: {self}") |
| return 0 |
| |
| if quiet: |
| style = Styles.quiet |
| |
| if verbose(): |
| print(f"$ {self}") |
| |
| if style is None or verbose(): |
| return self.__run(stdout=None, stderr=None, check=check).returncode |
| else: |
| process = self.popen(stdout=PIPE, stderr=STDOUT) |
| style(process) |
| returncode = process.wait() |
| if returncode != 0 and check: |
| assert process.stdout |
| raise subprocess.CalledProcessError(returncode, process.args) |
| return returncode |
| |
| def fg( |
| self, |
| quiet: bool = False, |
| check: bool = True, |
| dry_run: bool = False, |
| style: Optional[Callable[["subprocess.Popen[str]"], None]] = None, |
| ): |
| """ |
| Shorthand for Command.run_foreground() |
| """ |
| return self.run_foreground(quiet, check, dry_run, style) |
| |
| def write_to(self, filename: Path): |
| """ |
| Writes stdout to the provided file. |
| """ |
| if verbose(): |
| print(f"$ {self} > {filename}") |
| with open(filename, "w") as file: |
| file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout) |
| |
| def append_to(self, filename: Path): |
| """ |
| Appends stdout to the provided file. |
| """ |
| if verbose(): |
| print(f"$ {self} >> {filename}") |
| with open(filename, "a") as file: |
| file.write(self.__run(stdout=PIPE, stderr=PIPE).stdout) |
| |
| ### API for executing commands hidden from the user |
| |
| def success(self): |
| """ |
| Returns True if the program succeeded (i.e. returned 0). |
| |
| The program will not be visible to the user unless --very-verbose is specified. |
| """ |
| if very_verbose(): |
| print(f"$ {self}") |
| return self.__run(stdout=PIPE, stderr=PIPE, check=False).returncode == 0 |
| |
| def stdout(self, check: bool = True, stderr: int = PIPE): |
| """ |
| Runs a program and returns stdout. |
| |
| The program will not be visible to the user unless --very-verbose is specified. |
| """ |
| if very_verbose(): |
| print(f"$ {self}") |
| return self.__run(stdout=PIPE, stderr=stderr, check=check).stdout.strip() |
| |
| def json(self, check: bool = True) -> Any: |
| """ |
| Runs a program and returns stdout parsed as json. |
| |
| The program will not be visible to the user unless --very-verbose is specified. |
| """ |
| stdout = self.stdout(check=check) |
| if stdout: |
| return json.loads(stdout) |
| else: |
| return None |
| |
| def lines(self, check: bool = True, stderr: int = PIPE): |
| """ |
| Runs a program and returns stdout line by line. |
| |
| The program will not be visible to the user unless --very-verbose is specified. |
| """ |
| return self.stdout(check=check, stderr=stderr).splitlines() |
| |
| ### Utilities |
| |
| def __str__(self): |
| stdin = "" |
| if self.stdin_cmd: |
| stdin = str(self.stdin_cmd) + " | " |
| return stdin + shlex.join(self.args) |
| |
| def __repr__(self): |
| stdin = "" |
| if self.stdin_cmd: |
| stdin = ", stdin_cmd=" + repr(self.stdin_cmd) |
| return f"Command({', '.join(repr(a) for a in self.args)}{stdin})" |
| |
| ### Private implementation details |
| |
| def __run( |
| self, |
| stdout: Optional[int], |
| stderr: Optional[int], |
| check: bool = True, |
| ) -> CommandResult: |
| "Run this command in subprocess.run()" |
| if very_verbose(): |
| print(f"cwd: {Path().resolve()}") |
| for k, v in self.env_vars.items(): |
| print(f"env: {k}={v}") |
| result = subprocess.run( |
| self.args, |
| cwd=self.cwd, |
| stdout=stdout, |
| stderr=stderr, |
| stdin=self.__stdin_stream(), |
| env={**os.environ, **self.env_vars}, |
| check=check, |
| text=True, |
| ) |
| if very_verbose(): |
| if result.stdout: |
| for line in result.stdout.splitlines(): |
| print("stdout:", line) |
| if result.stderr: |
| for line in result.stderr.splitlines(): |
| print("stderr:", line) |
| print("returncode:", result.returncode) |
| if check and result.returncode != 0: |
| raise subprocess.CalledProcessError(result.returncode, str(self), result.stdout) |
| return CommandResult(result.stdout, result.stderr, result.returncode) |
| |
| def __stdin_stream(self): |
| if self.stdin_cmd: |
| return self.stdin_cmd.popen(stdout=PIPE, stderr=PIPE).stdout |
| return None |
| |
| def popen(self, **kwargs: Any) -> "subprocess.Popen[str]": |
| """ |
| Runs a program and returns the Popen object of the running process. |
| """ |
| return subprocess.Popen( |
| self.args, |
| cwd=self.cwd, |
| stdin=self.__stdin_stream(), |
| env={**os.environ, **self.env_vars}, |
| text=True, |
| **kwargs, |
| ) |
| |
| @staticmethod |
| def __parse_cmd(args: Iterable[Any]) -> List[str]: |
| """Parses command line arguments for Command.""" |
| res = [parsed for arg in args for parsed in Command.__parse_cmd_args(arg)] |
| return res |
| |
| @staticmethod |
| def __parse_cmd_args(arg: Any) -> List[str]: |
| """Parses a mixed type command line argument into a list of strings.""" |
| if isinstance(arg, Path): |
| return [str(arg)] |
| elif isinstance(arg, QuotedString): |
| return [arg.value] |
| elif isinstance(arg, Command): |
| return [*shlex.split(arg.stdout())] |
| elif arg is None or arg is False: |
| return [] |
| else: |
| return [*shlex.split(str(arg))] |
| |
| |
| class Styles(object): |
| "A collection of methods that can be passed to `Command.fg(style=)`" |
| |
| @staticmethod |
| def quiet(process: "subprocess.Popen[str]"): |
| "Won't print anything unless the command failed." |
| assert process.stdout |
| stdout = process.stdout.read() |
| if process.wait() != 0: |
| print(stdout, end="") |
| |
| @staticmethod |
| def live_truncated(num_lines: int = 8): |
| "Prints only the last `num_lines` of output while the program is running and successful." |
| |
| def output(process: "subprocess.Popen[str]"): |
| assert process.stdout |
| spinner = rich.spinner.Spinner("dots") |
| lines: List[rich.text.Text] = [] |
| stdout: List[str] = [] |
| with rich.live.Live(refresh_per_second=30, transient=True) as live: |
| for line in iter(process.stdout.readline, ""): |
| stdout.append(line.strip()) |
| lines.append(rich.text.Text.from_ansi(line.strip(), no_wrap=True)) |
| while len(lines) > num_lines: |
| lines.pop(0) |
| live.update(rich.console.Group(rich.text.Text("…"), *lines, spinner)) |
| if process.wait() == 0: |
| console.print(rich.console.Group(rich.text.Text("…"), *lines)) |
| else: |
| for line in stdout: |
| print(line) |
| |
| return output |
| |
| @staticmethod |
| def quiet_with_progress(title: str): |
| "Prints only the last `num_lines` of output while the program is running and successful." |
| |
| def output(process: "subprocess.Popen[str]"): |
| assert process.stdout |
| with rich.live.Live( |
| rich.spinner.Spinner("dots", title), refresh_per_second=30, transient=True |
| ): |
| stdout = process.stdout.read() |
| |
| if process.wait() == 0: |
| console.print(f"[green]OK[/green] {title}") |
| else: |
| print(stdout) |
| console.print(f"[red]ERR[/red] {title}") |
| |
| return output |
| |
| |
| class ParallelCommands(object): |
| """ |
| Allows commands to be run in parallel. |
| |
| >>> parallel(cmd('true'), cmd('false')).fg(check=False) |
| [0, 1] |
| |
| >>> parallel(cmd('echo a'), cmd('echo b')).stdout() |
| ['a', 'b'] |
| """ |
| |
| def __init__(self, *commands: Command): |
| self.commands = commands |
| |
| def fg(self, quiet: bool = False, check: bool = True): |
| with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool: |
| return pool.map(lambda command: command.fg(quiet=quiet, check=check), self.commands) |
| |
| def stdout(self): |
| with ThreadPool(1 if very_verbose() else os.cpu_count()) as pool: |
| return pool.map(lambda command: command.stdout(), self.commands) |
| |
| def success(self): |
| results = self.fg(check=False, quiet=True) |
| return all(result == 0 for result in results) |
| |
| |
| class Remote(object): |
| """ |
| Wrapper around the cmd() API and allow execution of commands via SSH." |
| """ |
| |
| def __init__(self, host: str, opts: Dict[str, str]): |
| self.host = host |
| ssh_opts = [f"-o{k}={v}" for k, v in opts.items()] |
| self.ssh_cmd = cmd("ssh", host, "-T", *ssh_opts) |
| self.scp_cmd = cmd("scp", *ssh_opts) |
| |
| def ssh(self, cmd: Command, remote_cwd: Optional[Path] = None): |
| # Use huponexit to ensure the process is killed if the connection is lost. |
| # Use shlex to properly quote the command. |
| wrapped_cmd = f"bash -O huponexit -c {shlex.quote(str(cmd))}" |
| if remote_cwd is not None: |
| wrapped_cmd = f"cd {remote_cwd} && {wrapped_cmd}" |
| # The whole command to pass it to SSH for remote execution. |
| return self.ssh_cmd.with_args(quoted(wrapped_cmd)) |
| |
| def scp(self, sources: List[Path], target: str, quiet: bool = False): |
| return self.scp_cmd.with_args(*sources, f"{self.host}:{target}").fg(quiet=quiet) |
| |
| |
| @contextlib.contextmanager |
| def record_time(title: str): |
| """ |
| Records wall-time of how long this context lasts. |
| |
| The results will be printed at the end of script executation if --timing-info is specified. |
| """ |
| start_time = datetime.datetime.now() |
| try: |
| yield |
| finally: |
| global_time_records.append((title, datetime.datetime.now() - start_time)) |
| |
| |
| @contextlib.contextmanager |
| def cwd_context(path: PathLike): |
| """Context for temporarily changing the cwd. |
| |
| >>> with cwd('/tmp'): |
| ... os.getcwd() |
| '/tmp' |
| |
| """ |
| cwd = os.getcwd() |
| try: |
| chdir(path) |
| yield |
| finally: |
| chdir(cwd) |
| |
| |
| def chdir(path: PathLike): |
| if very_verbose(): |
| print("cd", path) |
| os.chdir(path) |
| |
| |
| class QuotedString(object): |
| """ |
| Prevents the provided string from being split. |
| |
| Commands will be executed and their stdout is quoted. |
| """ |
| |
| def __init__(self, value: Any): |
| if isinstance(value, Command): |
| self.value = value.stdout() |
| else: |
| self.value = str(value) |
| |
| def __str__(self): |
| return f'"{self.value}"' |
| |
| |
| T = TypeVar("T") |
| |
| |
| def batched(source: Iterable[T], max_batch_size: int) -> Iterable[List[T]]: |
| """ |
| Returns an iterator over batches of elements from source_list. |
| |
| >>> list(batched([1, 2, 3, 4, 5], 2)) |
| [[1, 2], [3, 4], [5]] |
| """ |
| source_list = list(source) |
| # Calculate batch size that spreads elements evenly across all batches |
| batch_count = ceil(len(source_list) / max_batch_size) |
| batch_size = ceil(len(source_list) / batch_count) |
| for index in range(0, len(source_list), batch_size): |
| yield source_list[index : min(index + batch_size, len(source_list))] |
| |
| |
| # Shorthands |
| quoted = QuotedString |
| cmd = Command |
| cwd = cwd_context |
| parallel = ParallelCommands |
| |
| |
| def run_main(main_fn: Callable[..., Any], usage: Optional[str] = None): |
| run_commands(default_fn=main_fn, usage=usage) |
| |
| |
| def run_commands( |
| *functions: Callable[..., Any], |
| default_fn: Optional[Callable[..., Any]] = None, |
| usage: Optional[str] = None, |
| ): |
| """ |
| Allow the user to call the provided functions with command line arguments translated to |
| function arguments via argh: https://pythonhosted.org/argh |
| """ |
| exit_code = 0 |
| try: |
| parser = argparse.ArgumentParser( |
| description=usage, |
| # Docstrings are used as the description in argparse, preserve their formatting. |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| # Do not allow implied abbreviations. Abbreviations should be manually specified. |
| allow_abbrev=False, |
| ) |
| add_common_args(parser) |
| |
| # Add provided commands to parser. Do not use sub-commands if we just got one function. |
| if functions: |
| argh.add_commands(parser, functions) # type: ignore |
| if default_fn: |
| argh.set_default_command(parser, default_fn) # type: ignore |
| |
| with record_time("Total Time"): |
| # Call main method |
| argh.dispatch(parser) # type: ignore |
| |
| except Exception as e: |
| if verbose(): |
| traceback.print_exc() |
| else: |
| print(e) |
| exit_code = 1 |
| |
| if parse_common_args().timing_info: |
| print_timing_info() |
| |
| sys.exit(exit_code) |
| |
| |
| def print_timing_info(): |
| console.print() |
| console.print("Timing info:") |
| console.print() |
| for title, delta in global_time_records: |
| console.print(f" {title:20} {delta.total_seconds():.2f}s") |
| |
| |
| @functools.lru_cache(None) |
| def parse_common_args(): |
| """ |
| Parse args common to all scripts |
| |
| These args are parsed separately of the run_main/run_commands method so we can access |
| verbose/etc before the commands arguments are parsed. |
| """ |
| parser = argparse.ArgumentParser(add_help=False) |
| add_common_args(parser) |
| return parser.parse_known_args()[0] |
| |
| |
| def add_common_args(parser: argparse.ArgumentParser): |
| "These args are added to all commands." |
| parser.add_argument( |
| "--color", |
| default="auto", |
| choices=("always", "never", "auto"), |
| help="Force enable or disable colors. Defaults to automatic detection.", |
| ) |
| parser.add_argument( |
| "--verbose", |
| "-v", |
| action="store_true", |
| default=False, |
| help="Print more details about the commands this script is running.", |
| ) |
| parser.add_argument( |
| "--very-verbose", |
| "-vv", |
| action="store_true", |
| default=False, |
| help="Print more debug output", |
| ) |
| parser.add_argument( |
| "--timing-info", |
| action="store_true", |
| default=False, |
| help="Print info on how long which parts of the command take", |
| ) |
| |
| |
| def verbose(): |
| return very_verbose() or parse_common_args().verbose |
| |
| |
| def very_verbose(): |
| return parse_common_args().very_verbose |
| |
| |
| def color_enabled(): |
| color_arg = parse_common_args().color |
| if color_arg == "never": |
| return False |
| if color_arg == "always": |
| return True |
| return sys.stdout.isatty() |
| |
| |
| def all_tracked_files(): |
| for line in cmd("git ls-files").lines(): |
| file = Path(line) |
| if file.is_file(): |
| yield file |
| |
| |
| def find_source_files(extension: str, ignore: List[str] = []): |
| for file in all_tracked_files(): |
| if file.suffix != f".{extension}": |
| continue |
| if file.is_relative_to("third_party"): |
| continue |
| if str(file) in ignore: |
| continue |
| yield file |
| |
| |
| def find_scripts(path: Path, shebang: str): |
| for file in path.glob("*"): |
| if file.is_file() and file.open(errors="ignore").read(512).startswith(f"#!{shebang}"): |
| yield file |
| |
| |
| def confirm(message: str, default: bool = False): |
| print(message, "[y/N]" if default == False else "[Y/n]", end=" ", flush=True) |
| response = sys.stdin.readline().strip() |
| if response in ("y", "Y"): |
| return True |
| if response in ("n", "N"): |
| return False |
| return default |
| |
| |
| def get_cookie_file(): |
| path = cmd("git config http.cookiefile").stdout(check=False) |
| return Path(path) if path else None |
| |
| |
| def get_gcloud_access_token(): |
| if not shutil.which("gcloud"): |
| return None |
| return cmd("gcloud auth print-access-token").stdout(check=False) |
| |
| |
| @functools.lru_cache(maxsize=None) |
| def curl_with_git_auth(): |
| """ |
| Returns a curl `Command` instance set up to use the same HTTP credentials as git. |
| |
| This currently supports two methods: |
| - git cookies (the default) |
| - gcloud |
| |
| Most developers will use git cookies, which are passed to curl. |
| |
| glloud for authorization can be enabled in git via `git config credential.helper gcloud.sh`. |
| If enabled in git, this command will also return a curl command using a gloud access token. |
| """ |
| helper = cmd("git config credential.helper").stdout(check=False) |
| |
| if not helper: |
| cookie_file = get_cookie_file() |
| if not cookie_file or not cookie_file.is_file(): |
| raise Exception("git http cookiefile is not available.") |
| return cmd("curl --cookie", cookie_file) |
| |
| if helper.endswith("gcloud.sh"): |
| token = get_gcloud_access_token() |
| if not token: |
| raise Exception("Cannot get gcloud access token.") |
| # File where to store http headers for gcloud authentication |
| AUTH_HEADERS_FILE = Path(gettempdir()) / f"crosvm_gcloud_auth_headers_{getpass.getuser()}" |
| |
| # Write token to a header file so it will not appear in logs or error messages. |
| AUTH_HEADERS_FILE.write_text(f"Authorization: Bearer {token}") |
| return cmd(f"curl -H @{AUTH_HEADERS_FILE}") |
| |
| raise Exception(f"Unsupported git credentials.helper: {helper}") |
| |
| |
| def strip_xssi(response: str): |
| # See https://gerrit-review.googlesource.com/Documentation/rest-api.html#output |
| assert response.startswith(")]}'\n") |
| return response[5:] |
| |
| |
| def gerrit_api_get(path: str): |
| response = cmd(f"curl --silent --fail {GERRIT_URL}/{path}").stdout() |
| return json.loads(strip_xssi(response)) |
| |
| |
| def gerrit_api_post(path: str, body: Any): |
| response = curl_with_git_auth()( |
| "--silent --fail", |
| "-X POST", |
| "-H", |
| quoted("Content-Type: application/json"), |
| "-d", |
| quoted(json.dumps(body)), |
| f"{GERRIT_URL}/a/{path}", |
| ).stdout() |
| if very_verbose(): |
| print("Response:", response) |
| return json.loads(strip_xssi(response)) |
| |
| |
| class GerritChange(object): |
| """ |
| Class to interact with the gerrit /changes/ API. |
| |
| For information on the data format returned by the API, see: |
| https://gerrit-review.googlesource.com/Documentation/rest-api-changes.html#change-info |
| """ |
| |
| id: str |
| _data: Any |
| |
| def __init__(self, data: Any): |
| self._data = data |
| self.id = data["id"] |
| |
| @functools.cached_property |
| def _details(self) -> Any: |
| return gerrit_api_get(f"changes/{self.id}/detail") |
| |
| @functools.cached_property |
| def _messages(self) -> List[Any]: |
| return gerrit_api_get(f"changes/{self.id}/messages") |
| |
| @property |
| def status(self): |
| return cast(str, self._data["status"]) |
| |
| def get_votes(self, label_name: str) -> List[int]: |
| "Returns the list of votes on `label_name`" |
| label_info = self._details.get("labels", {}).get(label_name) |
| votes = label_info.get("all", []) |
| return [cast(int, v.get("value")) for v in votes] |
| |
| def get_messages_by(self, email: str) -> List[str]: |
| "Returns all messages posted by the user with the specified `email`." |
| return [m["message"] for m in self._messages if m["author"].get("email") == email] |
| |
| def review(self, message: str, labels: Dict[str, int]): |
| "Post review `message` and set the specified review `labels`" |
| print("Posting on", self, ":", message, labels) |
| gerrit_api_post( |
| f"changes/{self.id}/revisions/current/review", |
| {"message": message, "labels": labels}, |
| ) |
| |
| def abandon(self, message: str): |
| print("Abandoning", self, ":", message) |
| gerrit_api_post(f"changes/{self.id}/abandon", {"message": message}) |
| |
| @classmethod |
| def query(cls, *queries: str): |
| "Returns a list of gerrit changes matching the provided list of queries." |
| return [cls(c) for c in gerrit_api_get(f"changes/?q={'+'.join(queries)}")] |
| |
| def short_url(self): |
| return f"http://crrev.com/c/{self._data['_number']}" |
| |
| def __str__(self): |
| return self.short_url() |
| |
| def pretty_info(self): |
| return f"{self} - {self._data['subject']}" |
| |
| |
| def is_cros_repo(): |
| "Returns true if the crosvm repo is a symlink or worktree to a CrOS repo checkout." |
| dot_git = CROSVM_ROOT / ".git" |
| if not dot_git.is_symlink() and dot_git.is_dir(): |
| return False |
| return (cros_repo_root() / ".repo").exists() |
| |
| |
| def cros_repo_root(): |
| "Root directory of the CrOS repo checkout." |
| return (CROSVM_ROOT / "../../..").resolve() |
| |
| |
| def is_kiwi_repo(): |
| "Returns true if the crosvm repo contains .kiwi_repo file." |
| dot_kiwi_repo = CROSVM_ROOT / ".kiwi_repo" |
| return dot_kiwi_repo.exists() |
| |
| |
| def kiwi_repo_root(): |
| "Root directory of the kiwi repo checkout." |
| return (CROSVM_ROOT / "../..").resolve() |
| |
| |
| def sudo_is_passwordless(): |
| # Run with --askpass but no askpass set, succeeds only if passwordless sudo |
| # is available. |
| (ret, _) = subprocess.getstatusoutput("SUDO_ASKPASS=false sudo --askpass true") |
| return ret == 0 |
| |
| |
| SHORTHANDS = { |
| "mingw64": "x86_64-pc-windows-gnu", |
| "msvc64": "x86_64-pc-windows-msvc", |
| "armhf": "armv7-unknown-linux-gnueabihf", |
| "aarch64": "aarch64-unknown-linux-gnu", |
| "x86_64": "x86_64-unknown-linux-gnu", |
| } |
| |
| |
| class Triple(NamedTuple): |
| """ |
| Build triple in cargo format. |
| |
| The format is: <arch><sub>-<vendor>-<sys>-<abi>, However, we will treat <arch><sub> as a single |
| arch to simplify things. |
| """ |
| |
| arch: str |
| vendor: str |
| sys: Optional[str] |
| abi: Optional[str] |
| |
| @classmethod |
| def from_shorthand(cls, shorthand: str): |
| "These shorthands make it easier to specify triples on the command line." |
| if "-" in shorthand: |
| triple = shorthand |
| elif shorthand in SHORTHANDS: |
| triple = SHORTHANDS[shorthand] |
| else: |
| raise Exception(f"Not a valid build triple shorthand: {shorthand}") |
| return cls.from_str(triple) |
| |
| @classmethod |
| def from_str(cls, triple: str): |
| parts = triple.split("-") |
| if len(parts) < 2: |
| raise Exception(f"Unsupported triple {triple}") |
| return cls( |
| parts[0], |
| parts[1], |
| parts[2] if len(parts) > 2 else None, |
| parts[3] if len(parts) > 3 else None, |
| ) |
| |
| @classmethod |
| def from_linux_arch(cls, arch: str): |
| "Rough logic to convert the output of `arch` into a corresponding linux build triple." |
| if arch == "armhf": |
| return cls.from_str("armv7-unknown-linux-gnueabihf") |
| else: |
| return cls.from_str(f"{arch}-unknown-linux-gnu") |
| |
| @classmethod |
| def host_default(cls): |
| "Returns the default build triple of the host." |
| rustc_info = subprocess.check_output(["rustc", "-vV"], text=True) |
| match = re.search(r"host: (\S+)", rustc_info) |
| if not match: |
| raise Exception(f"Cannot parse rustc info: {rustc_info}") |
| return cls.from_str(match.group(1)) |
| |
| @property |
| def feature_flag(self): |
| triple_to_shorthand = {v: k for k, v in SHORTHANDS.items()} |
| shorthand = triple_to_shorthand.get(str(self)) |
| if not shorthand: |
| raise Exception(f"No feature set for triple {self}") |
| return f"all-{shorthand}" |
| |
| @property |
| def target_dir(self): |
| return crosvm_target_dir() / str(self) |
| |
| def get_cargo_env(self): |
| """Environment variables to make cargo use the test target.""" |
| env: Dict[str, str] = {} |
| cargo_target = str(self) |
| env["CARGO_BUILD_TARGET"] = cargo_target |
| env["CARGO_TARGET_DIR"] = str(self.target_dir) |
| env["CROSVM_TARGET_DIR"] = str(crosvm_target_dir()) |
| return env |
| |
| def __str__(self): |
| return f"{self.arch}-{self.vendor}-{self.sys}-{self.abi}" |
| |
| |
| def download_file(url: str, filename: Path, attempts: int = 3): |
| assert attempts > 0 |
| while True: |
| attempts -= 1 |
| try: |
| urllib.request.urlretrieve(url, filename) |
| return |
| except Exception as e: |
| if attempts == 0: |
| raise e |
| else: |
| console.print("Download failed:", e) |
| |
| |
| def strip_ansi_escape_sequences(line: str) -> str: |
| return ANSI_ESCAPE.sub("", line) |
| |
| |
| console = rich.console.Console() |
| |
| if __name__ == "__main__": |
| import doctest |
| |
| (failures, num_tests) = doctest.testmod(optionflags=doctest.ELLIPSIS) |
| sys.exit(1 if failures > 0 else 0) |