| """ PEP 610 """ |
| import json |
| import re |
| |
| from pip._vendor import six |
| from pip._vendor.six.moves.urllib import parse as urllib_parse |
| |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING |
| |
| if MYPY_CHECK_RUNNING: |
| from typing import ( |
| Any, Dict, Iterable, Optional, Type, TypeVar, Union |
| ) |
| |
| T = TypeVar("T") |
| |
| |
| DIRECT_URL_METADATA_NAME = "direct_url.json" |
| ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$") |
| |
| __all__ = [ |
| "DirectUrl", |
| "DirectUrlValidationError", |
| "DirInfo", |
| "ArchiveInfo", |
| "VcsInfo", |
| ] |
| |
| |
| class DirectUrlValidationError(Exception): |
| pass |
| |
| |
| def _get(d, expected_type, key, default=None): |
| # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T] |
| """Get value from dictionary and verify expected type.""" |
| if key not in d: |
| return default |
| value = d[key] |
| if six.PY2 and expected_type is str: |
| expected_type = six.string_types # type: ignore |
| if not isinstance(value, expected_type): |
| raise DirectUrlValidationError( |
| "{!r} has unexpected type for {} (expected {})".format( |
| value, key, expected_type |
| ) |
| ) |
| return value |
| |
| |
| def _get_required(d, expected_type, key, default=None): |
| # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T |
| value = _get(d, expected_type, key, default) |
| if value is None: |
| raise DirectUrlValidationError("{} must have a value".format(key)) |
| return value |
| |
| |
| def _exactly_one_of(infos): |
| # type: (Iterable[Optional[InfoType]]) -> InfoType |
| infos = [info for info in infos if info is not None] |
| if not infos: |
| raise DirectUrlValidationError( |
| "missing one of archive_info, dir_info, vcs_info" |
| ) |
| if len(infos) > 1: |
| raise DirectUrlValidationError( |
| "more than one of archive_info, dir_info, vcs_info" |
| ) |
| assert infos[0] is not None |
| return infos[0] |
| |
| |
| def _filter_none(**kwargs): |
| # type: (Any) -> Dict[str, Any] |
| """Make dict excluding None values.""" |
| return {k: v for k, v in kwargs.items() if v is not None} |
| |
| |
| class VcsInfo(object): |
| name = "vcs_info" |
| |
| def __init__( |
| self, |
| vcs, # type: str |
| commit_id, # type: str |
| requested_revision=None, # type: Optional[str] |
| resolved_revision=None, # type: Optional[str] |
| resolved_revision_type=None, # type: Optional[str] |
| ): |
| self.vcs = vcs |
| self.requested_revision = requested_revision |
| self.commit_id = commit_id |
| self.resolved_revision = resolved_revision |
| self.resolved_revision_type = resolved_revision_type |
| |
| @classmethod |
| def _from_dict(cls, d): |
| # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo] |
| if d is None: |
| return None |
| return cls( |
| vcs=_get_required(d, str, "vcs"), |
| commit_id=_get_required(d, str, "commit_id"), |
| requested_revision=_get(d, str, "requested_revision"), |
| resolved_revision=_get(d, str, "resolved_revision"), |
| resolved_revision_type=_get(d, str, "resolved_revision_type"), |
| ) |
| |
| def _to_dict(self): |
| # type: () -> Dict[str, Any] |
| return _filter_none( |
| vcs=self.vcs, |
| requested_revision=self.requested_revision, |
| commit_id=self.commit_id, |
| resolved_revision=self.resolved_revision, |
| resolved_revision_type=self.resolved_revision_type, |
| ) |
| |
| |
| class ArchiveInfo(object): |
| name = "archive_info" |
| |
| def __init__( |
| self, |
| hash=None, # type: Optional[str] |
| ): |
| self.hash = hash |
| |
| @classmethod |
| def _from_dict(cls, d): |
| # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo] |
| if d is None: |
| return None |
| return cls(hash=_get(d, str, "hash")) |
| |
| def _to_dict(self): |
| # type: () -> Dict[str, Any] |
| return _filter_none(hash=self.hash) |
| |
| |
| class DirInfo(object): |
| name = "dir_info" |
| |
| def __init__( |
| self, |
| editable=False, # type: bool |
| ): |
| self.editable = editable |
| |
| @classmethod |
| def _from_dict(cls, d): |
| # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo] |
| if d is None: |
| return None |
| return cls( |
| editable=_get_required(d, bool, "editable", default=False) |
| ) |
| |
| def _to_dict(self): |
| # type: () -> Dict[str, Any] |
| return _filter_none(editable=self.editable or None) |
| |
| |
| if MYPY_CHECK_RUNNING: |
| InfoType = Union[ArchiveInfo, DirInfo, VcsInfo] |
| |
| |
| class DirectUrl(object): |
| |
| def __init__( |
| self, |
| url, # type: str |
| info, # type: InfoType |
| subdirectory=None, # type: Optional[str] |
| ): |
| self.url = url |
| self.info = info |
| self.subdirectory = subdirectory |
| |
| def _remove_auth_from_netloc(self, netloc): |
| # type: (str) -> str |
| if "@" not in netloc: |
| return netloc |
| user_pass, netloc_no_user_pass = netloc.split("@", 1) |
| if ( |
| isinstance(self.info, VcsInfo) and |
| self.info.vcs == "git" and |
| user_pass == "git" |
| ): |
| return netloc |
| if ENV_VAR_RE.match(user_pass): |
| return netloc |
| return netloc_no_user_pass |
| |
| @property |
| def redacted_url(self): |
| # type: () -> str |
| """url with user:password part removed unless it is formed with |
| environment variables as specified in PEP 610, or it is ``git`` |
| in the case of a git URL. |
| """ |
| purl = urllib_parse.urlsplit(self.url) |
| netloc = self._remove_auth_from_netloc(purl.netloc) |
| surl = urllib_parse.urlunsplit( |
| (purl.scheme, netloc, purl.path, purl.query, purl.fragment) |
| ) |
| return surl |
| |
| def validate(self): |
| # type: () -> None |
| self.from_dict(self.to_dict()) |
| |
| @classmethod |
| def from_dict(cls, d): |
| # type: (Dict[str, Any]) -> DirectUrl |
| return DirectUrl( |
| url=_get_required(d, str, "url"), |
| subdirectory=_get(d, str, "subdirectory"), |
| info=_exactly_one_of( |
| [ |
| ArchiveInfo._from_dict(_get(d, dict, "archive_info")), |
| DirInfo._from_dict(_get(d, dict, "dir_info")), |
| VcsInfo._from_dict(_get(d, dict, "vcs_info")), |
| ] |
| ), |
| ) |
| |
| def to_dict(self): |
| # type: () -> Dict[str, Any] |
| res = _filter_none( |
| url=self.redacted_url, |
| subdirectory=self.subdirectory, |
| ) |
| res[self.info.name] = self.info._to_dict() |
| return res |
| |
| @classmethod |
| def from_json(cls, s): |
| # type: (str) -> DirectUrl |
| return cls.from_dict(json.loads(s)) |
| |
| def to_json(self): |
| # type: () -> str |
| return json.dumps(self.to_dict(), sort_keys=True) |