| import functools |
| import logging |
| |
| from pip._vendor import six |
| from pip._vendor.packaging.utils import canonicalize_name |
| from pip._vendor.resolvelib import BaseReporter, ResolutionImpossible |
| from pip._vendor.resolvelib import Resolver as RLResolver |
| |
| from pip._internal.exceptions import InstallationError |
| from pip._internal.req.req_set import RequirementSet |
| from pip._internal.resolution.base import BaseResolver |
| from pip._internal.resolution.resolvelib.provider import PipProvider |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING |
| |
| from .factory import Factory |
| |
| if MYPY_CHECK_RUNNING: |
| from typing import Dict, List, Optional, Tuple |
| |
| from pip._vendor.resolvelib.resolvers import Result |
| |
| from pip._internal.cache import WheelCache |
| from pip._internal.index.package_finder import PackageFinder |
| from pip._internal.operations.prepare import RequirementPreparer |
| from pip._internal.req.req_install import InstallRequirement |
| from pip._internal.resolution.base import InstallRequirementProvider |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
class Resolver(BaseResolver):
    """Dependency resolver backed by the vendored ``resolvelib`` library.

    ``resolve()`` runs the resolution and stores the raw result on the
    instance; ``get_installation_order()`` then uses that stored result to
    order the resolved requirements for installation.
    """

    def __init__(
        self,
        preparer,  # type: RequirementPreparer
        finder,  # type: PackageFinder
        wheel_cache,  # type: Optional[WheelCache]
        make_install_req,  # type: InstallRequirementProvider
        use_user_site,  # type: bool
        ignore_dependencies,  # type: bool
        ignore_installed,  # type: bool
        ignore_requires_python,  # type: bool
        force_reinstall,  # type: bool
        upgrade_strategy,  # type: str
        py_version_info=None,  # type: Optional[Tuple[int, ...]]
    ):
        # NOTE: ``wheel_cache``, ``use_user_site`` and ``upgrade_strategy``
        # are accepted as parameters but are not used in this constructor.
        super(Resolver, self).__init__()
        self.factory = Factory(
            finder=finder,
            preparer=preparer,
            make_install_req=make_install_req,
            force_reinstall=force_reinstall,
            ignore_installed=ignore_installed,
            ignore_requires_python=ignore_requires_python,
            py_version_info=py_version_info,
        )
        self.ignore_dependencies = ignore_dependencies
        # Populated by resolve(); read by get_installation_order().
        self._result = None  # type: Optional[Result]

    def resolve(self, root_reqs, check_supported_wheels):
        # type: (List[InstallRequirement], bool) -> RequirementSet
        """Resolve ``root_reqs`` and return the resulting requirement set.

        The raw resolution result is kept on ``self._result`` so that
        ``get_installation_order()`` can be called afterwards.

        :raises InstallationError: If any root requirement is a constraint,
            or if the resolution is impossible and the factory provides no
            more specific error. When the factory does provide an error,
            that error is raised instead, chained to the original
            ``ResolutionImpossible``.
        """
        # FIXME: Implement constraints.
        if any(r.constraint for r in root_reqs):
            raise InstallationError("Constraints are not yet supported.")

        provider = PipProvider(
            factory=self.factory,
            ignore_dependencies=self.ignore_dependencies,
        )
        reporter = BaseReporter()
        resolver = RLResolver(provider, reporter)

        requirements = [
            self.factory.make_requirement_from_install_req(r)
            for r in root_reqs
        ]

        try:
            self._result = resolver.resolve(requirements)
        except ResolutionImpossible as e:
            error = self.factory.get_installation_error(e)
            if not error:
                # TODO: This needs fixing, we need to look at the
                # factory.get_installation_error infrastructure, as that
                # doesn't really allow for the logger.critical calls I'm
                # using here.
                for req, parent in e.causes:
                    if parent is None:
                        origin = ""
                    else:
                        origin = " (from {})".format(parent.name)
                    logger.critical(
                        "Could not find a version that satisfies "
                        "the requirement %s%s",
                        req,
                        origin,
                    )
                raise InstallationError(
                    "No matching distribution found for " +
                    ", ".join(r.name for r, _ in e.causes)
                )
            six.raise_from(error, e)

        req_set = RequirementSet(check_supported_wheels=check_supported_wheels)
        for candidate in self._result.mapping.values():
            ireq = provider.get_install_requirement(candidate)
            # Candidates without an install requirement are left out of
            # the returned set.
            if ireq is None:
                continue
            ireq.should_reinstall = self.factory.should_reinstall(candidate)
            req_set.add_named_requirement(ireq)

        return req_set

    def get_installation_order(self, req_set):
        # type: (RequirementSet) -> List[InstallRequirement]
        """Create a list that orders given requirements for installation.

        The returned list should contain all requirements in ``req_set``,
        so the caller can loop through it and have a requirement installed
        before the requiring thing.

        The current implementation walks the resolved dependency graph, and
        makes sure every node has a greater "weight" than all its parents.

        :raises InstallationError: If a full pass over the graph assigns no
            new weight, which is reported as a circular dependency.
        """
        assert self._result is not None, "must call resolve() first"
        weights = {}  # type: Dict[Optional[str], int]

        graph = self._result.graph
        key_count = len(self._result.mapping) + 1  # Packages plus sentinel.
        while len(weights) < key_count:
            progressed = False
            for key in graph:
                if key in weights:
                    continue
                parents = list(graph.iter_parents(key))
                # A node can only be weighted once all its parents are.
                if not all(p in weights for p in parents):
                    continue
                if parents:
                    weight = max(weights[p] for p in parents) + 1
                else:
                    weight = 0
                weights[key] = weight
                progressed = True

            # FIXME: This check will fail if there are unbreakable cycles.
            # Implement something to forcefully break them up to continue.
            if not progressed:
                raise InstallationError(
                    "Could not determine installation order due to circular "
                    "dependency."
                )

        # Highest weight first; the sorter's name component breaks ties.
        sorted_items = sorted(
            req_set.requirements.items(),
            key=functools.partial(_req_set_item_sorter, weights=weights),
            reverse=True,
        )
        return [ireq for _, ireq in sorted_items]
| |
| |
def _req_set_item_sorter(
    item,  # type: Tuple[str, InstallRequirement]
    weights,  # type: Dict[Optional[str], int]
):
    # type: (...) -> Tuple[int, str]
    """Build the sort key for one ``(name, install requirement)`` pair.

    The key is ``(weight, canonical_name)``: the weight is looked up in the
    ``weights`` mapping under the canonicalized form of the pair's name, and
    the canonical name itself follows as a tie-breaker so that entries with
    equal weight still sort in a predictable order (handy in tests).
    """
    raw_name = item[0]
    canonical = canonicalize_name(raw_name)
    weight = weights[canonical]
    return weight, canonical