blob: 9e71333475fc3c47be01624ae864d549bb71e07c [file] [log] [blame]
#!/usr/bin/env python3
"""
A driver script to run clang-tidy on changes detected via git.
By default, clang-tidy runs on all files you point it at. This means that even
if you changed only parts of that file, you will get warnings for the whole
file. This script has the ability to ask git for the exact lines that have
changed since a particular git revision, and makes clang-tidy only lint those.
This makes it much less overhead to integrate in CI and much more relevant to
developers. This git-enabled mode is optional, and full scans of a directory
tree are also possible. In both cases, the script allows filtering files via
glob or regular expressions.
"""
import collections
import fnmatch
import json
import os
import os.path
import re
import shutil
import sys
import asyncio
import shlex
import multiprocessing
from typing import Any, Dict, Iterable, List, Set, Tuple
Patterns = collections.namedtuple("Patterns", "positive, negative")
# NOTE: Clang-tidy cannot lint headers directly, because headers are not
# compiled -- translation units are, of which there is one per implementation
# (c/cc/cpp) file.
DEFAULT_FILE_PATTERN = re.compile(r"^.*\.c(c|pp)?$")
CLANG_WARNING_PATTERN = re.compile(
r"([^:]+):(\d+):\d+:\s+(warning|error):.*\[([^\]]+)\]"
)
# Set from command line arguments in main().
VERBOSE = False
QUIET = False
def log(*args: Any, **kwargs: Any) -> None:
if not QUIET:
print(*args, **kwargs)
class CommandResult:
def __init__(self, returncode: int, stdout: str, stderr: str):
self.returncode = returncode
self.stdout = stdout.strip()
self.stderr = stderr.strip()
def failed(self) -> bool:
return self.returncode != 0
def __add__(self, other: "CommandResult") -> "CommandResult":
return CommandResult(
self.returncode + other.returncode,
f"{self.stdout}\n{other.stdout}",
f"{self.stderr}\n{other.stderr}",
)
def __str__(self) -> str:
return f"{self.stdout}"
def __repr__(self) -> str:
return (
f"returncode: {self.returncode}\n"
+ f"stdout: {self.stdout}\n"
+ f"stderr: {self.stderr}"
)
class ProgressMeter:
def __init__(
self, num_items: int, start_msg: str = "", disable_progress_bar: bool = False
) -> None:
self.num_items = num_items
self.num_processed = 0
self.width = 80
self.disable_progress_bar = disable_progress_bar
# helper escape sequences
self._clear_to_end = "\x1b[2K"
self._move_to_previous_line = "\x1b[F"
self._move_to_start_of_line = "\r"
self._move_to_next_line = "\n"
if self.disable_progress_bar:
log(start_msg)
else:
self._write(
start_msg
+ self._move_to_next_line
+ "[>"
+ (self.width * " ")
+ "]"
+ self._move_to_start_of_line
)
self._flush()
def _write(self, s: str) -> None:
sys.stderr.write(s)
def _flush(self) -> None:
sys.stderr.flush()
def update(self, msg: str) -> None:
if self.disable_progress_bar:
return
# Once we've processed all items, clear the progress bar
if self.num_processed == self.num_items - 1:
self._write(self._clear_to_end)
return
# NOP if we've already processed all items
if self.num_processed > self.num_items:
return
self.num_processed += 1
self._write(
self._move_to_previous_line
+ self._clear_to_end
+ msg
+ self._move_to_next_line
)
progress = int((self.num_processed / self.num_items) * self.width)
padding = self.width - progress
self._write(
self._move_to_start_of_line
+ self._clear_to_end
+ f"({self.num_processed} of {self.num_items}) "
+ f"[{progress*'='}>{padding*' '}]"
+ self._move_to_start_of_line
)
self._flush()
def print(self, msg: str) -> None:
if QUIET:
return
elif self.disable_progress_bar:
print(msg)
else:
self._write(
self._clear_to_end
+ self._move_to_previous_line
+ self._clear_to_end
+ msg
+ self._move_to_next_line
+ self._move_to_next_line
)
self._flush()
class ClangTidyWarning:
def __init__(self, name: str, occurrences: List[Tuple[str, int]]):
self.name = name
self.occurrences = occurrences
def __str__(self) -> str:
base = f"[{self.name}] occurred {len(self.occurrences)} times\n"
for occ in self.occurrences:
base += f" {occ[0]}:{occ[1]}\n"
return base
async def run_shell_command(
cmd: List[str], on_completed: Any = None, *args: Any
) -> CommandResult:
"""Executes a shell command and runs an optional callback when complete"""
if VERBOSE:
log("Running: ", " ".join(cmd))
proc = await asyncio.create_subprocess_shell(
" ".join(shlex.quote(x) for x in cmd), # type: ignore[attr-defined]
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
output = await proc.communicate()
result = CommandResult(
returncode=proc.returncode if proc.returncode is not None else -1,
stdout=output[0].decode("utf-8").strip(),
stderr=output[1].decode("utf-8").strip(),
)
if on_completed:
on_completed(result, *args)
return result
async def _run_clang_tidy_in_parallel(
commands: List[Tuple[List[str], str]], disable_progress_bar: bool
) -> CommandResult:
progress_meter = ProgressMeter(
len(commands),
f"Processing {len(commands)} clang-tidy jobs",
disable_progress_bar=disable_progress_bar,
)
async def gather_with_concurrency(n: int, tasks: List[Any]) -> Any:
semaphore = asyncio.Semaphore(n)
async def sem_task(task: Any) -> Any:
async with semaphore:
return await task
return await asyncio.gather(
*(sem_task(task) for task in tasks), return_exceptions=True
)
async def helper() -> Any:
def on_completed(result: CommandResult, filename: str) -> None:
if result.failed():
msg = str(result) if not VERBOSE else repr(result)
progress_meter.print(msg)
progress_meter.update(f"Processed {filename}")
coros = [
run_shell_command(cmd, on_completed, filename)
for (cmd, filename) in commands
]
return await gather_with_concurrency(multiprocessing.cpu_count(), coros)
results = await helper()
return sum(results, CommandResult(0, "", ""))
async def _run_clang_tidy(
options: Any, line_filters: List[Dict[str, Any]], files: Iterable[str]
) -> CommandResult:
"""Executes the actual clang-tidy command in the shell."""
base = [options.clang_tidy_exe]
# Apply common options
base += ["-p", options.compile_commands_dir]
if not options.config_file and os.path.exists(".clang-tidy"):
options.config_file = ".clang-tidy"
if options.config_file:
import yaml
with open(options.config_file) as config:
# Here we convert the YAML config file to a JSON blob.
base += [
"-config",
json.dumps(yaml.load(config, Loader=yaml.SafeLoader)),
]
if options.print_include_paths:
base += ["--extra-arg", "-v"]
if options.include_dir:
for dir in options.include_dir:
base += ["--extra-arg", f"-I{dir}"]
base += options.extra_args
if line_filters:
base += ["-line-filter", json.dumps(line_filters)]
# Apply per-file options
commands = []
for f in files:
command = list(base) + [map_filename(options.compile_commands_dir, f)]
commands.append((command, f))
if options.dry_run:
return CommandResult(0, str([c for c, _ in commands]), "")
return await _run_clang_tidy_in_parallel(commands, options.disable_progress_bar)
def extract_warnings(
output: str, base_dir: str = "."
) -> Tuple[Dict[str, Dict[int, Set[str]]], List[ClangTidyWarning]]:
warn2occ: Dict[str, List[Tuple[str, int]]] = {}
fixes: Dict[str, Dict[int, Set[str]]] = {}
for line in output.splitlines():
p = CLANG_WARNING_PATTERN.match(line)
if p is None:
continue
if os.path.isabs(p.group(1)):
path = os.path.abspath(p.group(1))
else:
path = os.path.abspath(os.path.join(base_dir, p.group(1)))
line_no = int(p.group(2))
# Filter out any options (which start with '-')
warning_names = set([w for w in p.group(4).split(",") if not w.startswith("-")])
for name in warning_names:
if name not in warn2occ:
warn2occ[name] = []
warn2occ[name].append((path, line_no))
if path not in fixes:
fixes[path] = {}
if line_no not in fixes[path]:
fixes[path][line_no] = set()
fixes[path][line_no].update(warning_names)
warnings = [ClangTidyWarning(name, sorted(occ)) for name, occ in warn2occ.items()]
return fixes, warnings
def apply_nolint(fname: str, warnings: Dict[int, Set[str]]) -> None:
with open(fname, encoding="utf-8") as f:
lines = f.readlines()
line_offset = -1 # As in .cpp files lines are numbered starting from 1
for line_no in sorted(warnings.keys()):
nolint_diagnostics = ",".join(warnings[line_no])
line_no += line_offset
indent = " " * (len(lines[line_no]) - len(lines[line_no].lstrip(" ")))
lines.insert(line_no, f"{indent}// NOLINTNEXTLINE({nolint_diagnostics})\n")
line_offset += 1
with open(fname, mode="w") as f:
f.write("".join(lines))
# Functions for correct handling of "ATen/native/cpu" mapping
# Sources in that folder are not built in place but first copied into build folder with `.[CPUARCH].cpp` suffixes
def map_filename(build_folder: str, fname: str) -> str:
fname = os.path.relpath(fname)
native_cpu_prefix = "aten/src/ATen/native/cpu/"
build_cpu_prefix = os.path.join(build_folder, native_cpu_prefix, "")
default_arch_suffix = ".DEFAULT.cpp"
if fname.startswith(native_cpu_prefix) and fname.endswith(".cpp"):
return (
f"{build_cpu_prefix}{fname[len(native_cpu_prefix):]}{default_arch_suffix}"
)
if fname.startswith(build_cpu_prefix) and fname.endswith(default_arch_suffix):
return f"{native_cpu_prefix}{fname[len(build_cpu_prefix):-len(default_arch_suffix)]}"
return fname
def map_filenames(build_folder: str, fnames: Iterable[str]) -> List[str]:
return [map_filename(build_folder, fname) for fname in fnames]
def split_negative_from_positive_patterns(patterns: Iterable[str]) -> Patterns:
"""Separates negative patterns (that start with a dash) from positive patterns"""
positive, negative = [], []
for pattern in patterns:
if pattern.startswith("-"):
negative.append(pattern[1:])
else:
positive.append(pattern)
return Patterns(positive, negative)
def get_file_patterns(globs: Iterable[str], regexes: Iterable[str]) -> Patterns:
"""Returns a list of compiled regex objects from globs and regex pattern strings."""
# fnmatch.translate converts a glob into a regular expression.
# https://docs.python.org/2/library/fnmatch.html#fnmatch.translate
glob = split_negative_from_positive_patterns(globs)
regexes_ = split_negative_from_positive_patterns(regexes)
positive_regexes = regexes_.positive + [fnmatch.translate(g) for g in glob.positive]
negative_regexes = regexes_.negative + [fnmatch.translate(g) for g in glob.negative]
positive_patterns = [re.compile(regex) for regex in positive_regexes] or [
DEFAULT_FILE_PATTERN
]
negative_patterns = [re.compile(regex) for regex in negative_regexes]
return Patterns(positive_patterns, negative_patterns)
def filter_files(files: Iterable[str], file_patterns: Patterns) -> Iterable[str]:
"""Returns all files that match any of the patterns."""
if VERBOSE:
log("Filtering with these file patterns: {}".format(file_patterns))
for file in files:
if not any(n.match(file) for n in file_patterns.negative):
if any(p.match(file) for p in file_patterns.positive):
yield file
continue
if VERBOSE:
log(f"{file} omitted due to file filters")
async def get_all_files(paths: List[str]) -> List[str]:
"""Returns all files that are tracked by git in the given paths."""
output = await run_shell_command(["git", "ls-files"] + paths)
return str(output).strip().splitlines()
def find_changed_lines(diff: str) -> Dict[str, List[Tuple[int, int]]]:
# Delay import since this isn't required unless using the --diff-file
# argument, which for local runs people don't care about
try:
import unidiff # type: ignore[import]
except ImportError as e:
e.msg += ", run 'pip install unidiff'" # type: ignore[attr-defined]
raise e
files: Any = collections.defaultdict(list)
for file in unidiff.PatchSet(diff):
for hunk in file:
added_line_nos = [line.target_line_no for line in hunk if line.is_added]
if len(added_line_nos) == 0:
continue
# Convert list of line numbers to ranges
# Eg: [1, 2, 3, 12, 13, 14, 15] becomes [[1,3], [12, 15]]
i = 1
ranges = [[added_line_nos[0], added_line_nos[0]]]
while i < len(added_line_nos):
if added_line_nos[i] != added_line_nos[i - 1] + 1:
ranges[-1][1] = added_line_nos[i - 1]
ranges.append([added_line_nos[i], added_line_nos[i]])
i += 1
ranges[-1][1] = added_line_nos[-1]
files[file.path] += ranges
return dict(files)
def filter_from_diff(
paths: List[str], diffs: List[str]
) -> Tuple[List[str], List[Dict[Any, Any]]]:
files = []
line_filters = []
for diff in diffs:
changed_files = find_changed_lines(diff)
changed_files = {
filename: v
for filename, v in changed_files.items()
if any(filename.startswith(path) for path in paths)
}
line_filters += [
{"name": name, "lines": lines} for name, lines, in changed_files.items()
]
files += list(changed_files.keys())
return files, line_filters
def filter_from_diff_file(
paths: List[str], filename: str
) -> Tuple[List[str], List[Dict[Any, Any]]]:
with open(filename, "r") as f:
diff = f.read()
return filter_from_diff(paths, [diff])
async def filter_default(paths: List[str]) -> Tuple[List[str], List[Dict[Any, Any]]]:
return await get_all_files(paths), []
async def _run(options: Any) -> Tuple[CommandResult, List[ClangTidyWarning]]:
# These flags are pervasive enough to set it globally. It makes the code
# cleaner compared to threading it through every single function.
global VERBOSE
global QUIET
VERBOSE = options.verbose
QUIET = options.quiet
# Normalize the paths first
paths = [path.rstrip("/") for path in options.paths]
# Filter files
if options.diff_file:
files, line_filters = filter_from_diff_file(options.paths, options.diff_file)
else:
files, line_filters = await filter_default(options.paths)
file_patterns = get_file_patterns(options.glob, options.regex)
files = list(filter_files(files, file_patterns))
# clang-tidy errors when it does not get input files.
if not files:
log("No files detected")
return CommandResult(0, "", ""), []
result = await _run_clang_tidy(options, line_filters, files)
fixes, warnings = extract_warnings(
result.stdout, base_dir=options.compile_commands_dir
)
if options.suppress_diagnostics:
for fname in fixes.keys():
mapped_fname = map_filename(options.compile_commands_dir, fname)
log(f"Applying fixes to {mapped_fname}")
apply_nolint(fname, fixes[fname])
if os.path.relpath(fname) != mapped_fname:
shutil.copyfile(fname, mapped_fname)
if options.dry_run:
log(result)
elif result.failed():
# If you change this message, update the error checking logic in
# .github/workflows/lint.yml
msg = "Warnings detected!"
log(msg)
log("Summary:")
for w in warnings:
log(str(w))
return result, warnings
def run(options: Any) -> Tuple[CommandResult, List[ClangTidyWarning]]:
loop = asyncio.get_event_loop()
return loop.run_until_complete(_run(options))