| """Exceptions used throughout package. |
| |
| This module MUST NOT try to import from anything within `pip._internal` to |
| operate. This is expected to be importable from any/all files within the |
| subpackage and, thus, should not depend on them. |
| """ |
| |
| import configparser |
| import re |
| from itertools import chain, groupby, repeat |
| from typing import TYPE_CHECKING, Dict, List, Optional, Union |
| |
| from pip._vendor.requests.models import Request, Response |
| from pip._vendor.rich.console import Console, ConsoleOptions, RenderResult |
| from pip._vendor.rich.markup import escape |
| from pip._vendor.rich.text import Text |
| |
| if TYPE_CHECKING: |
| from hashlib import _Hash |
| from typing import Literal |
| |
| from pip._internal.metadata import BaseDistribution |
| from pip._internal.req.req_install import InstallRequirement |
| |
| |
| # |
| # Scaffolding |
| # |
| def _is_kebab_case(s: str) -> bool: |
| return re.match(r"^[a-z]+(-[a-z]+)*$", s) is not None |
| |
| |
| def _prefix_with_indent( |
| s: Union[Text, str], |
| console: Console, |
| *, |
| prefix: str, |
| indent: str, |
| ) -> Text: |
| if isinstance(s, Text): |
| text = s |
| else: |
| text = console.render_str(s) |
| |
| return console.render_str(prefix, overflow="ignore") + console.render_str( |
| f"\n{indent}", overflow="ignore" |
| ).join(text.split(allow_blank=True)) |
| |
| |
| class PipError(Exception): |
| """The base pip error.""" |
| |
| |
| class DiagnosticPipError(PipError): |
| """An error, that presents diagnostic information to the user. |
| |
| This contains a bunch of logic, to enable pretty presentation of our error |
| messages. Each error gets a unique reference. Each error can also include |
| additional context, a hint and/or a note -- which are presented with the |
| main error message in a consistent style. |
| |
| This is adapted from the error output styling in `sphinx-theme-builder`. |
| """ |
| |
| reference: str |
| |
| def __init__( |
| self, |
| *, |
| kind: 'Literal["error", "warning"]' = "error", |
| reference: Optional[str] = None, |
| message: Union[str, Text], |
| context: Optional[Union[str, Text]], |
| hint_stmt: Optional[Union[str, Text]], |
| note_stmt: Optional[Union[str, Text]] = None, |
| link: Optional[str] = None, |
| ) -> None: |
| # Ensure a proper reference is provided. |
| if reference is None: |
| assert hasattr(self, "reference"), "error reference not provided!" |
| reference = self.reference |
| assert _is_kebab_case(reference), "error reference must be kebab-case!" |
| |
| self.kind = kind |
| self.reference = reference |
| |
| self.message = message |
| self.context = context |
| |
| self.note_stmt = note_stmt |
| self.hint_stmt = hint_stmt |
| |
| self.link = link |
| |
| super().__init__(f"<{self.__class__.__name__}: {self.reference}>") |
| |
| def __repr__(self) -> str: |
| return ( |
| f"<{self.__class__.__name__}(" |
| f"reference={self.reference!r}, " |
| f"message={self.message!r}, " |
| f"context={self.context!r}, " |
| f"note_stmt={self.note_stmt!r}, " |
| f"hint_stmt={self.hint_stmt!r}" |
| ")>" |
| ) |
| |
| def __rich_console__( |
| self, |
| console: Console, |
| options: ConsoleOptions, |
| ) -> RenderResult: |
| colour = "red" if self.kind == "error" else "yellow" |
| |
| yield f"[{colour} bold]{self.kind}[/]: [bold]{self.reference}[/]" |
| yield "" |
| |
| if not options.ascii_only: |
| # Present the main message, with relevant context indented. |
| if self.context is not None: |
| yield _prefix_with_indent( |
| self.message, |
| console, |
| prefix=f"[{colour}]×[/] ", |
| indent=f"[{colour}]│[/] ", |
| ) |
| yield _prefix_with_indent( |
| self.context, |
| console, |
| prefix=f"[{colour}]╰─>[/] ", |
| indent=f"[{colour}] [/] ", |
| ) |
| else: |
| yield _prefix_with_indent( |
| self.message, |
| console, |
| prefix="[red]×[/] ", |
| indent=" ", |
| ) |
| else: |
| yield self.message |
| if self.context is not None: |
| yield "" |
| yield self.context |
| |
| if self.note_stmt is not None or self.hint_stmt is not None: |
| yield "" |
| |
| if self.note_stmt is not None: |
| yield _prefix_with_indent( |
| self.note_stmt, |
| console, |
| prefix="[magenta bold]note[/]: ", |
| indent=" ", |
| ) |
| if self.hint_stmt is not None: |
| yield _prefix_with_indent( |
| self.hint_stmt, |
| console, |
| prefix="[cyan bold]hint[/]: ", |
| indent=" ", |
| ) |
| |
| if self.link is not None: |
| yield "" |
| yield f"Link: {self.link}" |
| |
| |
| # |
| # Actual Errors |
| # |
| class ConfigurationError(PipError): |
| """General exception in configuration""" |
| |
| |
| class InstallationError(PipError): |
| """General exception during installation""" |
| |
| |
| class UninstallationError(PipError): |
| """General exception during uninstallation""" |
| |
| |
| class MissingPyProjectBuildRequires(DiagnosticPipError): |
| """Raised when pyproject.toml has `build-system`, but no `build-system.requires`.""" |
| |
| reference = "missing-pyproject-build-system-requires" |
| |
| def __init__(self, *, package: str) -> None: |
| super().__init__( |
| message=f"Can not process {escape(package)}", |
| context=Text( |
| "This package has an invalid pyproject.toml file.\n" |
| "The [build-system] table is missing the mandatory `requires` key." |
| ), |
| note_stmt="This is an issue with the package mentioned above, not pip.", |
| hint_stmt=Text("See PEP 518 for the detailed specification."), |
| ) |
| |
| |
| class InvalidPyProjectBuildRequires(DiagnosticPipError): |
| """Raised when pyproject.toml an invalid `build-system.requires`.""" |
| |
| reference = "invalid-pyproject-build-system-requires" |
| |
| def __init__(self, *, package: str, reason: str) -> None: |
| super().__init__( |
| message=f"Can not process {escape(package)}", |
| context=Text( |
| "This package has an invalid `build-system.requires` key in " |
| f"pyproject.toml.\n{reason}" |
| ), |
| note_stmt="This is an issue with the package mentioned above, not pip.", |
| hint_stmt=Text("See PEP 518 for the detailed specification."), |
| ) |
| |
| |
| class NoneMetadataError(PipError): |
| """Raised when accessing a Distribution's "METADATA" or "PKG-INFO". |
| |
| This signifies an inconsistency, when the Distribution claims to have |
| the metadata file (if not, raise ``FileNotFoundError`` instead), but is |
| not actually able to produce its content. This may be due to permission |
| errors. |
| """ |
| |
| def __init__( |
| self, |
| dist: "BaseDistribution", |
| metadata_name: str, |
| ) -> None: |
| """ |
| :param dist: A Distribution object. |
| :param metadata_name: The name of the metadata being accessed |
| (can be "METADATA" or "PKG-INFO"). |
| """ |
| self.dist = dist |
| self.metadata_name = metadata_name |
| |
| def __str__(self) -> str: |
| # Use `dist` in the error message because its stringification |
| # includes more information, like the version and location. |
| return "None {} metadata found for distribution: {}".format( |
| self.metadata_name, |
| self.dist, |
| ) |
| |
| |
| class UserInstallationInvalid(InstallationError): |
| """A --user install is requested on an environment without user site.""" |
| |
| def __str__(self) -> str: |
| return "User base directory is not specified" |
| |
| |
| class InvalidSchemeCombination(InstallationError): |
| def __str__(self) -> str: |
| before = ", ".join(str(a) for a in self.args[:-1]) |
| return f"Cannot set {before} and {self.args[-1]} together" |
| |
| |
| class DistributionNotFound(InstallationError): |
| """Raised when a distribution cannot be found to satisfy a requirement""" |
| |
| |
| class RequirementsFileParseError(InstallationError): |
| """Raised when a general error occurs parsing a requirements file line.""" |
| |
| |
| class BestVersionAlreadyInstalled(PipError): |
| """Raised when the most up-to-date version of a package is already |
| installed.""" |
| |
| |
| class BadCommand(PipError): |
| """Raised when virtualenv or a command is not found""" |
| |
| |
| class CommandError(PipError): |
| """Raised when there is an error in command-line arguments""" |
| |
| |
| class PreviousBuildDirError(PipError): |
| """Raised when there's a previous conflicting build directory""" |
| |
| |
| class NetworkConnectionError(PipError): |
| """HTTP connection error""" |
| |
| def __init__( |
| self, error_msg: str, response: Response = None, request: Request = None |
| ) -> None: |
| """ |
| Initialize NetworkConnectionError with `request` and `response` |
| objects. |
| """ |
| self.response = response |
| self.request = request |
| self.error_msg = error_msg |
| if ( |
| self.response is not None |
| and not self.request |
| and hasattr(response, "request") |
| ): |
| self.request = self.response.request |
| super().__init__(error_msg, response, request) |
| |
| def __str__(self) -> str: |
| return str(self.error_msg) |
| |
| |
| class InvalidWheelFilename(InstallationError): |
| """Invalid wheel filename.""" |
| |
| |
| class UnsupportedWheel(InstallationError): |
| """Unsupported wheel.""" |
| |
| |
| class InvalidWheel(InstallationError): |
| """Invalid (e.g. corrupt) wheel.""" |
| |
| def __init__(self, location: str, name: str): |
| self.location = location |
| self.name = name |
| |
| def __str__(self) -> str: |
| return f"Wheel '{self.name}' located at {self.location} is invalid." |
| |
| |
| class MetadataInconsistent(InstallationError): |
| """Built metadata contains inconsistent information. |
| |
| This is raised when the metadata contains values (e.g. name and version) |
| that do not match the information previously obtained from sdist filename |
| or user-supplied ``#egg=`` value. |
| """ |
| |
| def __init__( |
| self, ireq: "InstallRequirement", field: str, f_val: str, m_val: str |
| ) -> None: |
| self.ireq = ireq |
| self.field = field |
| self.f_val = f_val |
| self.m_val = m_val |
| |
| def __str__(self) -> str: |
| template = ( |
| "Requested {} has inconsistent {}: " |
| "filename has {!r}, but metadata has {!r}" |
| ) |
| return template.format(self.ireq, self.field, self.f_val, self.m_val) |
| |
| |
| class LegacyInstallFailure(DiagnosticPipError): |
| """Error occurred while executing `setup.py install`""" |
| |
| reference = "legacy-install-failure" |
| |
| def __init__(self, package_details: str) -> None: |
| super().__init__( |
| message="Encountered error while trying to install package.", |
| context=package_details, |
| hint_stmt="See above for output from the failure.", |
| note_stmt="This is an issue with the package mentioned above, not pip.", |
| ) |
| |
| |
| class InstallationSubprocessError(DiagnosticPipError, InstallationError): |
| """A subprocess call failed.""" |
| |
| reference = "subprocess-exited-with-error" |
| |
| def __init__( |
| self, |
| *, |
| command_description: str, |
| exit_code: int, |
| output_lines: Optional[List[str]], |
| ) -> None: |
| if output_lines is None: |
| output_prompt = Text("See above for output.") |
| else: |
| output_prompt = ( |
| Text.from_markup(f"[red][{len(output_lines)} lines of output][/]\n") |
| + Text("".join(output_lines)) |
| + Text.from_markup(R"[red]\[end of output][/]") |
| ) |
| |
| super().__init__( |
| message=( |
| f"[green]{escape(command_description)}[/] did not run successfully.\n" |
| f"exit code: {exit_code}" |
| ), |
| context=output_prompt, |
| hint_stmt=None, |
| note_stmt=( |
| "This error originates from a subprocess, and is likely not a " |
| "problem with pip." |
| ), |
| ) |
| |
| self.command_description = command_description |
| self.exit_code = exit_code |
| |
| def __str__(self) -> str: |
| return f"{self.command_description} exited with {self.exit_code}" |
| |
| |
| class MetadataGenerationFailed(InstallationSubprocessError, InstallationError): |
| reference = "metadata-generation-failed" |
| |
| def __init__( |
| self, |
| *, |
| package_details: str, |
| ) -> None: |
| super(InstallationSubprocessError, self).__init__( |
| message="Encountered error while generating package metadata.", |
| context=escape(package_details), |
| hint_stmt="See above for details.", |
| note_stmt="This is an issue with the package mentioned above, not pip.", |
| ) |
| |
| def __str__(self) -> str: |
| return "metadata generation failed" |
| |
| |
| class HashErrors(InstallationError): |
| """Multiple HashError instances rolled into one for reporting""" |
| |
| def __init__(self) -> None: |
| self.errors: List["HashError"] = [] |
| |
| def append(self, error: "HashError") -> None: |
| self.errors.append(error) |
| |
| def __str__(self) -> str: |
| lines = [] |
| self.errors.sort(key=lambda e: e.order) |
| for cls, errors_of_cls in groupby(self.errors, lambda e: e.__class__): |
| lines.append(cls.head) |
| lines.extend(e.body() for e in errors_of_cls) |
| if lines: |
| return "\n".join(lines) |
| return "" |
| |
| def __bool__(self) -> bool: |
| return bool(self.errors) |
| |
| |
| class HashError(InstallationError): |
| """ |
| A failure to verify a package against known-good hashes |
| |
| :cvar order: An int sorting hash exception classes by difficulty of |
| recovery (lower being harder), so the user doesn't bother fretting |
| about unpinned packages when he has deeper issues, like VCS |
| dependencies, to deal with. Also keeps error reports in a |
| deterministic order. |
| :cvar head: A section heading for display above potentially many |
| exceptions of this kind |
| :ivar req: The InstallRequirement that triggered this error. This is |
| pasted on after the exception is instantiated, because it's not |
| typically available earlier. |
| |
| """ |
| |
| req: Optional["InstallRequirement"] = None |
| head = "" |
| order: int = -1 |
| |
| def body(self) -> str: |
| """Return a summary of me for display under the heading. |
| |
| This default implementation simply prints a description of the |
| triggering requirement. |
| |
| :param req: The InstallRequirement that provoked this error, with |
| its link already populated by the resolver's _populate_link(). |
| |
| """ |
| return f" {self._requirement_name()}" |
| |
| def __str__(self) -> str: |
| return f"{self.head}\n{self.body()}" |
| |
| def _requirement_name(self) -> str: |
| """Return a description of the requirement that triggered me. |
| |
| This default implementation returns long description of the req, with |
| line numbers |
| |
| """ |
| return str(self.req) if self.req else "unknown package" |
| |
| |
| class VcsHashUnsupported(HashError): |
| """A hash was provided for a version-control-system-based requirement, but |
| we don't have a method for hashing those.""" |
| |
| order = 0 |
| head = ( |
| "Can't verify hashes for these requirements because we don't " |
| "have a way to hash version control repositories:" |
| ) |
| |
| |
| class DirectoryUrlHashUnsupported(HashError): |
| """A hash was provided for a version-control-system-based requirement, but |
| we don't have a method for hashing those.""" |
| |
| order = 1 |
| head = ( |
| "Can't verify hashes for these file:// requirements because they " |
| "point to directories:" |
| ) |
| |
| |
| class HashMissing(HashError): |
| """A hash was needed for a requirement but is absent.""" |
| |
| order = 2 |
| head = ( |
| "Hashes are required in --require-hashes mode, but they are " |
| "missing from some requirements. Here is a list of those " |
| "requirements along with the hashes their downloaded archives " |
| "actually had. Add lines like these to your requirements files to " |
| "prevent tampering. (If you did not enable --require-hashes " |
| "manually, note that it turns on automatically when any package " |
| "has a hash.)" |
| ) |
| |
| def __init__(self, gotten_hash: str) -> None: |
| """ |
| :param gotten_hash: The hash of the (possibly malicious) archive we |
| just downloaded |
| """ |
| self.gotten_hash = gotten_hash |
| |
| def body(self) -> str: |
| # Dodge circular import. |
| from pip._internal.utils.hashes import FAVORITE_HASH |
| |
| package = None |
| if self.req: |
| # In the case of URL-based requirements, display the original URL |
| # seen in the requirements file rather than the package name, |
| # so the output can be directly copied into the requirements file. |
| package = ( |
| self.req.original_link |
| if self.req.original_link |
| # In case someone feeds something downright stupid |
| # to InstallRequirement's constructor. |
| else getattr(self.req, "req", None) |
| ) |
| return " {} --hash={}:{}".format( |
| package or "unknown package", FAVORITE_HASH, self.gotten_hash |
| ) |
| |
| |
| class HashUnpinned(HashError): |
| """A requirement had a hash specified but was not pinned to a specific |
| version.""" |
| |
| order = 3 |
| head = ( |
| "In --require-hashes mode, all requirements must have their " |
| "versions pinned with ==. These do not:" |
| ) |
| |
| |
| class HashMismatch(HashError): |
| """ |
| Distribution file hash values don't match. |
| |
| :ivar package_name: The name of the package that triggered the hash |
| mismatch. Feel free to write to this after the exception is raise to |
| improve its error message. |
| |
| """ |
| |
| order = 4 |
| head = ( |
| "THESE PACKAGES DO NOT MATCH THE HASHES FROM THE REQUIREMENTS " |
| "FILE. If you have updated the package versions, please update " |
| "the hashes. Otherwise, examine the package contents carefully; " |
| "someone may have tampered with them." |
| ) |
| |
| def __init__(self, allowed: Dict[str, List[str]], gots: Dict[str, "_Hash"]) -> None: |
| """ |
| :param allowed: A dict of algorithm names pointing to lists of allowed |
| hex digests |
| :param gots: A dict of algorithm names pointing to hashes we |
| actually got from the files under suspicion |
| """ |
| self.allowed = allowed |
| self.gots = gots |
| |
| def body(self) -> str: |
| return " {}:\n{}".format(self._requirement_name(), self._hash_comparison()) |
| |
| def _hash_comparison(self) -> str: |
| """ |
| Return a comparison of actual and expected hash values. |
| |
| Example:: |
| |
| Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde |
| or 123451234512345123451234512345123451234512345 |
| Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef |
| |
| """ |
| |
| def hash_then_or(hash_name: str) -> "chain[str]": |
| # For now, all the decent hashes have 6-char names, so we can get |
| # away with hard-coding space literals. |
| return chain([hash_name], repeat(" or")) |
| |
| lines: List[str] = [] |
| for hash_name, expecteds in self.allowed.items(): |
| prefix = hash_then_or(hash_name) |
| lines.extend( |
| (" Expected {} {}".format(next(prefix), e)) for e in expecteds |
| ) |
| lines.append( |
| " Got {}\n".format(self.gots[hash_name].hexdigest()) |
| ) |
| return "\n".join(lines) |
| |
| |
| class UnsupportedPythonVersion(InstallationError): |
| """Unsupported python version according to Requires-Python package |
| metadata.""" |
| |
| |
| class ConfigurationFileCouldNotBeLoaded(ConfigurationError): |
| """When there are errors while loading a configuration file""" |
| |
| def __init__( |
| self, |
| reason: str = "could not be loaded", |
| fname: Optional[str] = None, |
| error: Optional[configparser.Error] = None, |
| ) -> None: |
| super().__init__(error) |
| self.reason = reason |
| self.fname = fname |
| self.error = error |
| |
| def __str__(self) -> str: |
| if self.fname is not None: |
| message_part = f" in {self.fname}." |
| else: |
| assert self.error is not None |
| message_part = f".\n{self.error}\n" |
| return f"Configuration file {self.reason}{message_part}" |