| """Routines related to PyPI, indexes""" |
| from __future__ import absolute_import |
| |
| import cgi |
| import itertools |
| import logging |
| import mimetypes |
| import os |
| import re |
| |
| from pip._vendor import html5lib, requests, six |
| from pip._vendor.distlib.compat import unescape |
| from pip._vendor.packaging import specifiers |
| from pip._vendor.packaging.utils import canonicalize_name |
| from pip._vendor.packaging.version import parse as parse_version |
| from pip._vendor.requests.exceptions import HTTPError, RetryError, SSLError |
| from pip._vendor.six.moves.urllib import parse as urllib_parse |
| from pip._vendor.six.moves.urllib import request as urllib_request |
| |
| from pip._internal.download import is_url, url_to_path |
| from pip._internal.exceptions import ( |
| BestVersionAlreadyInstalled, DistributionNotFound, InvalidWheelFilename, |
| UnsupportedWheel, |
| ) |
| from pip._internal.models.candidate import InstallationCandidate |
| from pip._internal.models.format_control import FormatControl |
| from pip._internal.models.link import Link |
| from pip._internal.models.selection_prefs import SelectionPreferences |
| from pip._internal.models.target_python import TargetPython |
| from pip._internal.utils.compat import ipaddress |
| from pip._internal.utils.logging import indent_log |
| from pip._internal.utils.misc import ( |
| ARCHIVE_EXTENSIONS, SUPPORTED_EXTENSIONS, WHEEL_EXTENSION, path_to_url, |
| redact_password_from_url, |
| ) |
| from pip._internal.utils.packaging import check_requires_python |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING |
| from pip._internal.wheel import Wheel |
| |
| if MYPY_CHECK_RUNNING: |
| from logging import Logger |
| from typing import ( |
| Any, Callable, FrozenSet, Iterable, Iterator, List, MutableMapping, |
| Optional, Sequence, Set, Text, Tuple, Union, |
| ) |
| import xml.etree.ElementTree |
| from pip._vendor.packaging.version import _BaseVersion |
| from pip._vendor.requests import Response |
| from pip._internal.models.search_scope import SearchScope |
| from pip._internal.req import InstallRequirement |
| from pip._internal.download import PipSession |
| from pip._internal.pep425tags import Pep425Tag |
| from pip._internal.utils.hashes import Hashes |
| |
| BuildTag = Tuple[Any, ...] # either empty tuple or Tuple[int, str] |
| CandidateSortingKey = ( |
| Tuple[int, int, int, _BaseVersion, BuildTag, Optional[int]] |
| ) |
| HTMLElement = xml.etree.ElementTree.Element |
| SecureOrigin = Tuple[str, str, Optional[str]] |
| |
| |
# Names exported by ``from <this module> import *``.
__all__ = ['FormatControl', 'FoundCandidates', 'PackageFinder']


# Built-in origins that are considered secure. PackageFinder yields these
# first from iter_secure_origins(), followed by one ('*', host, '*') entry
# per trusted host. A "*" in the protocol slot matches any protocol.
# NOTE(review): the hostname and port slots presumably treat "*" as a
# wildcard as well, and entries such as "127.0.0.0/8" look like networks;
# confirm against _validate_secure_origin().
SECURE_ORIGINS = [
    # protocol, hostname, port
    # Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
    ("https", "*", "*"),
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    ("file", "*", None),
    # ssh is always secure.
    ("ssh", "*", "*"),
]  # type: List[SecureOrigin]


# Module-level logger shared by the helpers and classes in this file.
logger = logging.getLogger(__name__)
| |
| |
| def _match_vcs_scheme(url): |
| # type: (str) -> Optional[str] |
| """Look for VCS schemes in the URL. |
| |
| Returns the matched VCS scheme, or None if there's no match. |
| """ |
| from pip._internal.vcs import vcs |
| for scheme in vcs.schemes: |
| if url.lower().startswith(scheme) and url[len(scheme)] in '+:': |
| return scheme |
| return None |
| |
| |
def _is_url_like_archive(url):
    # type: (str) -> bool
    """Tell whether the URL's filename ends with a known archive extension.

    :param url: The URL whose filename (as computed by `Link`) is checked
        against `ARCHIVE_EXTENSIONS`.
    """
    filename = Link(url).filename
    return any(filename.endswith(ext) for ext in ARCHIVE_EXTENSIONS)
| |
| |
| class _NotHTML(Exception): |
| def __init__(self, content_type, request_desc): |
| # type: (str, str) -> None |
| super(_NotHTML, self).__init__(content_type, request_desc) |
| self.content_type = content_type |
| self.request_desc = request_desc |
| |
| |
| def _ensure_html_header(response): |
| # type: (Response) -> None |
| """Check the Content-Type header to ensure the response contains HTML. |
| |
| Raises `_NotHTML` if the content type is not text/html. |
| """ |
| content_type = response.headers.get("Content-Type", "") |
| if not content_type.lower().startswith("text/html"): |
| raise _NotHTML(content_type, response.request.method) |
| |
| |
| class _NotHTTP(Exception): |
| pass |
| |
| |
def _ensure_html_response(url, session):
    # type: (str, PipSession) -> None
    """Probe the URL with a HEAD request and make sure it serves HTML.

    Redirects are followed, and an HTTP error status on the HEAD response
    is raised as an exception.

    Raises `_NotHTTP` if the URL's scheme is not http or https (no HEAD
    request is sent in that case), or `_NotHTML` if the content type is
    not text/html.
    """
    if urllib_parse.urlsplit(url).scheme not in {'http', 'https'}:
        raise _NotHTTP()

    head_response = session.head(url, allow_redirects=True)
    head_response.raise_for_status()

    _ensure_html_header(head_response)
| |
| |
def _get_html_response(url, session):
    # type: (str, PipSession) -> Response
    """Fetch an HTML page with GET, and return the response.

    The work happens in three stages:

    1. If the URL looks like an archive, a HEAD request is sent first to
       check that the Content-Type is HTML, so that a large file is not
       downloaded by accident. This raises `_NotHTTP` if the URL cannot
       be checked with HEAD, or `_NotHTML` if it is not HTML.
    2. The GET request itself is performed. HTTP error statuses are
       raised as exceptions.
    3. The Content-Type header of the GET response is checked, and
       `_NotHTML` is raised if it is not HTML.
    """
    if _is_url_like_archive(url):
        _ensure_html_response(url, session=session)

    logger.debug('Getting page %s', redact_password_from_url(url))

    request_headers = {
        "Accept": "text/html",
        # Cached data for /simple/ must not be returned blindly: authors
        # generally expect "twine upload && pip install" to work, but it
        # would not if they had run pip install within the cache lifetime
        # (~10 minutes). Setting max-age to zero stops pip from blindly
        # using cached data. The advantage of max-age=0 over no-cache is
        # that conditional requests are still supported, so traffic stays
        # minimal when the page has not changed; the only cost is that
        # the round trip for the conditional GET now happens every time
        # instead of once per 10 minutes.
        # For more information, please see pypa/pip#5670.
        "Cache-Control": "max-age=0",
    }
    resp = session.get(url, headers=request_headers)
    resp.raise_for_status()

    # The archive check at the top only helps when the URL ends with
    # something that looks like an archive, which URLs are not required to
    # do. Short of sending a HEAD request for every URL there is no way to
    # know in advance whether something is HTML, but the header can be
    # checked now that the response has been received.
    _ensure_html_header(resp)

    return resp
| |
| |
| def _handle_get_page_fail( |
| link, # type: Link |
| reason, # type: Union[str, Exception] |
| meth=None # type: Optional[Callable[..., None]] |
| ): |
| # type: (...) -> None |
| if meth is None: |
| meth = logger.debug |
| meth("Could not fetch URL %s: %s - skipping", link, reason) |
| |
| |
| def _get_html_page(link, session=None): |
| # type: (Link, Optional[PipSession]) -> Optional[HTMLPage] |
| if session is None: |
| raise TypeError( |
| "_get_html_page() missing 1 required keyword argument: 'session'" |
| ) |
| |
| url = link.url.split('#', 1)[0] |
| |
| # Check for VCS schemes that do not support lookup as web pages. |
| vcs_scheme = _match_vcs_scheme(url) |
| if vcs_scheme: |
| logger.debug('Cannot look at %s URL %s', vcs_scheme, link) |
| return None |
| |
| # Tack index.html onto file:// URLs that point to directories |
| scheme, _, path, _, _, _ = urllib_parse.urlparse(url) |
| if (scheme == 'file' and os.path.isdir(urllib_request.url2pathname(path))): |
| # add trailing slash if not present so urljoin doesn't trim |
| # final segment |
| if not url.endswith('/'): |
| url += '/' |
| url = urllib_parse.urljoin(url, 'index.html') |
| logger.debug(' file: URL is directory, getting %s', url) |
| |
| try: |
| resp = _get_html_response(url, session=session) |
| except _NotHTTP: |
| logger.debug( |
| 'Skipping page %s because it looks like an archive, and cannot ' |
| 'be checked by HEAD.', link, |
| ) |
| except _NotHTML as exc: |
| logger.debug( |
| 'Skipping page %s because the %s request got Content-Type: %s', |
| link, exc.request_desc, exc.content_type, |
| ) |
| except HTTPError as exc: |
| _handle_get_page_fail(link, exc) |
| except RetryError as exc: |
| _handle_get_page_fail(link, exc) |
| except SSLError as exc: |
| reason = "There was a problem confirming the ssl certificate: " |
| reason += str(exc) |
| _handle_get_page_fail(link, reason, meth=logger.info) |
| except requests.ConnectionError as exc: |
| _handle_get_page_fail(link, "connection error: %s" % exc) |
| except requests.Timeout: |
| _handle_get_page_fail(link, "timed out") |
| else: |
| return HTMLPage(resp.content, resp.url, resp.headers) |
| return None |
| |
| |
def _check_link_requires_python(
    link,  # type: Link
    version_info,  # type: Tuple[int, int, int]
    ignore_requires_python=False,  # type: bool
):
    # type: (...) -> bool
    """
    Tell whether the given Python version satisfies a link's
    "Requires-Python" value.

    An invalid "Requires-Python" value is logged and treated as
    compatible.

    :param link: The link whose `requires_python` value is checked.
    :param version_info: A 3-tuple of ints representing the Python
        major-minor-micro version to check.
    :param ignore_requires_python: Whether to ignore the "Requires-Python"
        value if the given Python version isn't compatible.
    :return: False only when the version is incompatible and the mismatch
        is not being ignored; True otherwise.
    """
    try:
        is_compatible = check_requires_python(
            link.requires_python, version_info=version_info,
        )
    except specifiers.InvalidSpecifier:
        logger.debug(
            "Ignoring invalid Requires-Python (%r) for link: %s",
            link.requires_python, link,
        )
        return True

    if is_compatible:
        return True

    version = '.'.join(map(str, version_info))
    if ignore_requires_python:
        logger.debug(
            'Ignoring failed Requires-Python check (%s not in: %r) '
            'for link: %s',
            version, link.requires_python, link,
        )
        return True

    logger.debug(
        'Link requires a different Python (%s not in: %r): %s',
        version, link.requires_python, link,
    )
    return False
| |
| |
class LinkEvaluator(object):

    """
    Decides, for one particular project, which links qualify as
    installation candidates.
    """

    # Matches a "-py<version>" marker at the very end of a version string.
    _py_version_re = re.compile(r'-py([123]\.?[0-9]?)$')

    # allow_yanked intentionally has no default value, so that every call
    # site has to decide whether yanked releases are acceptable. Having the
    # decision spelled out in the calling code also helps readers.
    def __init__(
        self,
        project_name,    # type: str
        canonical_name,  # type: str
        formats,         # type: FrozenSet
        target_python,   # type: TargetPython
        allow_yanked,    # type: bool
        ignore_requires_python=None,  # type: Optional[bool]
    ):
        # type: (...) -> None
        """
        :param project_name: The user supplied package name.
        :param canonical_name: The canonical package name.
        :param formats: The formats allowed for this package. Should be a set
            with 'binary' or 'source' or both in it.
        :param target_python: The target Python interpreter to use when
            evaluating link compatibility. This is used, for example, to
            check wheel compatibility, as well as when checking the Python
            version, e.g. the Python version embedded in a link filename
            (or egg fragment) and against an HTML link's optional PEP 503
            "data-requires-python" attribute.
        :param allow_yanked: Whether files marked as yanked (in the sense
            of PEP 592) are permitted to be candidates for install.
        :param ignore_requires_python: Whether to ignore incompatible
            PEP 503 "data-requires-python" values in HTML links. Defaults
            to False.
        """
        self.project_name = project_name

        self._allow_yanked = allow_yanked
        self._canonical_name = canonical_name
        self._formats = formats
        # None means "not specified", which is treated as False.
        self._ignore_requires_python = (
            False if ignore_requires_python is None
            else ignore_requires_python
        )
        self._target_python = target_python

    def evaluate_link(self, link):
        # type: (Link) -> Tuple[bool, Optional[Text]]
        """
        Decide whether a link is a candidate for installation.

        :return: A tuple (is_candidate, result). When `is_candidate` is
            True, `result` is the version string. When it is False,
            `result` is an optional string giving the reason the link
            fails to qualify (for logging).
        """
        version = None
        if link.is_yanked and not self._allow_yanked:
            yanked_reason = link.yanked_reason or '<none given>'
            # The unicode literal avoids "UnicodeEncodeError: 'ascii' codec
            # can't encode character" in Python 2 when the reason contains
            # non-ascii characters.
            return (False, u'yanked for reason: {}'.format(yanked_reason))

        if link.egg_fragment:
            egg_info = link.egg_fragment
            ext = link.ext
        else:
            egg_info, ext = link.splitext()
            if not ext:
                return (False, 'not a file')
            if ext not in SUPPORTED_EXTENSIONS:
                return (False, 'unsupported archive format: %s' % ext)
            is_wheel = (ext == WHEEL_EXTENSION)
            if is_wheel and "binary" not in self._formats:
                return (
                    False, 'No binaries permitted for %s' % self.project_name,
                )
            if "macosx10" in link.path and ext == '.zip':
                return (False, 'macosx10 one')
            if is_wheel:
                try:
                    wheel = Wheel(link.filename)
                except InvalidWheelFilename:
                    return (False, 'invalid wheel filename')
                if canonicalize_name(wheel.name) != self._canonical_name:
                    return (
                        False,
                        'wrong project name (not %s)' % self.project_name,
                    )

                if not wheel.supported(self._target_python.get_tags()):
                    # Listing the wheel's own tags in the reason makes
                    # compatibility problems easier to troubleshoot.
                    formatted_tags = ', '.join(
                        wheel.get_formatted_file_tags()
                    )
                    return (
                        False,
                        "none of the wheel's tags match: {}".format(
                            formatted_tags
                        ),
                    )

                version = wheel.version

        # This should be up by the self.ok_binary check, but see issue 2700.
        if ext != WHEEL_EXTENSION and "source" not in self._formats:
            return (False, 'No sources permitted for %s' % self.project_name)

        if not version:
            version = _extract_version_from_fragment(
                egg_info, self._canonical_name,
            )
        if not version:
            return (
                False, 'Missing project version for %s' % self.project_name,
            )

        # A trailing "-py<version>" marker is cut off the version and must
        # agree with the target Python version.
        py_marker = self._py_version_re.search(version)
        if py_marker:
            version = version[:py_marker.start()]
            if py_marker.group(1) != self._target_python.py_version:
                return (False, 'Python version is incorrect')

        supports_python = _check_link_requires_python(
            link, version_info=self._target_python.py_version_info,
            ignore_requires_python=self._ignore_requires_python,
        )
        if not supports_python:
            # A None reason suppresses the call to _log_skipped_link().
            return (False, None)

        logger.debug('Found link %s, version: %s', link, version)

        return (True, version)
| |
| |
def filter_unallowed_hashes(
    candidates,    # type: List[InstallationCandidate]
    hashes,        # type: Hashes
    project_name,  # type: str
):
    # type: (...) -> List[InstallationCandidate]
    """
    Drop the candidates whose hashes aren't allowed, and return the rest
    as a new list.

    When at least one candidate has an allowed hash, every candidate that
    either has an allowed hash or specifies no hash is returned. When no
    candidate matches, all of the given candidates are returned.

    Keeping the hash-less candidates alongside the matches makes it
    possible to log a warning if a more preferred candidate has no hash
    specified. Returning everything when nothing matches lets pip report
    the hash of the candidate that would otherwise have been installed
    (e.g. so that the user can more easily update their requirements file
    with the desired hash).

    :param candidates: The candidates to filter. This list itself is
        never returned.
    :param hashes: The allowed hashes. A falsy value disables filtering.
    :param project_name: The project name, used for logging only.
    """
    if not hashes:
        logger.debug(
            'Given no hashes to check %s links for project %r: '
            'discarding no candidates',
            len(candidates),
            project_name,
        )
        # Hand back a copy, never the caller's own list.
        return list(candidates)

    matches_or_no_digest = []
    # The non-matches are only collected for logging purposes.
    non_matches = []
    match_count = 0
    for candidate in candidates:
        link = candidate.link
        if link.has_hash:
            if not link.is_hash_allowed(hashes=hashes):
                non_matches.append(candidate)
                continue
            match_count += 1

        matches_or_no_digest.append(candidate)

    # Without any match, fall back to a copy of everything that was given.
    filtered = matches_or_no_digest if match_count else list(candidates)

    if len(filtered) == len(candidates):
        discard_message = 'discarding no candidates'
    else:
        non_match_links = '\n '.join(
            str(candidate.link) for candidate in non_matches
        )
        discard_message = 'discarding {} non-matches:\n {}'.format(
            len(non_matches), non_match_links,
        )

    no_digest_count = len(matches_or_no_digest) - match_count
    logger.debug(
        'Checked %s links for project %r against %s hashes '
        '(%s matches, %s no digest): %s',
        len(candidates),
        project_name,
        hashes.digest_count,
        match_count,
        no_digest_count,
        discard_message
    )

    return filtered
| |
| |
class CandidatePreferences(object):

    """
    Holds some of the preferences used when filtering and sorting
    InstallationCandidate objects.
    """

    def __init__(
        self,
        prefer_binary=False,  # type: bool
        allow_all_prereleases=False,  # type: bool
    ):
        # type: (...) -> None
        """
        :param prefer_binary: Stored as-is on the `prefer_binary` attribute.
        :param allow_all_prereleases: Whether to allow all pre-releases.
        """
        self.prefer_binary = prefer_binary
        self.allow_all_prereleases = allow_all_prereleases
| |
| |
class CandidateEvaluator(object):

    """
    Filters and sorts candidates for installation, based on which tags are
    valid.
    """

    @classmethod
    def create(
        cls,
        project_name,         # type: str
        target_python=None,   # type: Optional[TargetPython]
        prefer_binary=False,  # type: bool
        allow_all_prereleases=False,  # type: bool
        specifier=None,       # type: Optional[specifiers.BaseSpecifier]
        hashes=None,          # type: Optional[Hashes]
    ):
        # type: (...) -> CandidateEvaluator
        """Build a CandidateEvaluator object.

        :param target_python: The target Python interpreter to use when
            checking compatibility. If None (the default), a TargetPython
            object will be constructed from the running Python.
        :param specifier: The specifier used to filter versions. If None
            (the default), an empty SpecifierSet is used.
        :param hashes: An optional collection of allowed hashes.
        """
        if target_python is None:
            target_python = TargetPython()
        if specifier is None:
            specifier = specifiers.SpecifierSet()

        return cls(
            project_name=project_name,
            supported_tags=target_python.get_tags(),
            specifier=specifier,
            prefer_binary=prefer_binary,
            allow_all_prereleases=allow_all_prereleases,
            hashes=hashes,
        )

    def __init__(
        self,
        project_name,         # type: str
        supported_tags,       # type: List[Pep425Tag]
        specifier,            # type: specifiers.BaseSpecifier
        prefer_binary=False,  # type: bool
        allow_all_prereleases=False,  # type: bool
        hashes=None,          # type: Optional[Hashes]
    ):
        # type: (...) -> None
        """
        :param supported_tags: The PEP 425 tags supported by the target
            Python in order of preference (most preferred first).
        """
        self._project_name = project_name
        self._supported_tags = supported_tags
        self._specifier = specifier
        self._prefer_binary = prefer_binary
        self._allow_all_prereleases = allow_all_prereleases
        self._hashes = hashes

    def get_applicable_candidates(
        self,
        candidates,  # type: List[InstallationCandidate]
    ):
        # type: (...) -> List[InstallationCandidate]
        """
        Pick the applicable candidates out of a list of candidates.

        A candidate is applicable when its version passes the instance's
        specifier; the survivors are then run through
        `filter_unallowed_hashes()`.
        """
        # Passing None makes the specifier decide about pre-releases.
        allow_prereleases = self._allow_all_prereleases or None
        # The versions are handed over as strings because otherwise, when
        # we're debundled but setuptools isn't, Python will see
        # packaging.version.Version and
        # pkg_resources._vendor.packaging.version.Version as different
        # types. Strings serve as a common data interchange format. If we
        # stop using the pkg_resources provided specifier and start using
        # our own, the cast to str() can be dropped.
        version_strings = (str(c.version) for c in candidates)
        accepted = self._specifier.filter(
            version_strings, prereleases=allow_prereleases,
        )
        versions = {str(v) for v in accepted}

        # Compare as strings here too, again because of debundling.
        applicable_candidates = [
            c for c in candidates if str(c.version) in versions
        ]

        return filter_unallowed_hashes(
            candidates=applicable_candidates,
            hashes=self._hashes,
            project_name=self._project_name,
        )

    def make_found_candidates(
        self,
        candidates,  # type: List[InstallationCandidate]
    ):
        # type: (...) -> FoundCandidates
        """
        Build and return a `FoundCandidates` instance.

        :param candidates: All the candidates found. The applicable subset
            is computed with `get_applicable_candidates()`.
        """
        return FoundCandidates(
            candidates,
            applicable_candidates=self.get_applicable_candidates(candidates),
            evaluator=self,
        )

    def _sort_key(self, candidate):
        # type: (InstallationCandidate) -> CandidateSortingKey
        """
        Key function for sorting InstallationCandidates by preference,
        meant to be passed as the `key` argument of sorted() or max().

        The returned tuples are built so that a tuple comparing as greater
        under Python's default comparison is more preferred.

        The order of preference is:

        First and foremost, candidates with allowed (matching) hashes are
        always preferred over candidates without matching hashes. This is
        because e.g. if the only candidate with an allowed hash is yanked,
        we still want to use that candidate.

        Second, excepting hash considerations, candidates that have been
        yanked (in the sense of PEP 592) are always less preferred than
        candidates that haven't been yanked. Then:

        If not finding wheels, they are sorted by version only.
        If finding wheels, then the sort order is by version, then:
          1. existing installs
          2. wheels ordered via Wheel.support_index_min(self._supported_tags)
          3. source archives
        If prefer_binary was set, then all wheels are sorted above sources.

        Note: it was considered to embed this logic into the Link
              comparison operators, but then different sdist links
              with the same version, would have to be considered equal

        :raises UnsupportedWheel: If the candidate is a wheel that none of
            the supported tags match.
        """
        valid_tags = self._supported_tags
        build_tag = tuple()  # type: BuildTag
        binary_preference = 0
        link = candidate.link
        if not link.is_wheel:
            # sdist: ranks below every supported wheel of that version.
            pri = -len(valid_tags)
        else:
            # can raise InvalidWheelFilename
            wheel = Wheel(link.filename)
            if not wheel.supported(valid_tags):
                raise UnsupportedWheel(
                    "%s is not a supported wheel for this platform. It "
                    "can't be sorted." % wheel.filename
                )
            if self._prefer_binary:
                binary_preference = 1
            pri = -(wheel.support_index_min(valid_tags))
            if wheel.build_tag is not None:
                # Split the build tag into its leading number and the rest.
                number, suffix = re.match(
                    r'^(\d+)(.*)$', wheel.build_tag
                ).groups()
                build_tag = (int(number), suffix)
        has_allowed_hash = int(link.is_hash_allowed(self._hashes))
        yank_value = -1 * int(link.is_yanked)  # -1 for yanked.
        return (
            has_allowed_hash, yank_value, binary_preference, candidate.version,
            build_tag, pri,
        )

    def get_best_candidate(
        self,
        candidates,  # type: List[InstallationCandidate]
    ):
        # type: (...) -> Optional[InstallationCandidate]
        """
        Pick the best candidate according to the instance's sort order.

        :return: The best candidate, or None if `candidates` is empty. A
            warning is logged when the chosen candidate is yanked.
        """
        if not candidates:
            return None

        best_candidate = max(candidates, key=self._sort_key)

        # PEP 592 asks for a warning when a yanked file gets selected.
        best_link = best_candidate.link
        if best_link.is_yanked:
            reason = best_link.yanked_reason or '<none given>'
            msg = (
                # The unicode literal avoids "UnicodeEncodeError: 'ascii'
                # codec can't encode character" in Python 2 when the
                # reason contains non-ascii characters.
                u'The candidate selected for download or install is a '
                'yanked version: {candidate}\n'
                'Reason for being yanked: {reason}'
            ).format(candidate=best_candidate, reason=reason)
            logger.warning(msg)

        return best_candidate
| |
| |
class FoundCandidates(object):
    """The candidates found for a project, as returned by
    `PackageFinder.find_candidates`.

    Instances are only meant to be created through CandidateEvaluator's
    `make_found_candidates()` method.
    """

    def __init__(
        self,
        candidates,             # type: List[InstallationCandidate]
        applicable_candidates,  # type: List[InstallationCandidate]
        evaluator,              # type: CandidateEvaluator
    ):
        # type: (...) -> None
        """
        :param candidates: A sequence of all available candidates found.
        :param applicable_candidates: The applicable candidates.
        :param evaluator: A CandidateEvaluator object to sort applicable
            candidates by order of preference.
        """
        self._candidates = candidates
        self._applicable_candidates = applicable_candidates
        self._evaluator = evaluator

    def iter_all(self):
        # type: () -> Iterable[InstallationCandidate]
        """Return an iterator over every candidate that was found.
        """
        all_candidates = self._candidates
        return iter(all_candidates)

    def iter_applicable(self):
        # type: () -> Iterable[InstallationCandidate]
        """Return an iterator over the applicable candidates only.
        """
        applicable = self._applicable_candidates
        return iter(applicable)

    def get_best(self):
        # type: () -> Optional[InstallationCandidate]
        """Ask the evaluator for the best applicable candidate.

        Returns None if no applicable candidates are found.
        """
        applicable = list(self.iter_applicable())
        return self._evaluator.get_best_candidate(applicable)
| |
| |
| class PackageFinder(object): |
| """This finds packages. |
| |
| This is meant to match easy_install's technique for looking for |
| packages, by reading pages and looking for appropriate links. |
| """ |
| |
def __init__(
    self,
    search_scope,   # type: SearchScope
    session,        # type: PipSession
    target_python,  # type: TargetPython
    allow_yanked,   # type: bool
    format_control=None,   # type: Optional[FormatControl]
    trusted_hosts=None,    # type: Optional[List[str]]
    candidate_prefs=None,  # type: CandidatePreferences
    ignore_requires_python=None,  # type: Optional[bool]
):
    # type: (...) -> None
    """
    This constructor is primarily meant to be used by the create() class
    method and from tests.

    :param session: The Session to use to make requests.
    :param format_control: A FormatControl object, used to control
        the selection of source packages / binary packages when consulting
        the index and links. A falsy value is replaced by an empty
        FormatControl.
    :param trusted_hosts: The initially trusted hosts. Defaults to a new
        empty list.
    :param candidate_prefs: Options to use when creating a
        CandidateEvaluator object. Defaults to a default-constructed
        CandidatePreferences.
    """
    self.search_scope = search_scope
    self.session = session
    self.trusted_hosts = [] if trusted_hosts is None else trusted_hosts

    self._candidate_prefs = (
        CandidatePreferences() if candidate_prefs is None
        else candidate_prefs
    )
    self.format_control = format_control or FormatControl(set(), set())

    self._allow_yanked = allow_yanked
    self._ignore_requires_python = ignore_requires_python
    self._target_python = target_python

    # Boring links that have already been logged in some way.
    self._logged_links = set()  # type: Set[Link]
| |
| # Don't include an allow_yanked default value to make sure each call |
| # site considers whether yanked releases are allowed. This also causes |
| # that decision to be made explicit in the calling code, which helps |
| # people when reading the code. |
@classmethod
def create(
    cls,
    search_scope,        # type: SearchScope
    selection_prefs,     # type: SelectionPreferences
    trusted_hosts=None,  # type: Optional[List[str]]
    session=None,        # type: Optional[PipSession]
    target_python=None,  # type: Optional[TargetPython]
):
    # type: (...) -> PackageFinder
    """Build a PackageFinder.

    :param selection_prefs: The candidate selection preferences, as a
        SelectionPreferences object.
    :param trusted_hosts: Domains not to emit warnings for when not using
        HTTPS.
    :param session: The Session to use to make requests. Although it is a
        keyword argument, it is required.
    :param target_python: The target Python interpreter to use when
        checking compatibility. If None (the default), a TargetPython
        object will be constructed from the running Python.
    :raises TypeError: If `session` is None.
    """
    if session is None:
        raise TypeError(
            "PackageFinder.create() missing 1 required keyword argument: "
            "'session'"
        )
    if target_python is None:
        target_python = TargetPython()

    prefs = selection_prefs
    candidate_prefs = CandidatePreferences(
        prefer_binary=prefs.prefer_binary,
        allow_all_prereleases=prefs.allow_all_prereleases,
    )

    return cls(
        candidate_prefs=candidate_prefs,
        search_scope=search_scope,
        session=session,
        target_python=target_python,
        allow_yanked=prefs.allow_yanked,
        format_control=prefs.format_control,
        trusted_hosts=trusted_hosts,
        ignore_requires_python=prefs.ignore_requires_python,
    )
| |
@property
def find_links(self):
    # type: () -> List[str]
    """The `find_links` value of the finder's search scope."""
    scope = self.search_scope
    return scope.find_links
| |
@property
def index_urls(self):
    # type: () -> List[str]
    """The `index_urls` value of the finder's search scope."""
    scope = self.search_scope
    return scope.index_urls
| |
@property
def allow_all_prereleases(self):
    # type: () -> bool
    """The `allow_all_prereleases` flag of the candidate preferences."""
    prefs = self._candidate_prefs
    return prefs.allow_all_prereleases
| |
def set_allow_all_prereleases(self):
    # type: () -> None
    """Switch on `allow_all_prereleases` in the candidate preferences."""
    prefs = self._candidate_prefs
    prefs.allow_all_prereleases = True
| |
def add_trusted_host(self, host, source=None):
    # type: (str, Optional[str]) -> None
    """
    Register a host as trusted, both on the session and on this finder.

    :param host: The host to trust.
    :param source: An optional source string, for logging where the host
        string came from.
    """
    message = 'adding trusted host: {!r}'.format(host)
    if source is not None:
        message += ' (from {})'.format(source)
    logger.info(message)
    # The session is told unconditionally: re-adding a previously added
    # host is okay because PipSession stores the resulting prefixes in a
    # dict.
    self.session.add_insecure_host(host)
    if host not in self.trusted_hosts:
        self.trusted_hosts.append(host)
| |
def iter_secure_origins(self):
    # type: () -> Iterator[SecureOrigin]
    """Yield the built-in secure origins, then one ('*', host, '*') origin
    for every trusted host of this finder."""
    for builtin_origin in SECURE_ORIGINS:
        yield builtin_origin
    for trusted_host in self.trusted_hosts:
        yield ('*', trusted_host, '*')
| |
@staticmethod
def _sort_locations(locations, expand_dir=False):
    # type: (Sequence[str], bool) -> Tuple[List[str], List[str]]
    """
    Split locations into "files" (archives) and "urls".

    :param locations: Local paths, file: URLs or other URLs.
    :param expand_dir: Whether the entries of a directory location should
        be sorted individually. When False, a directory given as a file:
        URL is kept as a URL and a directory given as a plain path is
        ignored with a warning.
    :return: A pair of lists (files, urls).
    """
    files = []
    urls = []

    def classify(path):
        # File the URL for a local path under `urls` when its guessed
        # type is HTML, and under `files` otherwise.
        file_url = path_to_url(path)
        guessed_type = mimetypes.guess_type(file_url, strict=False)[0]
        target = urls if guessed_type == 'text/html' else files
        target.append(file_url)

    for url in locations:
        is_local_path = os.path.exists(url)
        is_file_url = url.startswith('file:')

        if not (is_local_path or is_file_url):
            if is_url(url):
                # Only add url with clear scheme
                urls.append(url)
            else:
                logger.warning(
                    "Url '%s' is ignored. It is either a non-existing "
                    "path or lacks a specific scheme.", url,
                )
            continue

        path = url if is_local_path else url_to_path(url)
        if os.path.isdir(path):
            if expand_dir:
                path = os.path.realpath(path)
                for item in os.listdir(path):
                    classify(os.path.join(path, item))
            elif is_file_url:
                urls.append(url)
            else:
                logger.warning(
                    "Path '{0}' is ignored: "
                    "it is a directory.".format(path),
                )
        elif os.path.isfile(path):
            classify(path)
        else:
            logger.warning(
                "Url '%s' is ignored: it is neither a file "
                "nor a directory.", url,
            )

    return files, urls
| |
| def _validate_secure_origin(self, logger, location): |
| # type: (Logger, Link) -> bool |
| # Determine if this url used a secure transport mechanism |
| parsed = urllib_parse.urlparse(str(location)) |
| origin = (parsed.scheme, parsed.hostname, parsed.port) |
| |
| # The protocol to use to see if the protocol matches. |
| # Don't count the repository type as part of the protocol: in |
| # cases such as "git+ssh", only use "ssh". (I.e., Only verify against |
| # the last scheme.) |
| protocol = origin[0].rsplit('+', 1)[-1] |
| |
| # Determine if our origin is a secure origin by looking through our |
| # hardcoded list of secure origins, as well as any additional ones |
| # configured on this PackageFinder instance. |
| for secure_origin in self.iter_secure_origins(): |
| if protocol != secure_origin[0] and secure_origin[0] != "*": |
| continue |
| |
| try: |
| # We need to do this decode dance to ensure that we have a |
| # unicode object, even on Python 2.x. |
| addr = ipaddress.ip_address( |
| origin[1] |
| if ( |
| isinstance(origin[1], six.text_type) or |
| origin[1] is None |
| ) |
| else origin[1].decode("utf8") |
| ) |
| network = ipaddress.ip_network( |
| secure_origin[1] |
| if isinstance(secure_origin[1], six.text_type) |
| # setting secure_origin[1] to proper Union[bytes, str] |
| # creates problems in other places |
| else secure_origin[1].decode("utf8") # type: ignore |
| ) |
| except ValueError: |
| # We don't have both a valid address or a valid network, so |
| # we'll check this origin against hostnames. |
| if (origin[1] and |
| origin[1].lower() != secure_origin[1].lower() and |
| secure_origin[1] != "*"): |
| continue |
| else: |
| # We have a valid address and network, so see if the address |
| # is contained within the network. |
| if addr not in network: |
| continue |
| |
| # Check to see if the port patches |
| if (origin[2] != secure_origin[2] and |
| secure_origin[2] != "*" and |
| secure_origin[2] is not None): |
| continue |
| |
| # If we've gotten here, then this origin matches the current |
| # secure origin and we should return True |
| return True |
| |
| # If we've gotten to this point, then the origin isn't secure and we |
| # will not accept it as a valid location to search. We will however |
| # log a warning that we are ignoring it. |
| logger.warning( |
| "The repository located at %s is not a trusted or secure host and " |
| "is being ignored. If this repository is available via HTTPS we " |
| "recommend you use HTTPS instead, otherwise you may silence " |
| "this warning and allow it anyway with '--trusted-host %s'.", |
| parsed.hostname, |
| parsed.hostname, |
| ) |
| |
| return False |
| |
def make_link_evaluator(self, project_name):
    # type: (str) -> LinkEvaluator
    """
    Build a LinkEvaluator for ``project_name``.

    The evaluator receives the canonicalized project name, the formats
    allowed for it by ``self.format_control``, and this finder's target
    Python, allow-yanked and ignore-requires-python settings.
    """
    canonical_name = canonicalize_name(project_name)
    allowed_formats = self.format_control.get_allowed_formats(canonical_name)

    evaluator_options = dict(
        project_name=project_name,
        canonical_name=canonical_name,
        formats=allowed_formats,
        target_python=self._target_python,
        allow_yanked=self._allow_yanked,
        ignore_requires_python=self._ignore_requires_python,
    )
    return LinkEvaluator(**evaluator_options)
| |
def find_all_candidates(self, project_name):
    # type: (str) -> List[InstallationCandidate]
    """Find all available InstallationCandidate for project_name

    This checks index_urls and find_links.
    All versions found are returned as an InstallationCandidate list, with
    local files first, then find-links entries, then index page results.

    See LinkEvaluator.evaluate_link() for details on which files
    are accepted.
    """
    index_locations = self.search_scope.get_index_urls_locations(
        project_name,
    )
    index_file_loc, index_url_loc = self._sort_locations(index_locations)
    fl_file_loc, fl_url_loc = self._sort_locations(
        self.find_links, expand_dir=True,
    )

    # Kept lazy: these links are only consumed once the pages have been
    # analyzed further down.
    file_locations = (
        Link(url) for url in itertools.chain(index_file_loc, fl_file_loc)
    )

    # Every url the user gave us (via --index-url or --find-links) is a
    # candidate, but anything without a secure origin is filtered out.
    url_locations = []
    for url in itertools.chain(index_url_loc, fl_url_loc):
        link = Link(url)
        if self._validate_secure_origin(logger, link):
            url_locations.append(link)

    logger.debug('%d location(s) to search for versions of %s:',
                 len(url_locations), project_name)
    for location in url_locations:
        logger.debug('* %s', location)

    link_evaluator = self.make_link_evaluator(project_name)

    # We trust every directly linked archive in find_links
    direct_find_links = (Link(url, '-f') for url in self.find_links)
    find_links_versions = self._package_versions(
        link_evaluator, direct_find_links,
    )

    page_versions = []
    for page in self._get_pages(url_locations, project_name):
        logger.debug('Analyzing links from page %s', page.url)
        with indent_log():
            found_on_page = self._package_versions(
                link_evaluator, page.iter_links(),
            )
            page_versions.extend(found_on_page)

    file_versions = self._package_versions(link_evaluator, file_locations)
    if file_versions:
        file_versions.sort(reverse=True)
        local_paths = [
            url_to_path(candidate.link.url) for candidate in file_versions
        ]
        logger.debug('Local files found: %s', ', '.join(local_paths))

    # This is an intentional priority ordering
    return file_versions + find_links_versions + page_versions
| |
def make_candidate_evaluator(
    self,
    project_name,    # type: str
    specifier=None,  # type: Optional[specifiers.BaseSpecifier]
    hashes=None,     # type: Optional[Hashes]
):
    # type: (...) -> CandidateEvaluator
    """Create a CandidateEvaluator object to use.

    The evaluator is created from this finder's target Python and its
    candidate preferences (``prefer_binary`` and
    ``allow_all_prereleases``), plus the given ``specifier`` and
    ``hashes``.
    """
    prefs = self._candidate_prefs
    evaluator_options = dict(
        project_name=project_name,
        target_python=self._target_python,
        prefer_binary=prefs.prefer_binary,
        allow_all_prereleases=prefs.allow_all_prereleases,
        specifier=specifier,
        hashes=hashes,
    )
    return CandidateEvaluator.create(**evaluator_options)
| |
def find_candidates(
    self,
    project_name,    # type: str
    specifier=None,  # type: Optional[specifiers.BaseSpecifier]
    hashes=None,     # type: Optional[Hashes]
):
    # type: (...) -> FoundCandidates
    """Find matches for the given project and specifier.

    All candidates for the project are collected first; a candidate
    evaluator built from ``specifier`` and ``hashes`` then turns them into
    the result.

    :param specifier: An optional object implementing `filter`
        (e.g. `packaging.specifiers.SpecifierSet`) to filter applicable
        versions.

    :return: A `FoundCandidates` instance.
    """
    all_candidates = self.find_all_candidates(project_name)
    evaluator = self.make_candidate_evaluator(
        project_name=project_name, specifier=specifier, hashes=hashes,
    )
    return evaluator.make_found_candidates(all_candidates)
| |
def find_requirement(self, req, upgrade):
    # type: (InstallRequirement, bool) -> Optional[Link]
    """Try to find a Link matching req

    :param req: The InstallRequirement to look up.
    :param upgrade: Whether an already-satisfied requirement should still
        be replaced by a better candidate.
    :return: The best candidate's Link, or None when ``upgrade`` is false
        and the requirement is already satisfied.
    :raises DistributionNotFound: if nothing is installed and no candidate
        was found.
    :raises BestVersionAlreadyInstalled: if upgrading was requested but
        the installed version is at least as new as the best candidate.
    """
    hashes = req.hashes(trust_internet=False)
    candidates = self.find_candidates(
        req.name, specifier=req.specifier, hashes=hashes,
    )
    best_candidate = candidates.get_best()

    if req.satisfied_by is None:
        installed_version = None  # type: Optional[_BaseVersion]
    else:
        installed_version = parse_version(req.satisfied_by.version)

    def _format_versions(cand_iter):
        # This repeated parse_version and str() conversion is needed to
        # handle different vendoring sources from pip and pkg_resources.
        # If we stop using the pkg_resources provided specifier and start
        # using our own, we can drop the cast to str().
        unique_versions = set(str(c.version) for c in cand_iter)
        ordered = sorted(unique_versions, key=parse_version)
        return ", ".join(ordered) or "none"

    if installed_version is None and best_candidate is None:
        logger.critical(
            'Could not find a version that satisfies the requirement %s '
            '(from versions: %s)',
            req,
            _format_versions(candidates.iter_all()),
        )
        raise DistributionNotFound(
            'No matching distribution found for %s' % req
        )

    # The installed version "wins" when there is no candidate at all or
    # when the best candidate is not newer than what is installed.
    best_installed = bool(
        installed_version and (
            best_candidate is None or
            best_candidate.version <= installed_version
        )
    )

    if not upgrade and installed_version is not None:
        if best_installed:
            logger.debug(
                'Existing installed version (%s) is most up-to-date and '
                'satisfies requirement',
                installed_version,
            )
        else:
            logger.debug(
                'Existing installed version (%s) satisfies requirement '
                '(most up-to-date version is %s)',
                installed_version,
                best_candidate.version,
            )
        return None

    if not best_installed:
        logger.debug(
            'Using version %s (newest of versions: %s)',
            best_candidate.version,
            _format_versions(candidates.iter_applicable()),
        )
        return best_candidate.link

    # We have an existing version, and it is the best version.
    logger.debug(
        'Installed version (%s) is most up-to-date (past versions: '
        '%s)',
        installed_version,
        _format_versions(candidates.iter_applicable()),
    )
    raise BestVersionAlreadyInstalled
| |
| def _get_pages(self, locations, project_name): |
| # type: (Iterable[Link], str) -> Iterable[HTMLPage] |
| """ |
| Yields (page, page_url) from the given locations, skipping |
| locations that have errors. |
| """ |
| seen = set() # type: Set[Link] |
| for location in locations: |
| if location in seen: |
| continue |
| seen.add(location) |
| |
| page = _get_html_page(location, session=self.session) |
| if page is None: |
| continue |
| |
| yield page |
| |
| def _sort_links(self, links): |
| # type: (Iterable[Link]) -> List[Link] |
| """ |
| Returns elements of links in order, non-egg links first, egg links |
| second, while eliminating duplicates |
| """ |
| eggs, no_eggs = [], [] |
| seen = set() # type: Set[Link] |
| for link in links: |
| if link not in seen: |
| seen.add(link) |
| if link.egg_fragment: |
| eggs.append(link) |
| else: |
| no_eggs.append(link) |
| return no_eggs + eggs |
| |
| def _log_skipped_link(self, link, reason): |
| # type: (Link, Text) -> None |
| if link not in self._logged_links: |
| # Mark this as a unicode string to prevent "UnicodeEncodeError: |
| # 'ascii' codec can't encode character" in Python 2 when |
| # the reason contains non-ascii characters. |
| # Also, put the link at the end so the reason is more visible |
| # and because the link string is usually very long. |
| logger.debug(u'Skipping link: %s: %s', reason, link) |
| self._logged_links.add(link) |
| |
def get_install_candidate(self, link_evaluator, link):
    # type: (LinkEvaluator, Link) -> Optional[InstallationCandidate]
    """
    Evaluate ``link`` and return an InstallationCandidate for it when it
    is a candidate for install; otherwise return None, logging the reason
    for skipping the link if the evaluator supplied one.
    """
    is_candidate, result = link_evaluator.evaluate_link(link)

    if is_candidate:
        # Convert the Text result to str since InstallationCandidate
        # accepts str.
        return InstallationCandidate(
            project=link_evaluator.project_name,
            link=link,
            version=str(result),
        )

    if result:
        self._log_skipped_link(link, reason=result)
    return None
| |
| def _package_versions(self, link_evaluator, links): |
| # type: (LinkEvaluator, Iterable[Link]) -> List[InstallationCandidate] |
| result = [] |
| for link in self._sort_links(links): |
| candidate = self.get_install_candidate(link_evaluator, link) |
| if candidate is not None: |
| result.append(candidate) |
| return result |
| |
| |
| def _find_name_version_sep(fragment, canonical_name): |
| # type: (str, str) -> int |
| """Find the separator's index based on the package's canonical name. |
| |
| :param fragment: A <package>+<version> filename "fragment" (stem) or |
| egg fragment. |
| :param canonical_name: The package's canonical name. |
| |
| This function is needed since the canonicalized name does not necessarily |
| have the same length as the egg info's name part. An example:: |
| |
| >>> fragment = 'foo__bar-1.0' |
| >>> canonical_name = 'foo-bar' |
| >>> _find_name_version_sep(fragment, canonical_name) |
| 8 |
| """ |
| # Project name and version must be separated by one single dash. Find all |
| # occurrences of dashes; if the string in front of it matches the canonical |
| # name, this is the one separating the name and version parts. |
| for i, c in enumerate(fragment): |
| if c != "-": |
| continue |
| if canonicalize_name(fragment[:i]) == canonical_name: |
| return i |
| raise ValueError("{} does not match {}".format(fragment, canonical_name)) |
| |
| |
def _extract_version_from_fragment(fragment, canonical_name):
    # type: (str, str) -> Optional[str]
    """Parse the version string from a <package>+<version> filename
    "fragment" (stem) or egg fragment.

    :param fragment: The string to parse. E.g. foo-2.1
    :param canonical_name: The canonicalized name of the package this
        belongs to.
    :return: The text after the name/version separator, or None when the
        separator cannot be found or nothing follows it.
    """
    try:
        separator = _find_name_version_sep(fragment, canonical_name)
    except ValueError:
        return None
    # An empty remainder is reported as None.
    return fragment[separator + 1:] or None
| |
| |
| def _determine_base_url(document, page_url): |
| """Determine the HTML document's base URL. |
| |
| This looks for a ``<base>`` tag in the HTML document. If present, its href |
| attribute denotes the base URL of anchor tags in the document. If there is |
| no such tag (or if it does not have a valid href attribute), the HTML |
| file's URL is used as the base URL. |
| |
| :param document: An HTML document representation. The current |
| implementation expects the result of ``html5lib.parse()``. |
| :param page_url: The URL of the HTML document. |
| """ |
| for base in document.findall(".//base"): |
| href = base.get("href") |
| if href is not None: |
| return href |
| return page_url |
| |
| |
| def _get_encoding_from_headers(headers): |
| """Determine if we have any encoding information in our headers. |
| """ |
| if headers and "Content-Type" in headers: |
| content_type, params = cgi.parse_header(headers["Content-Type"]) |
| if "charset" in params: |
| return params['charset'] |
| return None |
| |
| |
| def _clean_link(url): |
| # type: (str) -> str |
| """Makes sure a link is fully encoded. That is, if a ' ' shows up in |
| the link, it will be rewritten to %20 (while not over-quoting |
| % or other characters).""" |
| # Split the URL into parts according to the general structure |
| # `scheme://netloc/path;parameters?query#fragment`. Note that the |
| # `netloc` can be empty and the URI will then refer to a local |
| # filesystem path. |
| result = urllib_parse.urlparse(url) |
| # In both cases below we unquote prior to quoting to make sure |
| # nothing is double quoted. |
| if result.netloc == "": |
| # On Windows the path part might contain a drive letter which |
| # should not be quoted. On Linux where drive letters do not |
| # exist, the colon should be quoted. We rely on urllib.request |
| # to do the right thing here. |
| path = urllib_request.pathname2url( |
| urllib_request.url2pathname(result.path)) |
| else: |
| # In addition to the `/` character we protect `@` so that |
| # revision strings in VCS URLs are properly parsed. |
| path = urllib_parse.quote(urllib_parse.unquote(result.path), safe="/@") |
| return urllib_parse.urlunparse(result._replace(path=path)) |
| |
| |
| def _create_link_from_element( |
| anchor, # type: HTMLElement |
| page_url, # type: str |
| base_url, # type: str |
| ): |
| # type: (...) -> Optional[Link] |
| """ |
| Convert an anchor element in a simple repository page to a Link. |
| """ |
| href = anchor.get("href") |
| if not href: |
| return None |
| |
| url = _clean_link(urllib_parse.urljoin(base_url, href)) |
| pyrequire = anchor.get('data-requires-python') |
| pyrequire = unescape(pyrequire) if pyrequire else None |
| |
| yanked_reason = anchor.get('data-yanked') |
| if yanked_reason: |
| # This is a unicode string in Python 2 (and 3). |
| yanked_reason = unescape(yanked_reason) |
| |
| link = Link( |
| url, |
| comes_from=page_url, |
| requires_python=pyrequire, |
| yanked_reason=yanked_reason, |
| ) |
| |
| return link |
| |
| |
class HTMLPage(object):
    """Represents one page, along with its URL"""

    def __init__(self, content, url, headers=None):
        # type: (bytes, str, MutableMapping[str, str]) -> None
        """
        :param content: The raw content of the page.
        :param url: The URL of the page.
        :param headers: Optional headers; they are consulted for the
            encoding when the links are parsed.
        """
        self.url = url
        self.headers = headers
        self.content = content

    def __str__(self):
        """Return the page URL with any password redacted."""
        return redact_password_from_url(self.url)

    def iter_links(self):
        # type: () -> Iterable[Link]
        """Yields all links in the page"""
        encoding = _get_encoding_from_headers(self.headers)
        document = html5lib.parse(
            self.content,
            transport_encoding=encoding,
            namespaceHTMLElements=False,
        )
        base_url = _determine_base_url(document, self.url)
        links = (
            _create_link_from_element(
                anchor, page_url=self.url, base_url=base_url,
            )
            for anchor in document.findall(".//a")
        )
        # Anchors that cannot be turned into a Link are dropped.
        for link in links:
            if link is not None:
                yield link