| #!/usr/bin/python3 |
| # Copyright 2024 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import argparse |
| import datetime |
| import logging |
| import pathlib |
| import re |
| import shutil |
| import subprocess |
| import sys |
| |
| DESCRIPTION = ( |
| 'Helper script for importing a snapshot from upstream Wayland protocol ' |
| 'sources.') |
| |
| INTENDED_USAGE = (''' |
| Intended Usage: |
| # Update the freedesktop.org subdirectory to version 1.32 |
| # Check https://gitlab.freedesktop.org/wayland/wayland-protocols/-/tags |
| # for valid version tags. |
| ./import_snapshot.py freedesktop.org 1.32 |
| |
| # Update the chromium.org subdirectory to the latest |
| ./import_snapshot.py chromium.org main |
| ''') |
| |
| |
| class GitRepo: |
| """Issues git commands against a local checkout located at some path.""" |
| |
| def __init__(self, base: pathlib.PurePath): |
| logging.debug("GitRepo base %s", base) |
| self._base = base |
| |
| @property |
| def base(self) -> pathlib.PurePath: |
| """Gets the base path used the repo.""" |
| return self._base |
| |
| def _git(self, |
| cmd: list[str], |
| capture_output: bool = True, |
| check: bool = True) -> subprocess.CompletedProcess: |
| return subprocess.run(['git', '-C', self._base] + cmd, |
| capture_output=capture_output, |
| check=check, |
| text=True) |
| |
| def get_hash_for_version(self, version) -> str: |
| """Gets the hash associated with a |version| tag or branch.""" |
| logging.debug("GitRepo.get_hash_for_version version %s", version) |
| return self._git(['show-ref', '--hash', |
| version]).stdout.splitlines()[0].strip() |
| |
| def git_ref_name_for_version(self, version) -> str | None: |
| """Gets the named ref corresponding to |version|, if one exists.""" |
| logging.debug("GitRepo.get_ref_name_for_version version %s", version) |
| ref = self._git(['describe', '--all', '--exact-match', version], |
| check=False).stdout.splitlines()[0].strip() |
| if ref.startswith('tags/'): |
| return ref.removeprefix('tags/') |
| if ref.startswith('heads/'): |
| return ref.removeprefix('heads/') |
| return None |
| |
| def get_files(self, version: str, |
| paths: list[pathlib.PurePath]) -> list[pathlib.Path]: |
| """Gets the list of files under |paths| that are part of the Git tree at |version|.""" |
| logging.debug("GitRepo.get_files version %s paths %s", version, paths) |
| stdout = self._git( |
| ['ls-tree', '-r', '--name-only', f'{version}^{{tree}}'] + |
| paths).stdout |
| return list(pathlib.PurePath(path) for path in stdout.splitlines()) |
| |
| def assert_no_uncommitted_changes(self) -> None: |
| """Asserts that the repo has no uncommited changes.""" |
| r = self._git(['diff-files', '--quiet', '--ignore-submodules'], |
| check=False) |
| if r.returncode: |
| sys.exit('Error: Your tree is dirty') |
| |
| r = self._git([ |
| 'diff-index', '--quiet', '--ignore-submodules', '--cached', 'HEAD' |
| ], |
| check=False) |
| if r.returncode: |
| sys.exit('Error: You have staged changes') |
| |
| def sparse_depth1_clone(self, |
| url: str, |
| version: str | None, |
| paths: list[str], |
| force_clean: bool = True) -> None: |
| """Performs a sparse clone with depth=1 of a repo. |
| |
| A sparse clone limits the clone to a particular set of files, and not |
| all the files available in the repo. |
| |
| A depth=1 clone fetches only the most recent version of each file |
| cloned, and not the entire history. |
| |
| Together that makes the checkout be faster and take up less space on |
| disk, which is important for large repositories like the Chromium |
| source tree. |
| |
| |url| gives the url to the remote repository to clone. |
| |
| |version| gives the version to clone. If not specified, 'HEAD' is assumed. |
| |
| Paths in |paths| are included in the sparse checkout, which also means |
| all files in the parents directories leading up to those directories are |
| included. if |paths| is an empty list, all files at the root of the |
| repository will be included. |
| |
| |force_clean| ensures any existing checkout at |base| is removed. |
| Setting this to False speeds up testing changes to the script when |
| syncing a particular version, as it will only be cloned the first |
| time. |
| """ |
| logging.debug( |
| "GitRepo.sparse_depth1_clone url %s version %s paths %s force_clean %s", |
| url, version, paths, force_clean) |
| self._base.parent.mkdir(parents=True, exist_ok=True) |
| if force_clean and self._base.exists(): |
| shutil.rmtree(self._base) |
| |
| if not self._base.exists(): |
| cmd = ['git', 'clone', '--filter=blob:none', '--depth=1'] |
| if paths: |
| cmd.extend(['--sparse']) |
| if version is not None and version != 'HEAD': |
| cmd.extend(['-b', version]) |
| cmd.extend([url, self._base]) |
| |
| subprocess.run(cmd, capture_output=False, check=True, text=True) |
| |
| if paths: |
| self._git(['sparse-checkout', 'add'] + paths) |
| |
| def add(self, path: pathlib.Path) -> None: |
| """Stages a local file |path| in the index.""" |
| logging.debug("GitRepo.add path %s", path) |
| self._git(['add', path]) |
| |
| def commit(self, |
| message: str, |
| allow_empty: bool = False, |
| auto_add: bool = True) -> None: |
| """Commits stages changed using |message|. |
| |
| If |allow_empty| is true, an empty commit is allowed. |
| If |auto_add| is true, changed files are added automatically. |
| """ |
| logging.debug("GitRepo.commit message %s allow_empty %s auto_add %s", |
| message, allow_empty, auto_add) |
| cmd = ['commit', '-m', message] |
| if allow_empty: |
| cmd.extend(['--allow-empty']) |
| if auto_add: |
| cmd.extend(['-a']) |
| |
| self._git(cmd, capture_output=False) |
| |
| |
| class AndroidMetadata: |
| """Minimal set of functions for reading and updating METADATA files. |
| |
| Officially these files are meant to be read and written using code |
| generated from |
| //build/soong/compliance/project_metadata_proto/project_metadata.proto, |
| but using it would require adding a dependency on Python protocol buffer |
| libraries as well as the generated code for the .proto file. |
| |
| Instead we use the Python regex library module to parse and rewrite the |
| metadata, as we don't need to do anything really complicated. |
| """ |
| |
| def __init__(self, metadata_path: pathlib.Path): |
| assert metadata_path.exists() |
| self._metadata_path: pathlib.Path = metadata_path |
| self._content: str | None = None |
| self._url: str | None = None |
| self._paths: list[pathlib.PurePath] | None = None |
| |
| def _read_content(self) -> None: |
| if self._content is None: |
| with open(self._metadata_path, 'rt') as metadata_file: |
| self._content = metadata_file.read() |
| |
| def _write_content(self) -> None: |
| if self._content is not None: |
| with open(self._metadata_path, 'wt') as metadata_file: |
| metadata_file.write(self._content) |
| |
| def _read_raw_git_urls(self) -> None: |
| if self._url is None: |
| self._read_content() |
| |
| paths = [] |
| URL_PATTERN = r'url\s*{\s*type:\s*GIT\s*value:\s*"([^"]*)"\s*}' |
| for url in re.findall(URL_PATTERN, self._content): |
| base_url = url |
| path = None |
| |
| if '/-/tree/' in url: |
| base_url, path = url.split('/-/tree/') |
| _, path = path.split('/', 1) |
| elif '/+/' in url: |
| base_url, path = url.split('/+/') |
| _, path = path.split('/', 1) |
| |
| if self._url and self._url != base_url: |
| sys.exit( |
| f'Error: Inconsistent git URLs in {self._metadata_path} ({self._url} vs {base_url})' |
| ) |
| |
| self._url = base_url |
| if path: |
| paths.append(path) |
| |
| self._paths = tuple(paths) |
| |
| @property |
| def current_version(self) -> str: |
| """Obtains the current version according to the metadata.""" |
| self._read_content() |
| |
| match = re.search(r'version: "([^"]*)"', self._content) |
| if not match: |
| sys.exit( |
| f'Error: Unable to determine current version from {self._metadata_path}' |
| ) |
| return match.group(1) |
| |
| @property |
| def git_url(self) -> str: |
| """Obtains the git URL to use from the metadata.""" |
| self._read_raw_git_urls() |
| return self._url |
| |
| @property |
| def git_paths(self) -> list[pathlib.PurePath]: |
| """Obtains the child paths to sync from the metadata. |
| |
| This can be an empty list if the entire repo should be synced. |
| """ |
| self._read_raw_git_urls() |
| return list(self._paths) |
| |
| def update_version_and_import_date(self, version: str) -> None: |
| """Updates the version and import date in the metadata. |
| |
| |version| gives the version string to write. |
| The import date is set to the current date. |
| """ |
| self._read_content() |
| |
| now = datetime.datetime.now() |
| self._content = re.sub(r'version: "[^"]*"', f'version: "{version}"', |
| self._content) |
| self._content = re.sub( |
| r'last_upgrade_date {[^}]*}', |
| (f'last_upgrade_date {{ year: {now.year} month: {now.month} ' |
| f'day: {now.day} }}'), self._content) |
| |
| self._write_content() |
| |
| |
| def must_ignore(path: pathlib.PurePath) -> bool: |
| """Checks if |path| should be ignored and not imported, as doing so might conflict with Android metadata..""" |
| IGNORE_PATTERNS: tuple[str] = ( |
| 'METADATA', |
| 'MODULE_LICENSE_*', |
| '**/OWNERS', |
| '**/Android.bp', |
| ) |
| ignore = any(path.match(pattern) for pattern in IGNORE_PATTERNS) |
| if ignore: |
| print('Ignoring source {path}') |
| return ignore |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser( |
| description=DESCRIPTION, |
| epilog=INTENDED_USAGE, |
| formatter_class=argparse.RawDescriptionHelpFormatter) |
| |
| parser.add_argument('group', |
| default=None, |
| help='The subdirectory (group) to update') |
| |
| parser.add_argument( |
| 'version', |
| nargs='?', |
| default='HEAD', |
| help='The official version to import. Uses HEAD by default.') |
| |
| parser.add_argument('--loglevel', |
| default='INFO', |
| choices=('DEBUG', 'INFO', 'WARNING', 'ERROR', |
| 'CRITICAL'), |
| help='Logging level.') |
| |
| parser.add_argument('--no-force-clean', |
| dest='force_clean', |
| default=True, |
| action='store_false', |
| help='Disables clean fetches of upstream code') |
| |
| parser.add_argument( |
| '--no-remove-old-files', |
| dest='remove_old_files', |
| default=True, |
| action='store_false', |
| help= |
| 'Disables syncing the previous version to determine what files to remove' |
| ) |
| |
| args: argparse.ArgumentParser = parser.parse_args() |
| |
| logging.basicConfig(level=getattr(logging, args.loglevel)) |
| |
| base = pathlib.Path(sys.argv[0]).parent.resolve().absolute() |
| assert base.exists() |
| |
| print( |
| f'Importing {args.group} Wayland protocols at {args.version} to {args.group}' |
| ) |
| |
| target_git = GitRepo(base) |
| target_git.assert_no_uncommitted_changes() |
| target_group_path = base / args.group |
| |
| meta = AndroidMetadata(target_group_path / 'METADATA') |
| |
| print(f'Cloning {meta.git_url} [sparse/limited] at {args.version}') |
| import_new_git = GitRepo(base / '.import' / args.group / (args.version)) |
| import_new_git.sparse_depth1_clone(meta.git_url, |
| args.version, |
| meta.git_paths, |
| force_clean=args.force_clean) |
| import_new_hash = import_new_git.get_hash_for_version(args.version) |
| import_new_ref_name = import_new_git.git_ref_name_for_version(args.version) |
| print(f'Synced "{import_new_hash} ({import_new_ref_name})"') |
| import_new_files = import_new_git.get_files(import_new_hash, |
| meta.git_paths) |
| if args.remove_old_files: |
| print( |
| f'Cloning {meta.git_url} [sparse/limited] at prior {meta.current_version}' |
| ) |
| import_old_git = GitRepo(base / '.import' / args.group / |
| meta.current_version) |
| import_old_git.sparse_depth1_clone(meta.git_url, |
| meta.current_version, |
| meta.git_paths, |
| force_clean=args.force_clean) |
| import_old_hash = import_old_git.get_hash_for_version( |
| meta.current_version) |
| print(f'Synced "{import_old_hash}"') |
| import_old_files = import_old_git.get_files(import_old_hash, |
| meta.git_paths) |
| |
| files_to_remove = set(import_old_files).difference(import_new_files) |
| for path in files_to_remove: |
| if must_ignore(path): |
| continue |
| old: pathlib.Path = target_group_path / path |
| logging.debug("removing old path %s", old) |
| old.unlink(missing_ok=True) |
| |
| for path in import_new_files: |
| if must_ignore(path): |
| continue |
| src: pathlib.Path = import_new_git.base / path |
| dst: pathlib.Path = target_group_path / path |
| logging.debug("copying %s to %s", src, dst) |
| dst.parent.mkdir(parents=True, exist_ok=True) |
| shutil.copy(src, dst) |
| target_git.add(target_group_path / path) |
| |
| meta.update_version_and_import_date(import_new_ref_name or import_new_hash) |
| target_git.add(target_group_path / 'METADATA') |
| |
| message = f''' |
| Update to {args.group} protocols {import_new_ref_name or import_new_hash} |
| |
| This imports {import_new_hash} from the upstream repository. |
| |
| Test: Builds |
| '''.lstrip() |
| target_git.commit(message, allow_empty=True) |
| |
| |
| if __name__ == '__main__': |
| main() |