| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """pw_presubmit ContextVar.""" |
| |
| from contextvars import ContextVar |
| import dataclasses |
| import enum |
| import inspect |
| import logging |
| import json |
| import os |
| from pathlib import Path |
| import re |
| import shlex |
| import shutil |
| import subprocess |
| import tempfile |
| from typing import ( |
| Any, |
| Dict, |
| List, |
| Iterable, |
| NamedTuple, |
| Optional, |
| Sequence, |
| Tuple, |
| TYPE_CHECKING, |
| ) |
| import urllib |
| |
| import pw_cli.color |
| import pw_cli.env |
| import pw_env_setup.config_file |
| |
| if TYPE_CHECKING: |
| from pw_presubmit.presubmit import Check |
| |
| _COLOR = pw_cli.color.colors() |
| _LOG: logging.Logger = logging.getLogger(__name__) |
| |
| PRESUBMIT_CHECK_TRACE: ContextVar[ |
| Dict[str, List['PresubmitCheckTrace']] |
| ] = ContextVar('pw_presubmit_check_trace', default={}) |
| |
| |
| @dataclasses.dataclass(frozen=True) |
| class FormatOptions: |
| python_formatter: Optional[str] = 'yapf' |
| black_path: Optional[str] = 'black' |
| exclude: Sequence[re.Pattern] = dataclasses.field(default_factory=list) |
| |
| @staticmethod |
| def load(env: Optional[Dict[str, str]] = None) -> 'FormatOptions': |
| config = pw_env_setup.config_file.load(env=env) |
| fmt = config.get('pw', {}).get('pw_presubmit', {}).get('format', {}) |
| return FormatOptions( |
| python_formatter=fmt.get('python_formatter', 'yapf'), |
| black_path=fmt.get('black_path', 'black'), |
| exclude=tuple(re.compile(x) for x in fmt.get('exclude', ())), |
| ) |
| |
| def filter_paths(self, paths: Iterable[Path]) -> Tuple[Path, ...]: |
| root = Path(pw_cli.env.pigweed_environment().PW_PROJECT_ROOT) |
| relpaths = [x.relative_to(root) for x in paths] |
| |
| for filt in self.exclude: |
| relpaths = [x for x in relpaths if not filt.search(str(x))] |
| return tuple(root / x for x in relpaths) |
| |
| |
| def get_buildbucket_info(bbid) -> Dict[str, Any]: |
| if not bbid or not shutil.which('bb'): |
| return {} |
| |
| output = subprocess.check_output( |
| ['bb', 'get', '-json', '-p', f'{bbid}'], text=True |
| ) |
| return json.loads(output) |
| |
| |
| @dataclasses.dataclass |
| class LuciPipeline: |
| round: int |
| builds_from_previous_iteration: Sequence[int] |
| |
| @staticmethod |
| def create( |
| bbid: int, |
| fake_pipeline_props: Optional[Dict[str, Any]] = None, |
| ) -> Optional['LuciPipeline']: |
| pipeline_props: Dict[str, Any] |
| if fake_pipeline_props is not None: |
| pipeline_props = fake_pipeline_props |
| else: |
| pipeline_props = ( |
| get_buildbucket_info(bbid) |
| .get('input', {}) |
| .get('properties', {}) |
| .get('$pigweed/pipeline', {}) |
| ) |
| if not pipeline_props.get('inside_a_pipeline', False): |
| return None |
| |
| return LuciPipeline( |
| round=int(pipeline_props['round']), |
| builds_from_previous_iteration=list( |
| int(x) for x in pipeline_props['builds_from_previous_iteration'] |
| ), |
| ) |
| |
| |
| @dataclasses.dataclass |
| class LuciTrigger: |
| """Details the pending change or submitted commit triggering the build.""" |
| |
| number: int |
| patchset: int |
| remote: str |
| project: str |
| branch: str |
| ref: str |
| gerrit_name: str |
| submitted: bool |
| |
| @property |
| def gerrit_host(self): |
| return f'https://{self.gerrit_name}-review.googlesource.com' |
| |
| @property |
| def gerrit_url(self): |
| if not self.number: |
| return self.gitiles_url |
| return f'{self.gerrit_host}/c/{self.number}' |
| |
| @property |
| def gitiles_url(self): |
| return f'{self.remote}/+/{self.ref}' |
| |
| @staticmethod |
| def create_from_environment( |
| env: Optional[Dict[str, str]] = None, |
| ) -> Sequence['LuciTrigger']: |
| if not env: |
| env = os.environ.copy() |
| raw_path = env.get('TRIGGERING_CHANGES_JSON') |
| if not raw_path: |
| return () |
| path = Path(raw_path) |
| if not path.is_file(): |
| return () |
| |
| result = [] |
| with open(path, 'r') as ins: |
| for trigger in json.load(ins): |
| keys = { |
| 'number', |
| 'patchset', |
| 'remote', |
| 'project', |
| 'branch', |
| 'ref', |
| 'gerrit_name', |
| 'submitted', |
| } |
| if keys <= trigger.keys(): |
| result.append(LuciTrigger(**{x: trigger[x] for x in keys})) |
| |
| return tuple(result) |
| |
| @staticmethod |
| def create_for_testing(**kwargs): |
| change = { |
| 'number': 123456, |
| 'patchset': 1, |
| 'remote': 'https://pigweed.googlesource.com/pigweed/pigweed', |
| 'project': 'pigweed/pigweed', |
| 'branch': 'main', |
| 'ref': 'refs/changes/56/123456/1', |
| 'gerrit_name': 'pigweed', |
| 'submitted': True, |
| } |
| change.update(kwargs) |
| |
| with tempfile.TemporaryDirectory() as tempdir: |
| changes_json = Path(tempdir) / 'changes.json' |
| with changes_json.open('w') as outs: |
| json.dump([change], outs) |
| env = {'TRIGGERING_CHANGES_JSON': changes_json} |
| return LuciTrigger.create_from_environment(env) |
| |
| |
| @dataclasses.dataclass |
| class LuciContext: |
| """LUCI-specific information about the environment.""" |
| |
| buildbucket_id: int |
| build_number: int |
| project: str |
| bucket: str |
| builder: str |
| swarming_server: str |
| swarming_task_id: str |
| cas_instance: str |
| pipeline: Optional[LuciPipeline] |
| triggers: Sequence[LuciTrigger] = dataclasses.field(default_factory=tuple) |
| |
| @property |
| def is_try(self): |
| return re.search(r'\btry$', self.bucket) |
| |
| @property |
| def is_ci(self): |
| return re.search(r'\bci$', self.bucket) |
| |
| @property |
| def is_dev(self): |
| return re.search(r'\bdev\b', self.bucket) |
| |
| @staticmethod |
| def create_from_environment( |
| env: Optional[Dict[str, str]] = None, |
| fake_pipeline_props: Optional[Dict[str, Any]] = None, |
| ) -> Optional['LuciContext']: |
| """Create a LuciContext from the environment.""" |
| |
| if not env: |
| env = os.environ.copy() |
| |
| luci_vars = [ |
| 'BUILDBUCKET_ID', |
| 'BUILDBUCKET_NAME', |
| 'BUILD_NUMBER', |
| 'SWARMING_TASK_ID', |
| 'SWARMING_SERVER', |
| ] |
| if any(x for x in luci_vars if x not in env): |
| return None |
| |
| project, bucket, builder = env['BUILDBUCKET_NAME'].split(':') |
| |
| bbid: int = 0 |
| pipeline: Optional[LuciPipeline] = None |
| try: |
| bbid = int(env['BUILDBUCKET_ID']) |
| pipeline = LuciPipeline.create(bbid, fake_pipeline_props) |
| |
| except ValueError: |
| pass |
| |
| # Logic to identify cas instance from swarming server is derived from |
| # https://chromium.googlesource.com/infra/luci/recipes-py/+/main/recipe_modules/cas/api.py |
| swarm_server = env['SWARMING_SERVER'] |
| cas_project = urllib.parse.urlparse(swarm_server).netloc.split('.')[0] |
| cas_instance = f'projects/{cas_project}/instances/default_instance' |
| |
| result = LuciContext( |
| buildbucket_id=bbid, |
| build_number=int(env['BUILD_NUMBER']), |
| project=project, |
| bucket=bucket, |
| builder=builder, |
| swarming_server=env['SWARMING_SERVER'], |
| swarming_task_id=env['SWARMING_TASK_ID'], |
| cas_instance=cas_instance, |
| pipeline=pipeline, |
| triggers=LuciTrigger.create_from_environment(env), |
| ) |
| _LOG.debug('%r', result) |
| return result |
| |
| @staticmethod |
| def create_for_testing(**kwargs): |
| env = { |
| 'BUILDBUCKET_ID': '881234567890', |
| 'BUILDBUCKET_NAME': 'pigweed:bucket.try:builder-name', |
| 'BUILD_NUMBER': '123', |
| 'SWARMING_SERVER': 'https://chromium-swarm.appspot.com', |
| 'SWARMING_TASK_ID': 'cd2dac62d2', |
| } |
| env.update(kwargs) |
| |
| return LuciContext.create_from_environment(env, {}) |
| |
| |
| @dataclasses.dataclass |
| class FormatContext: |
| """Context passed into formatting helpers. |
| |
| This class is a subset of PresubmitContext containing only what's needed by |
| formatters. |
| |
| For full documentation on the members see the PresubmitContext section of |
| pw_presubmit/docs.rst. |
| |
| Args: |
| root: Source checkout root directory |
| output_dir: Output directory for this specific language |
| paths: Modified files for the presubmit step to check (often used in |
| formatting steps but ignored in compile steps) |
| package_root: Root directory for pw package installations |
| format_options: Formatting options, derived from pigweed.json |
| """ |
| |
| root: Optional[Path] |
| output_dir: Path |
| paths: Tuple[Path, ...] |
| package_root: Path |
| format_options: FormatOptions |
| dry_run: bool = False |
| |
| def append_check_command(self, *command_args, **command_kwargs) -> None: |
| """Empty append_check_command.""" |
| |
| |
| class PresubmitFailure(Exception): |
| """Optional exception to use for presubmit failures.""" |
| |
| def __init__( |
| self, |
| description: str = '', |
| path: Optional[Path] = None, |
| line: Optional[int] = None, |
| ): |
| line_part: str = '' |
| if line is not None: |
| line_part = f'{line}:' |
| super().__init__( |
| f'{path}:{line_part} {description}' if path else description |
| ) |
| |
| |
| @dataclasses.dataclass |
| class PresubmitContext: # pylint: disable=too-many-instance-attributes |
| """Context passed into presubmit checks. |
| |
| For full documentation on the members see pw_presubmit/docs.rst. |
| |
| Args: |
| root: Source checkout root directory |
| repos: Repositories (top-level and submodules) processed by |
| pw presubmit |
| output_dir: Output directory for this specific presubmit step |
| failure_summary_log: Path where steps should write a brief summary of |
| any failures encountered for use by other tooling. |
| paths: Modified files for the presubmit step to check (often used in |
| formatting steps but ignored in compile steps) |
| all_paths: All files in the tree. |
| package_root: Root directory for pw package installations |
| override_gn_args: Additional GN args processed by build.gn_gen() |
| luci: Information about the LUCI build or None if not running in LUCI |
| format_options: Formatting options, derived from pigweed.json |
| num_jobs: Number of jobs to run in parallel |
| continue_after_build_error: For steps that compile, don't exit on the |
| first compilation error |
| rng_seed: Seed for a random number generator, for the few steps that |
| need one |
| full: Whether this is a full or incremental presubmit run |
| """ |
| |
| root: Path |
| repos: Tuple[Path, ...] |
| output_dir: Path |
| failure_summary_log: Path |
| paths: Tuple[Path, ...] |
| all_paths: Tuple[Path, ...] |
| package_root: Path |
| luci: Optional[LuciContext] |
| override_gn_args: Dict[str, str] |
| format_options: FormatOptions |
| num_jobs: Optional[int] = None |
| continue_after_build_error: bool = False |
| rng_seed: int = 1 |
| full: bool = False |
| _failed: bool = False |
| dry_run: bool = False |
| |
| @property |
| def failed(self) -> bool: |
| return self._failed |
| |
| @property |
| def incremental(self) -> bool: |
| return not self.full |
| |
| def fail( |
| self, |
| description: str, |
| path: Optional[Path] = None, |
| line: Optional[int] = None, |
| ): |
| """Add a failure to this presubmit step. |
| |
| If this is called at least once the step fails, but not immediately—the |
| check is free to continue and possibly call this method again. |
| """ |
| _LOG.warning('%s', PresubmitFailure(description, path, line)) |
| self._failed = True |
| |
| @staticmethod |
| def create_for_testing(**kwargs): |
| parsed_env = pw_cli.env.pigweed_environment() |
| root = parsed_env.PW_PROJECT_ROOT |
| presubmit_root = root / 'out' / 'presubmit' |
| presubmit_kwargs = { |
| 'root': root, |
| 'repos': (root,), |
| 'output_dir': presubmit_root / 'test', |
| 'failure_summary_log': presubmit_root / 'failure-summary.log', |
| 'paths': (root / 'foo.cc', root / 'foo.py'), |
| 'all_paths': (root / 'BUILD.gn', root / 'foo.cc', root / 'foo.py'), |
| 'package_root': root / 'environment' / 'packages', |
| 'luci': None, |
| 'override_gn_args': {}, |
| 'format_options': FormatOptions(), |
| } |
| presubmit_kwargs.update(kwargs) |
| return PresubmitContext(**presubmit_kwargs) |
| |
| def append_check_command( |
| self, |
| *command_args, |
| call_annotation: Optional[Dict[Any, Any]] = None, |
| **command_kwargs, |
| ) -> None: |
| """Save a subprocess command annotation to this presubmit context. |
| |
| This is used to capture commands that will be run for display in ``pw |
| presubmit --dry-run.`` |
| |
| Args: |
| |
| command_args: All args that would normally be passed to |
| subprocess.run |
| |
| call_annotation: Optional key value pairs of data to save for this |
| command. Examples: |
| |
| :: |
| |
| call_annotation={'pw_package_install': 'teensy'} |
| call_annotation={'build_system': 'bazel'} |
| call_annotation={'build_system': 'ninja'} |
| |
| command_kwargs: keyword args that would normally be passed to |
| subprocess.run. |
| """ |
| call_annotation = call_annotation if call_annotation else {} |
| calling_func: Optional[str] = None |
| calling_check = None |
| |
| # Loop through the current call stack looking for `self`, and stopping |
| # when self is a Check() instance and if the __call__ or _try_call |
| # functions are in the stack. |
| |
| # This used to be an isinstance(obj, Check) call, but it was changed to |
| # this so Check wouldn't need to be imported here. Doing so would create |
| # a dependency loop. |
| def is_check_object(obj): |
| return getattr(obj, '_is_presubmit_check_object', False) |
| |
| for frame_info in inspect.getouterframes(inspect.currentframe()): |
| self_obj = frame_info.frame.f_locals.get('self', None) |
| if ( |
| self_obj |
| and is_check_object(self_obj) |
| and frame_info.function in ['_try_call', '__call__'] |
| ): |
| calling_func = frame_info.function |
| calling_check = self_obj |
| |
| save_check_trace( |
| self.output_dir, |
| PresubmitCheckTrace( |
| self, |
| calling_check, |
| calling_func, |
| command_args, |
| command_kwargs, |
| call_annotation, |
| ), |
| ) |
| |
| def __post_init__(self) -> None: |
| PRESUBMIT_CONTEXT.set(self) |
| |
| def __hash__(self): |
| return hash( |
| tuple( |
| tuple(attribute.items()) |
| if isinstance(attribute, dict) |
| else attribute |
| for attribute in dataclasses.astuple(self) |
| ) |
| ) |
| |
| |
| PRESUBMIT_CONTEXT: ContextVar[Optional[PresubmitContext]] = ContextVar( |
| 'pw_presubmit_context', default=None |
| ) |
| |
| |
| def get_presubmit_context(): |
| return PRESUBMIT_CONTEXT.get() |
| |
| |
| class PresubmitCheckTraceType(enum.Enum): |
| BAZEL = 'BAZEL' |
| CMAKE = 'CMAKE' |
| GN_NINJA = 'GN_NINJA' |
| PW_PACKAGE = 'PW_PACKAGE' |
| |
| |
| class PresubmitCheckTrace(NamedTuple): |
| ctx: 'PresubmitContext' |
| check: Optional['Check'] |
| func: Optional[str] |
| args: Iterable[Any] |
| kwargs: Dict[Any, Any] |
| call_annotation: Dict[Any, Any] |
| |
| def __repr__(self) -> str: |
| return f'''CheckTrace( |
| ctx={self.ctx.output_dir} |
| id(ctx)={id(self.ctx)} |
| check={self.check} |
| args={self.args} |
| kwargs={self.kwargs.keys()} |
| call_annotation={self.call_annotation} |
| )''' |
| |
| |
| def save_check_trace(output_dir: Path, trace: PresubmitCheckTrace) -> None: |
| trace_key = str(output_dir.resolve()) |
| trace_list = PRESUBMIT_CHECK_TRACE.get().get(trace_key, []) |
| trace_list.append(trace) |
| PRESUBMIT_CHECK_TRACE.get()[trace_key] = trace_list |
| |
| |
| def get_check_traces(ctx: 'PresubmitContext') -> List[PresubmitCheckTrace]: |
| trace_key = str(ctx.output_dir.resolve()) |
| return PRESUBMIT_CHECK_TRACE.get().get(trace_key, []) |
| |
| |
| def log_check_traces(ctx: 'PresubmitContext') -> None: |
| traces = PRESUBMIT_CHECK_TRACE.get() |
| |
| for _output_dir, check_traces in traces.items(): |
| for check_trace in check_traces: |
| if check_trace.ctx != ctx: |
| continue |
| |
| quoted_command_args = ' '.join( |
| shlex.quote(str(arg)) for arg in check_trace.args |
| ) |
| _LOG.info( |
| '%s %s', |
| _COLOR.blue('Run ==>'), |
| quoted_command_args, |
| ) |
| |
| |
| def apply_exclusions( |
| ctx: PresubmitContext, |
| paths: Optional[Sequence[Path]] = None, |
| ) -> Tuple[Path, ...]: |
| return ctx.format_options.filter_paths(paths or ctx.paths) |