blob: 7eb898617061b89a662473cb6ce54c4b45e0a182 [file] [log] [blame]
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module to check updates from crates.io."""
import json
import os
# pylint: disable=g-importing-member
from pathlib import Path
import re
import shutil
import tempfile
import urllib.request
from typing import IO
import archive_utils
from base_updater import Updater
import git_utils
# pylint: disable=import-error
import metadata_pb2 # type: ignore
import updater_utils
# A crate name: one or more word characters or hyphens (captured).
LIBRARY_NAME_PATTERN: str = r"([-\w]+)"

# A version string containing d.d.d-alpha* or d.d.d-beta*.
ALPHA_BETA_PATTERN: str = r"^.*[0-9]+\.[0-9]+\.[0-9]+-(alpha|beta).*"
ALPHA_BETA_RE: re.Pattern = re.compile(ALPHA_BETA_PATTERN)

# Match both x.y.z and x.y.z+a.b.c which is used by some Vulkan binding
# libraries. Groups 1-3 are x, y, z; groups 5-7 are the optional a, b, c.
VERSION_PATTERN: str = r"([0-9]+)\.([0-9]+)\.([0-9]+)(\+([0-9]+)\.([0-9]+)\.([0-9]+))?"
VERSION_RE: re.Pattern = re.compile(VERSION_PATTERN)

# Archive URL on static.crates.io. Group 1 is the crate directory, group 2 the
# crate name in the file name and group 3 the version text before ".crate".
# The dots in the host name and in the ".crate" suffix are escaped so that
# they only match a literal "." rather than any character.
CRATES_IO_ARCHIVE_URL_PATTERN: str = (r"^https:\/\/static\.crates\.io\/crates\/" +
                                      LIBRARY_NAME_PATTERN + "/" +
                                      LIBRARY_NAME_PATTERN + "-" +
                                      "(.*?)" + r"\.crate")
CRATES_IO_ARCHIVE_URL_RE: re.Pattern = re.compile(CRATES_IO_ARCHIVE_URL_PATTERN)

# A single-line `description = "..."` entry; group 1 keeps the quotes.
DESCRIPTION_PATTERN: str = r"^description *= *(\".+\")"
DESCRIPTION_RE: re.Pattern = re.compile(DESCRIPTION_PATTERN)
class CratesUpdater(Updater):
    """Updater for crates.io packages."""

    # Name of the git remote that setup_remote() points at the crate's
    # repository URL.
    UPSTREAM_REMOTE_NAME: str = "update_origin"
    # Archive URL of the selected version; set by check(),
    # _find_latest_non_test_version() or use_current_as_latest().
    download_url: str
    # Crate name taken from the old identifier URL by is_supported_url().
    package: str
    # Root of the extracted archive; set by update().
    package_dir: str
    # Temporary file created by update(); rollback() treats a non-empty file
    # as the sign that the package swap already happened.
    temp_file: IO

    @staticmethod
    def _fetch_crates_io_json(url: str):
        """Fetch `url` and return its body decoded as JSON."""
        with urllib.request.urlopen(url) as response:
            return json.loads(response.read().decode())

    def is_supported_url(self) -> bool:
        """Return True if the old identifier is a static.crates.io archive URL.

        On a match, the crate name from the URL is stored in self.package.
        """
        match = CRATES_IO_ARCHIVE_URL_RE.match(self._old_identifier.value)
        if match is None:
            return False
        self.package = match.group(1)
        return True

    def setup_remote(self) -> None:
        """Point the UPSTREAM_REMOTE_NAME remote at the crate's repository and fetch it.

        The repository URL is read from the crates.io API. An existing remote
        with a different URL is removed and re-added.
        """
        data = self._fetch_crates_io_json(
            "https://crates.io/api/v1/crates/" + self.package)
        # NOTE(review): "repository" may be missing/null for some crates;
        # how git_utils.add_remote reacts to that is not visible here.
        homepage = data["crate"]["repository"]
        remotes = git_utils.list_remotes(self._proj_path)
        current_remote_url = None
        for name, remote_url in remotes.items():
            if name == self.UPSTREAM_REMOTE_NAME:
                current_remote_url = remote_url

        if current_remote_url is not None and current_remote_url != homepage:
            git_utils.remove_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME)
            current_remote_url = None

        if current_remote_url is None:
            git_utils.add_remote(self._proj_path, self.UPSTREAM_REMOTE_NAME, homepage)

        branch = git_utils.detect_default_branch(self._proj_path,
                                                 self.UPSTREAM_REMOTE_NAME)
        git_utils.fetch(self._proj_path, self.UPSTREAM_REMOTE_NAME, branch)

    def _get_version_numbers(self, version: str) -> tuple[int, int, int]:
        """Return the leading x.y.z of `version` as ints, or (0, 0, 0) if absent."""
        match = VERSION_RE.match(version)
        if match is None:
            return (0, 0, 0)
        return (
            int(match.group(1)),
            int(match.group(2)),
            int(match.group(3)),
        )

    def _is_newer_version(self, prev_version: str, prev_id: int,
                          check_version: str, check_id: int) -> bool:
        """Return true if check_version+id is newer than prev_version+id."""
        # Tuples compare element-wise: x.y.z first, then the id as tie-breaker.
        return ((self._get_version_numbers(check_version), check_id) >
                (self._get_version_numbers(prev_version), prev_id))

    def _find_latest_non_test_version(self) -> None:
        """Select the newest version that is neither yanked nor alpha/beta.

        Stores the result in self._new_identifier.version and
        self.download_url. The version is left as "" (and download_url is not
        assigned) when no entry qualifies.
        """
        data = self._fetch_crates_io_json(
            f"https://crates.io/api/v1/crates/{self.package}/versions")
        last_id = 0
        self._new_identifier.version = ""
        for entry in data["versions"]:
            version = entry["num"]
            if entry["yanked"] or ALPHA_BETA_RE.match(version):
                continue
            entry_id = int(entry["id"])
            if self._is_newer_version(
                    self._new_identifier.version, last_id, version, entry_id):
                last_id = entry_id
                self._new_identifier.version = version
                self.download_url = "https://crates.io" + entry["dl_path"]

    def check(self) -> None:
        """Checks crates.io for the latest version.

        Records the version in self._new_identifier.version and its download
        URL in self.download_url; nothing is returned.
        """
        url = "https://crates.io/api/v1/crates/" + self.package
        data = self._fetch_crates_io_json(url)
        self._new_identifier.version = data["crate"]["max_version"]
        # Skip d.d.d-{alpha,beta}* versions
        if ALPHA_BETA_RE.match(self._new_identifier.version):
            print(f"Ignore alpha or beta release:{self.package}-{self._new_identifier.version}.")
            self._find_latest_non_test_version()
        else:
            url = url + "/" + self._new_identifier.version
            data = self._fetch_crates_io_json(url)
            self.download_url = "https://crates.io" + data["version"]["dl_path"]

    def use_current_as_latest(self) -> None:
        """Run the base implementation, then derive download_url from the version."""
        super().use_current_as_latest()
        # A shortcut to use the static download path.
        self.download_url = f"https://static.crates.io/crates/{self.package}/" \
                            f"{self.package}-{self._new_identifier.version}.crate"

    def update(self) -> None:
        """Updates the package.

        Has to call check() before this function.
        """
        try:
            temporary_dir = archive_utils.download_and_extract(self.download_url)
            self.package_dir = archive_utils.find_archive_root(temporary_dir)
            # Kept open on self so rollback() can inspect its size later.
            self.temp_file = tempfile.NamedTemporaryFile()
            updater_utils.replace_package(self.package_dir, self._proj_path,
                                          self.temp_file.name)
            self.check_for_errors()
        finally:
            urllib.request.urlcleanup()

    def rollback(self) -> bool:
        """Swap the project directory and self.package_dir back.

        Returns:
            True if a swap was undone, False if there was nothing to undo.
        """
        # Only rollback if we have already swapped,
        # which we denote by writing to this file.
        if os.fstat(self.temp_file.fileno()).st_size == 0:
            return False
        with tempfile.TemporaryDirectory() as tmp_dir:
            # shutil.move places the source inside the existing directory
            # under the source's own base name and returns that path. Using
            # the returned path avoids assuming the project directory is
            # named after the crate.
            parked_project = shutil.move(self._proj_path, tmp_dir)
            shutil.move(self.package_dir, self._proj_path)
            shutil.move(parked_project, self.package_dir)
        return True

    # pylint: disable=no-self-use
    def update_metadata(self, metadata: metadata_pb2.MetaData) -> metadata_pb2.MetaData:
        """Updates METADATA content.

        Starts from the base class result, rewrites the first identifier that
        carries a version to the static.crates.io archive URL of the latest
        version, and refreshes the description from Cargo.toml when it differs.
        """
        # copy only HOMEPAGE url, and then add new ARCHIVE url.
        updated_metadata = super().update_metadata(metadata)
        for identifier in updated_metadata.third_party.identifier:
            if identifier.version:
                identifier.value = f"https://static.crates.io/crates/{updated_metadata.name}/" \
                                   f"{updated_metadata.name}-{self.latest_identifier.version}.crate"
                break
        # copy description from Cargo.toml to METADATA
        cargo_toml = os.path.join(self.project_path, "Cargo.toml")
        description = self._get_cargo_description(cargo_toml)
        if description and description != updated_metadata.description:
            print("New METADATA description:", description)
            updated_metadata.description = description
        return updated_metadata

    def check_for_errors(self) -> None:
        """Set self._has_errors if patch rejects or Cargo errors are found."""
        # Check for .rej patches from failing to apply patches.
        # If this has too many false positives, we could either
        # check if the files are modified by patches or somehow
        # track which files existed before the patching.
        rejects = list(self._proj_path.glob('**/*.rej'))
        if rejects:
            print(f"Error: Found patch reject files: {str(rejects)}")
            self._has_errors = True
        # Check for Cargo errors embedded in Android.bp.
        # Note that this should stay in sync with cargo2android.py.
        with open(f'{self._proj_path}/Android.bp', 'r') as bp_file:
            for line in bp_file:
                if line.strip() == "Errors in cargo.out:":
                    print("Error: Found Cargo errors in Android.bp")
                    self._has_errors = True
                    return

    def _toml2str(self, line: str) -> str:
        """Convert a quoted toml string to a Python str without quotes."""
        if line.startswith("\"\"\""):
            return ""  # cannot handle broken multi-line description
        # TOML string escapes: \b \t \n \f \r \" \\ (no unicode escape)
        # An escaped backslash is parked as a real newline first so that it is
        # not mistaken for the start of another escape; the last replace on
        # the third line turns it back into a single backslash.
        line = line[1:-1].replace("\\\\", "\n").replace("\\b", "")
        line = line.replace("\\t", " ").replace("\\n", " ").replace("\\f", " ")
        line = line.replace("\\r", "").replace("\\\"", "\"").replace("\n", "\\")
        # replace a unicode quotation mark, used in the libloading crate
        return line.replace("’", "'").strip()

    def _get_cargo_description(self, cargo_toml: str) -> str:
        """Return the description in Cargo.toml or empty string."""
        if os.path.isfile(cargo_toml) and os.access(cargo_toml, os.R_OK):
            # TOML files are UTF-8; do not depend on the locale's encoding.
            with open(cargo_toml, "r", encoding="utf-8") as toml_file:
                for line in toml_file:
                    match = DESCRIPTION_RE.match(line)
                    if match:
                        return self._toml2str(match.group(1))
        return ""