import argparse
import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
import time
from enum import Enum
from typing import List, NamedTuple, Optional, Pattern


# Identifier stored in the ``code`` field of every LintMessage this script emits.
LINTER_CODE = "ACTIONLINT"


class LintSeverity(str, Enum):
    """Severity level carried by a lint message.

    The ``str`` mix-in makes every member a real string, so a member can be
    passed straight to ``json.dumps`` and is written as its value.
    """

    ERROR = "error"
    WARNING = "warning"
    ADVICE = "advice"
    DISABLED = "disabled"


class LintMessage(NamedTuple):
    """One diagnostic record produced by this script.

    Records are converted with ``_asdict()`` and printed as one JSON object
    per line by the script entry point, so the field names below are also the
    keys of the emitted JSON.
    """

    # File the diagnostic refers to; None when it is not tied to a file.
    path: Optional[str]
    # Line and column reported by the tool; None when not applicable.
    line: Optional[int]
    char: Optional[int]
    # Linter identifier (this script always passes LINTER_CODE).
    code: str
    severity: LintSeverity
    # Short name of the diagnostic, e.g. the rule tag or "command-failed".
    name: str
    # Original / replacement text; this script always sets both to None.
    original: Optional[str]
    replacement: Optional[str]
    # Human-readable explanation of the problem.
    description: Optional[str]


# Matches one diagnostic line of the form
#   <file>:<line>:<char>: <message> [<code>]
# The inline flags must stay at the very start of the pattern:
#   m - ``^`` / ``$`` anchor at every line, so finditer() yields one match per line
#   x - verbose mode, so the layout whitespace inside the pattern is ignored
RESULTS_RE: Pattern[str] = re.compile(
    r"""(?mx)
    ^
    (?P<file>.*?):
    (?P<line>\d+):
    (?P<char>\d+):
    \s(?P<message>.*)
    \s(?P<code>\[.*\])
    $
    """
)


def run_command(
    args: List[str],
) -> "subprocess.CompletedProcess[bytes]":
    """Run ``args`` as a subprocess and return the completed process.

    stdout and stderr are captured as bytes. The command line and the elapsed
    wall-clock time are logged at DEBUG level; the timing is logged even when
    launching the process raises.

    Args:
        args: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout``/``stderr`` bytes.

    Raises:
        OSError: Propagated from ``subprocess.run`` if the program cannot be
            started.
    """
    logging.debug("$ %s", " ".join(args))
    start_time = time.monotonic()
    try:
        return subprocess.run(
            args,
            capture_output=True,
            # Explicitly the default: a non-zero exit status does not raise;
            # the caller inspects the captured output instead.
            check=False,
        )
    finally:
        end_time = time.monotonic()
        logging.debug("took %dms", (end_time - start_time) * 1000)


def check_file(
    binary: str,
    file: str,
) -> List[LintMessage]:
    """Run the actionlint binary on one file and collect its diagnostics.

    Only the tool's stdout is parsed (with ``RESULTS_RE``); its exit status
    and stderr are not inspected, so output lines that do not match the
    expected ``file:line:char: message [code]`` shape are silently skipped.

    Args:
        binary: Path to the actionlint executable.
        file: Path of the file to lint.

    Returns:
        One ``LintMessage`` (severity ERROR) per matched diagnostic line. If
        the process cannot be started, a single "command-failed" message
        describing the ``OSError`` is returned instead.
    """
    try:
        proc = run_command(
            [
                binary,
                # Passed to actionlint's -ignore option; presumably suppresses
                # diagnostics whose message matches this text.
                "-ignore",
                '"runs-on" section must be sequence node but got mapping node with "!!map" tag',
                file,
            ]
        )
    except OSError as err:
        # Report the launch failure as a lint message rather than crashing.
        return [
            LintMessage(
                path=None,
                line=None,
                char=None,
                code=LINTER_CODE,
                severity=LintSeverity.ERROR,
                name="command-failed",
                original=None,
                replacement=None,
                description=(f"Failed due to {err.__class__.__name__}:\n{err}"),
            )
        ]
    stdout = str(proc.stdout, "utf-8").strip()
    return [
        LintMessage(
            path=match["file"],
            # The captured code keeps its surrounding square brackets.
            name=match["code"],
            description=match["message"],
            line=int(match["line"]),
            char=int(match["char"]),
            code=LINTER_CODE,
            severity=LintSeverity.ERROR,
            original=None,
            replacement=None,
        )
        for match in RESULTS_RE.finditer(stdout)
    ]


# Script entry point: lint every given file with actionlint and print each
# diagnostic as one JSON object per line on stdout.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="actionlint runner",
        # Arguments may also be read from a file given as @path.
        fromfile_prefix_chars="@",
    )
    parser.add_argument(
        "--binary",
        required=True,
        help="actionlint binary path",
    )
    parser.add_argument(
        "filenames",
        nargs="+",
        help="paths to lint",
    )

    args = parser.parse_args()

    if not os.path.exists(args.binary):
        # A missing binary is reported as a lint message and the script still
        # exits with status 0.
        err_msg = LintMessage(
            path="<none>",
            line=None,
            char=None,
            code=LINTER_CODE,
            severity=LintSeverity.ERROR,
            name="command-failed",
            original=None,
            replacement=None,
            description=(
                f"Could not find actionlint binary at {args.binary},"
                " you may need to run `lintrunner init`."
            ),
        )
        print(json.dumps(err_msg._asdict()), flush=True)
        sys.exit(0)

    # One task per file; each task launches its own actionlint subprocess.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=os.cpu_count(),
        thread_name_prefix="Thread",
    ) as executor:
        # Map each future back to its filename for error reporting.
        futures = {
            executor.submit(
                check_file,
                args.binary,
                filename,
            ): filename
            for filename in args.filenames
        }
        # Results are printed in completion order, not argument order.
        for future in concurrent.futures.as_completed(futures):
            try:
                for lint_message in future.result():
                    print(json.dumps(lint_message._asdict()), flush=True)
            except Exception:
                # Log which file failed, then let the exception propagate.
                logging.critical('Failed at "%s".', futures[future])
                raise