blob: c4c9ee182d82ec164559e7d2ba07471e32c01336 [file] [log] [blame]
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tools for running presubmit checks in a Git repository.
Presubmit checks are defined as a function or other callable. The function may
take either no arguments or a list of the paths on which to run. Presubmit
checks communicate failure by raising any exception.
For example, either of these functions may be used as presubmit checks:
@pw_presubmit.filter_paths(endswith='.py')
def file_contains_ni(ctx: PresubmitContext):
for path in ctx.paths:
with open(path) as file:
contents = file.read()
if 'ni' not in contents and 'nee' not in contents:
raise PresumitFailure('Files must say "ni"!', path=path)
def run_the_build():
subprocess.run(['make', 'release'], check=True)
Presubmit checks that accept a list of paths may use the filter_paths decorator
to automatically filter the paths list for file types they care about. See the
pragma_once function for an example.
See pigweed_presbumit.py for an example of how to define presubmit checks.
"""
import collections
from collections import Counter, defaultdict
import contextlib
import dataclasses
import enum
import itertools
import logging
import re
import os
from pathlib import Path
import shlex
import subprocess
import time
from typing import Any, Callable, Collection, Dict, Iterable, Iterator, List
from typing import NamedTuple, Optional, Pattern, Sequence, Tuple, Union
from inspect import signature
_LOG: logging.Logger = logging.getLogger(__name__)
PathOrStr = Union[Path, str]
def plural(items_or_count,
singular: str,
count_format='',
these: bool = False,
number: bool = True,
are: bool = False) -> str:
"""Returns the singular or plural form of a word based on a count."""
try:
count = len(items_or_count)
except TypeError:
count = items_or_count
prefix = ('this ' if count == 1 else 'these ') if these else ''
num = f'{count:{count_format}} ' if number else ''
suffix = (' is' if count == 1 else ' are') if are else ''
if singular.endswith('y'):
result = f'{singular[:-1]}{"y" if count == 1 else "ies"}'
elif singular.endswith('s'):
result = f'{singular}{"" if count == 1 else "es"}'
else:
result = f'{singular}{"" if count == 1 else "s"}'
return f'{prefix}{num}{result}{suffix}'
def git_stdout(*args: PathOrStr, repo: PathOrStr = '.') -> str:
return subprocess.run(('git', '-C', repo, *args),
stdout=subprocess.PIPE,
check=True).stdout.decode().strip()
def _git_ls_files(args: Collection[PathOrStr], repo: Path) -> List[Path]:
return [
repo.joinpath(path).resolve() for path in git_stdout(
'ls-files', '--', *args, repo=repo).splitlines()
]
def _git_diff_names(commit: str, paths: Collection[PathOrStr],
repo: Path) -> List[Path]:
"""Returns absolute paths of files changed since the specified commit."""
root = git_repo_path(repo=repo)
return [
root.joinpath(path).resolve()
for path in git_stdout('diff',
'--name-only',
'--diff-filter=d',
commit,
'--',
*paths,
repo=repo).splitlines()
]
def list_git_files(commit: Optional[str] = None,
paths: Collection[PathOrStr] = (),
exclude: Collection[Pattern[str]] = (),
repo: Optional[Path] = None) -> List[Path]:
"""Lists files with git ls-files or git diff --name-only.
This function may only be called if repo points to a Git repository.
"""
if repo is None:
repo = Path.cwd()
if commit:
files = _git_diff_names(commit, paths, repo)
else:
files = _git_ls_files(paths, repo)
root = git_repo_path(repo=repo).resolve()
return sorted(
path for path in files
if not any(exp.search(str(path.relative_to(root))) for exp in exclude))
def git_has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
"""Returns True if the Git repo has uncommitted changes in it.
This does not check for untracked files.
"""
if repo is None:
repo = Path.cwd()
# Refresh the Git index so that the diff-index command will be accurate.
subprocess.run(['git', '-C', repo, 'update-index', '-q', '--refresh'],
check=True)
# diff-index exits with 1 if there are uncommitted changes.
return subprocess.run(
['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
'--']).returncode == 1
def _describe_constraints(root: Path, repo_path: Path, commit: Optional[str],
pathspecs: Collection[PathOrStr],
exclude: Collection[Pattern]) -> Iterator[str]:
if not root.samefile(repo_path):
yield (f'under the {repo_path.resolve().relative_to(root.resolve())} '
'subdirectory')
if commit:
yield f'that have changed since {commit}'
if pathspecs:
paths_str = ', '.join(str(p) for p in pathspecs)
yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'
if exclude:
yield (f'that do not match {plural(exclude, "pattern")} (' +
', '.join(p.pattern for p in exclude) + ')')
def describe_files_in_repo(root: Path, repo_path: Path, commit: Optional[str],
pathspecs: Collection[PathOrStr],
exclude: Collection[Pattern]) -> str:
"""Completes 'Doing something to ...' for a set of files in a Git repo."""
constraints = list(
_describe_constraints(root, repo_path, commit, pathspecs, exclude))
if not constraints:
return f'all files in the {root.name} repo'
msg = f'files in the {root.name} repo'
if len(constraints) == 1:
return f'{msg} {constraints[0]}'
return msg + ''.join(f'\n - {line}' for line in constraints)
def is_git_repo(path='.') -> bool:
return not subprocess.run(['git', '-C', path, 'rev-parse'],
stderr=subprocess.DEVNULL).returncode
def git_repo_path(*paths, repo: PathOrStr = '.') -> Path:
"""Returns a path relative to a Git repository's root."""
return Path(git_stdout('rev-parse', '--show-toplevel',
repo=repo)).joinpath(*paths)
def _make_color(*codes: int):
start = ''.join(f'\033[{code}m' for code in codes)
return f'{start}{{}}\033[0m'.format if os.name == 'posix' else str
color_red = _make_color(31)
color_bold_red = _make_color(31, 1)
color_black_on_red = _make_color(30, 41)
color_yellow = _make_color(33, 1)
color_green = _make_color(32)
color_black_on_green = _make_color(30, 42)
color_aqua = _make_color(36)
color_bold_white = _make_color(37, 1)
def _make_box(section_alignments: Sequence[str]) -> str:
indices = [i + 1 for i in range(len(section_alignments))]
top_sections = '{2}'.join('{1:{1}^{width%d}}' % i for i in indices)
mid_sections = '{5}'.join('{section%d:%s{width%d}}' %
(i, section_alignments[i - 1], i)
for i in indices)
bot_sections = '{9}'.join('{8:{8}^{width%d}}' % i for i in indices)
return ''.join(['{0}', *top_sections, '{3}\n',
'{4}', *mid_sections, '{6}\n',
'{7}', *bot_sections, '{10}']) # yapf: disable
_SUMMARY_BOX = '══╦╗ ║║══╩╝'
_CHECK_UPPER = '━━━┓ '
_CHECK_LOWER = ' ━━━┛'
WIDTH = 80
_LEFT = 7
_RIGHT = 11
def _title(msg, style=_SUMMARY_BOX) -> str:
msg = f' {msg} '.center(WIDTH - 2)
return _make_box('^').format(*style, section1=msg, width1=len(msg))
def _format_time(time_s: float) -> str:
minutes, seconds = divmod(time_s, 60)
return f' {int(minutes)}:{seconds:04.1f}'
def _box(style, left, middle, right, box=_make_box('><>')) -> str:
return box.format(*style,
section1=left + ('' if left.endswith(' ') else ' '),
width1=_LEFT,
section2=' ' + middle,
width2=WIDTH - _LEFT - _RIGHT - 4,
section3=right + ' ',
width3=_RIGHT)
class PresubmitFailure(Exception):
"""Optional exception to use for presubmit failures."""
def __init__(self, description: str = '', path=None):
super().__init__(f'{path}: {description}' if path else description)
class _Result(enum.Enum):
PASS = 'PASSED' # Check completed successfully.
FAIL = 'FAILED' # Check failed.
CANCEL = 'CANCEL' # Check didn't complete.
def colorized(self, width: int, invert: bool = False) -> str:
if self is _Result.PASS:
color = color_black_on_green if invert else color_green
elif self is _Result.FAIL:
color = color_black_on_red if invert else color_red
elif self is _Result.CANCEL:
color = color_yellow
else:
color = lambda value: value
padding = (width - len(self.value)) // 2 * ' '
return padding + color(self.value) + padding
class Program(collections.abc.Sequence):
"""A sequence of presubmit checks; basically a tuple with a name."""
def __init__(self, name: str, steps: Iterable[Callable]):
self.name = name
self._steps = tuple(flatten(steps))
def __getitem__(self, i):
return self._steps[i]
def __len__(self):
return len(self._steps)
def __str__(self):
return self.name
class Programs(collections.abc.Mapping):
"""A mapping of presubmit check programs.
Use is optional. Helpful when managing multiple presubmit check programs.
"""
def __init__(self, **programs: Sequence):
"""Initializes a name: program mapping from the provided keyword args.
A program is a sequence of presubmit check functions. The sequence may
contain nested sequences, which are flattened.
"""
self._programs: Dict[str, Program] = {
name: Program(name, checks)
for name, checks in programs.items()
}
def all_steps(self) -> Dict[str, Callable]:
return {c.__name__: c for c in itertools.chain(*self.values())}
def __getitem__(self, item: str) -> Program:
return self._programs[item]
def __iter__(self) -> Iterator[str]:
return iter(self._programs)
def __len__(self) -> int:
return len(self._programs)
@dataclasses.dataclass(frozen=True)
class PresubmitContext:
"""Context passed into presubmit checks."""
repo_root: Path
output_dir: Path
paths: Sequence[Path]
def file_summary(paths: Iterable[Path],
levels: int = 2,
max_lines: int = 12,
max_types: int = 3,
pad: str = ' ',
pad_start: str = ' ',
pad_end: str = ' ') -> List[str]:
"""Summarizes a list of files by the file types in each directory."""
# Count the file types in each directory.
all_counts: Dict[Any, Counter] = defaultdict(Counter)
for path in paths:
parent = path.parents[max(len(path.parents) - levels, 0)]
all_counts[parent][path.suffix] += 1
# If there are too many lines, condense directories with the fewest files.
if len(all_counts) > max_lines:
counts = sorted(all_counts.items(),
key=lambda item: -sum(item[1].values()))
counts, others = sorted(counts[:max_lines - 1]), counts[max_lines - 1:]
counts.append((f'({plural(others, "other")})',
sum((c for _, c in others), Counter())))
else:
counts = sorted(all_counts.items())
width = max(len(str(d)) + len(os.sep) for d, _ in counts) if counts else 0
width += len(pad_start)
# Prepare the output.
output = []
for path, files in counts:
total = sum(files.values())
del files[''] # Never display no-extension files individually.
if files:
extensions = files.most_common(max_types)
other_extensions = total - sum(count for _, count in extensions)
if other_extensions:
extensions.append(('other', other_extensions))
types = ' (' + ', '.join(f'{c} {e}' for e, c in extensions) + ')'
else:
types = ''
root = f'{path}{os.sep}{pad_start}'.ljust(width, pad)
output.append(f'{root}{pad_end}{plural(total, "file")}{types}')
return output
class Presubmit:
"""Runs a series of presubmit checks on a list of files."""
def __init__(self, repository_root: Path, output_directory: Path,
paths: Sequence[Path]):
self._repository_root = repository_root
self._output_directory = output_directory
self._paths = paths
def run(self, program: Program, keep_going: bool = False) -> bool:
"""Executes a series of presubmit checks on the paths."""
title = f'{program.name if program.name else ""} presubmit checks'
checks = _apply_filters(program, self._paths)
_LOG.debug('Running %s for %s', title, self._repository_root.name)
print(_title(f'{self._repository_root.name}: {title}'))
_LOG.info('%d of %d checks apply to %s in %s', len(checks),
len(program), plural(self._paths,
'file'), self._repository_root)
print()
for line in file_summary(self._paths):
print(line)
print()
_LOG.debug('Paths:\n%s', '\n'.join(str(path) for path in self._paths))
if not self._paths:
print(color_yellow('No files are being checked!'))
_LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))
start_time: float = time.time()
passed, failed, skipped = self._execute_checks(checks, keep_going)
self._log_summary(time.time() - start_time, passed, failed, skipped)
return not failed and not skipped
def _log_summary(self, time_s: float, passed: int, failed: int,
skipped: int) -> None:
summary_items = []
if passed:
summary_items.append(f'{passed} passed')
if failed:
summary_items.append(f'{failed} failed')
if skipped:
summary_items.append(f'{skipped} not run')
summary = ', '.join(summary_items) or 'nothing was done'
result = _Result.FAIL if failed or skipped else _Result.PASS
total = passed + failed + skipped
_LOG.debug('Finished running %d checks on %s in %.1f s', total,
plural(self._paths, 'file'), time_s)
_LOG.debug('Presubmit checks %s: %s', result.value, summary)
print(
_box(
_SUMMARY_BOX, result.colorized(_LEFT, invert=True),
f'{total} checks on {plural(self._paths, "file")}: {summary}',
_format_time(time_s)))
@contextlib.contextmanager
def _context(self, name: str, paths: Sequence[Path]):
# There are many characters banned from filenames on Windows. To
# simplify things, just strip everything that's not a letter, digit,
# or underscore.
sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
output_directory = self._output_directory.joinpath(sanitized_name)
os.makedirs(output_directory, exist_ok=True)
handler = logging.FileHandler(output_directory.joinpath('step.log'),
mode='w')
handler.setLevel(logging.DEBUG)
try:
_LOG.addHandler(handler)
yield PresubmitContext(
repo_root=self._repository_root.absolute(),
output_dir=output_directory.absolute(),
paths=paths,
)
finally:
_LOG.removeHandler(handler)
def _execute_checks(self, program,
keep_going: bool) -> Tuple[int, int, int]:
"""Runs presubmit checks; returns (passed, failed, skipped) lists."""
passed = failed = 0
for i, (check, paths) in enumerate(program, 1):
paths = [self._repository_root.joinpath(p) for p in paths]
with self._context(check.name, paths) as ctx:
result = check.run(ctx, i, len(program))
if result is _Result.PASS:
passed += 1
elif result is _Result.CANCEL:
break
else:
failed += 1
if not keep_going:
break
return passed, failed, len(program) - passed - failed
def _apply_filters(
program: Sequence[Callable],
paths: Sequence[Path]) -> List[Tuple['_Check', Sequence[Path]]]:
"""Returns a list of (check, paths_to_check) for checks that should run."""
checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
filter_to_checks: Dict[_PathFilter, List[_Check]] = defaultdict(list)
for check in checks:
filter_to_checks[check.filter].append(check)
check_to_paths = _map_checks_to_paths(filter_to_checks, paths)
return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]
def _map_checks_to_paths(
filter_to_checks: Dict['_PathFilter', List['_Check']],
paths: Sequence[Path]) -> Dict['_Check', Sequence[Path]]:
checks_to_paths: Dict[_Check, Sequence[Path]] = {}
str_paths = [path.as_posix() for path in paths]
for filt, checks in filter_to_checks.items():
exclude = [re.compile(exp) for exp in filt.exclude]
filtered_paths = tuple(
Path(path) for path in str_paths
if any(path.endswith(end) for end in filt.endswith) and not any(
exp.search(path) for exp in exclude))
for check in checks:
if filtered_paths or check.always_run:
checks_to_paths[check] = filtered_paths
else:
_LOG.debug('Skipping "%s": no relevant files', check.name)
return checks_to_paths
def run_presubmit(program: Sequence[Callable],
base: Optional[str] = None,
paths: Sequence[Path] = (),
exclude: Sequence[Pattern] = (),
repo_path: Path = Optional[None],
output_directory: Optional[Path] = None,
keep_going: bool = False) -> bool:
"""Lists files in the current Git repo and runs a Presubmit with them.
This changes the directory to the root of the Git repository after listing
paths, so all presubmit checks can assume they run from there.
Args:
program: list of presubmit check functions to run
name: name to use to refer to this presubmit check run
base: optional base Git commit to list files against
paths: optional list of paths to run the presubmit checks against
exclude: regular expressions of paths to exclude from checks
repo_path: path in the git repository to check
output_directory: where to place output files
keep_going: whether to continue running checks if an error occurs
Returns:
True if all presubmit checks succeeded
"""
if repo_path is None:
repo_path = Path.cwd()
if not is_git_repo(repo_path):
_LOG.critical('Presubmit checks must be run from a Git repo')
return False
files = list_git_files(base, paths, exclude, repo_path)
root = git_repo_path(repo=repo_path)
_LOG.info('Checking %s',
describe_files_in_repo(root, repo_path, base, paths, exclude))
files = [path.relative_to(root) for path in files]
if output_directory is None:
output_directory = root.joinpath('.presubmit')
presubmit = Presubmit(
repository_root=root,
output_directory=Path(output_directory),
paths=files,
)
if not isinstance(program, Program):
program = Program('', program)
return presubmit.run(program, keep_going)
def find_python_packages(python_paths, repo='.') -> Dict[str, List[str]]:
"""Returns Python package directories for the files in python_paths."""
setup_pys = [
os.path.dirname(file)
for file in _git_ls_files(['setup.py', '*/setup.py'], repo)
]
package_dirs: Dict[str, List[str]] = defaultdict(list)
for path in (os.path.abspath(p) for p in python_paths):
try:
setup_dir = max(setup for setup in setup_pys
if path.startswith(setup))
package_dirs[os.path.abspath(setup_dir)].append(path)
except ValueError:
continue
return package_dirs
class _PathFilter(NamedTuple):
endswith: Tuple[str, ...] = ('', )
exclude: Tuple[str, ...] = ()
class _Check:
"""Wraps a presubmit check function.
This class consolidates the logic for running and logging a presubmit check.
It also supports filtering the paths passed to the presubmit check.
"""
def __init__(self,
check_function: Callable,
path_filter: _PathFilter = _PathFilter(),
always_run: bool = True):
_ensure_is_valid_presubmit_check_function(check_function)
self._check: Callable = check_function
self.filter: _PathFilter = path_filter
self.always_run: bool = always_run
# Since _Check wraps a presubmit function, adopt that function's name.
self.__name__ = self._check.__name__
@property
def name(self):
return self.__name__
def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
"""Runs the presubmit check on the provided paths."""
print(
_box(_CHECK_UPPER, f'{count}/{total}', self.name,
plural(ctx.paths, "file")))
_LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
plural(ctx.paths, "file"))
start_time_s = time.time()
result = self._call_function(ctx)
time_str = _format_time(time.time() - start_time_s)
_LOG.debug('%s %s', self.name, result.value)
print(_box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
_LOG.debug('%s duration:%s', self.name, time_str)
return result
def _call_function(self, ctx: PresubmitContext) -> _Result:
try:
self._check(ctx)
except PresubmitFailure as failure:
if str(failure):
_LOG.warning('%s', failure)
return _Result.FAIL
except Exception as failure: # pylint: disable=broad-except
_LOG.exception('Presubmit check %s failed!', self.name)
return _Result.FAIL
except KeyboardInterrupt:
print()
return _Result.CANCEL
return _Result.PASS
def __call__(self, ctx: PresubmitContext, *args, **kwargs):
"""Calling a _Check calls its underlying function directly.
This makes it possible to call functions wrapped by @filter_paths. The
prior filters are ignored, so new filters may be applied.
"""
return self._check(ctx, *args, **kwargs)
def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
"""Checks if a Callable can be used as a presubmit check."""
try:
params = signature(check).parameters
except (TypeError, ValueError):
raise TypeError('Presubmit checks must be callable, but '
f'{check!r} is a {type(check).__name__}')
required_args = [p for p in params.values() if p.default == p.empty]
if len(required_args) != 1:
raise TypeError(
f'Presubmit check functions must have exactly one required '
f'positional argument (the PresubmitContext), but '
f'{check.__name__} has {len(required_args)} required arguments' +
(f' ({", ".join(a.name for a in required_args)})'
if required_args else ''))
def _make_tuple(value: Iterable[str]) -> Tuple[str, ...]:
return tuple([value] if isinstance(value, str) else value)
def filter_paths(endswith: Iterable[str] = (''),
exclude: Iterable[str] = (),
always_run: bool = False):
"""Decorator for filtering the paths list for a presubmit check function.
Path filters only apply when the function is used as a presubmit check.
Filters are ignored when the functions are called directly. This makes it
possible to reuse functions wrapped in @filter_paths in other presubmit
checks, potentially with different path filtering rules.
Args:
endswith: str or iterable of path endings to include
exclude: regular expressions of paths to exclude
Returns:
a wrapped version of the presubmit function
"""
def filter_paths_for_function(function: Callable):
return _Check(function,
_PathFilter(_make_tuple(endswith), _make_tuple(exclude)),
always_run=always_run)
return filter_paths_for_function
def log_run(*args, **kwargs) -> subprocess.CompletedProcess:
"""Logs a command then runs it with subprocess.run."""
_LOG.debug('[COMMAND] %s\n%s',
', '.join(f'{k}={v}' for k, v in sorted(kwargs.items())),
' '.join(shlex.quote(str(arg)) for arg in args))
return subprocess.run(args, **kwargs)
def call(*args, **kwargs) -> None:
"""Optional subprocess wrapper that causes a PresubmitFailure on errors."""
attributes = ', '.join(f'{k}={v}' for k, v in sorted(kwargs.items()))
command = ' '.join(shlex.quote(str(arg)) for arg in args)
_LOG.debug('[RUN] %s\n%s', attributes, command)
process = subprocess.run(args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
**kwargs)
logfunc = _LOG.warning if process.returncode else _LOG.debug
logfunc('[FINISHED]\n%s', command)
logfunc('[RESULT] %s with return code %d',
'Failed' if process.returncode else 'Passed', process.returncode)
output = process.stdout.decode(errors='backslashreplace')
if output:
logfunc('[OUTPUT]\n%s', output)
if process.returncode:
raise PresubmitFailure
@filter_paths(endswith='.h')
def pragma_once(ctx: PresubmitContext) -> None:
"""Presubmit check that ensures all header files contain '#pragma once'."""
for path in ctx.paths:
_LOG.debug('Checking %s', path)
with open(path) as file:
for line in file:
if line.startswith('#pragma once'):
break
else:
raise PresubmitFailure('#pragma once is missing!', path=path)
def flatten(*items) -> Iterator:
"""Yields items from a series of items and nested iterables.
This function is used to flatten arbitrarily nested lists. str and bytes
are kept intact.
"""
for item in items:
if isinstance(item, collections.abc.Iterable) and not isinstance(
item, (str, bytes)):
yield from flatten(*item)
else:
yield item