| from __future__ import absolute_import |
| |
| import logging |
| import sys |
| import textwrap |
| from collections import OrderedDict |
| |
| from pip._vendor import pkg_resources |
| from pip._vendor.packaging.version import parse as parse_version |
| # NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is |
| # why we ignore the type on this import |
| from pip._vendor.six.moves import xmlrpc_client # type: ignore |
| |
| from pip._internal.cli.base_command import Command |
| from pip._internal.cli.req_command import SessionCommandMixin |
| from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS |
| from pip._internal.exceptions import CommandError |
| from pip._internal.models.index import PyPI |
| from pip._internal.network.xmlrpc import PipXmlrpcTransport |
| from pip._internal.utils.compat import get_terminal_size |
| from pip._internal.utils.logging import indent_log |
| from pip._internal.utils.misc import get_distribution, write_output |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING |
| |
# The names below are imported only when MYPY_CHECK_RUNNING is true; in the
# visible code they are referenced solely from `# type:` comments.
if MYPY_CHECK_RUNNING:
    from optparse import Values
    from typing import List, Dict, Optional
    from typing_extensions import TypedDict
    # Shape of one per-package record as built by transform_hits():
    # the package name, its summary and the list of its version strings.
    TransformedHit = TypedDict(
        'TransformedHit',
        {'name': str, 'summary': str, 'versions': List[str]},
    )

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
| |
| |
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
      %prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self):
        # type: () -> None
        """Register the ``-i/--index`` option and attach the option group."""
        self.cmd_opts.add_option(
            '-i', '--index',
            dest='index',
            metavar='URL',
            default=PyPI.pypi_url,
            help='Base URL of Python Package Index (default %default)')

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options, args):
        # type: (Values, List[str]) -> int
        """Query the index with ``args`` and print the matching packages.

        :param options: Parsed command-line options; ``options.index`` is
            the URL that gets queried.
        :param args: The search terms; all of them are passed to the index.
        :returns: SUCCESS when the index returned at least one hit,
            NO_MATCHES_FOUND otherwise.
        :raises CommandError: if no search term was given, or if the index
            answered with an XML-RPC fault (see ``search``).
        """
        if not args:
            raise CommandError('Missing required argument (search query).')
        query = args
        pypi_hits = self.search(query, options)
        hits = transform_hits(pypi_hits)

        # Only wrap summaries when writing to an interactive terminal.
        terminal_width = None
        if sys.stdout.isatty():
            terminal_width = get_terminal_size()[0]

        print_results(hits, terminal_width=terminal_width)
        if pypi_hits:
            return SUCCESS
        return NO_MATCHES_FOUND

    def search(self, query, options):
        # type: (List[str], Values) -> List[Dict[str, str]]
        """Call the index's XML-RPC ``search`` method and return its hits.

        The query terms are matched against both the ``name`` and the
        ``summary`` field, combined with the ``'or'`` operator.

        :raises CommandError: if the server replies with an XML-RPC fault,
            so the failure is reported as a command error instead of an
            unhandled ``Fault`` traceback.
        """
        index_url = options.index

        session = self.get_default_session(options)

        transport = PipXmlrpcTransport(index_url, session)
        pypi = xmlrpc_client.ServerProxy(index_url, transport)
        try:
            hits = pypi.search({'name': query, 'summary': query}, 'or')
        except xmlrpc_client.Fault as fault:
            message = "XMLRPC request failed [code: {code}]\n{string}".format(
                code=fault.faultCode,
                string=fault.faultString,
            )
            raise CommandError(message)
        return hits
| |
| |
def transform_hits(hits):
    # type: (List[Dict[str, str]]) -> List[TransformedHit]
    """
    Group the per-version entries returned by pypi into one record per
    package. Each record carries the package name, a summary and the list
    of all versions seen for it, in the order the packages first appeared.
    """
    by_name = OrderedDict()  # type: OrderedDict[str, TransformedHit]
    for entry in hits:
        pkg_name = entry['name']
        pkg_summary = entry['summary']
        release = entry['version']

        record = by_name.get(pkg_name)
        if record is None:
            # First time this package shows up: start a new record.
            by_name[pkg_name] = {
                'name': pkg_name,
                'summary': pkg_summary,
                'versions': [release],
            }
            continue

        record['versions'].append(release)

        # The summary shown is the one belonging to the highest version.
        if release == highest_version(record['versions']):
            record['summary'] = pkg_summary

    return list(by_name.values())
| |
| |
def print_results(hits, name_column_width=None, terminal_width=None):
    # type: (List[TransformedHit], Optional[int], Optional[int]) -> None
    """Write one "name (latest) - summary" line per hit.

    For a hit whose name matches an installed project, the installed
    version (and, when it differs, the latest version) is written below
    the line, indented.

    :param hits: Package records with 'name', 'summary' and 'versions'.
        Nothing is written when this is empty.
    :param name_column_width: Width of the "name (latest)" column. When
        None, it is derived from the longest name/version pair plus 4.
    :param terminal_width: When not None, summaries are wrapped to fit
        this width, provided more than 10 columns remain for them.
    """
    if not hits:
        return
    if name_column_width is None:
        name_column_width = max(
            len(hit['name']) + len(highest_version(hit.get('versions', ['-'])))
            for hit in hits
        ) + 4

    # Built once as a set so the per-hit membership test is a hash lookup.
    installed_packages = {p.project_name for p in pkg_resources.working_set}
    for hit in hits:
        name = hit['name']
        summary = hit['summary'] or ''
        latest = highest_version(hit.get('versions', ['-']))
        if terminal_width is not None:
            target_width = terminal_width - name_column_width - 5
            if target_width > 10:
                # wrap and indent summary to fit terminal
                summary_lines = textwrap.wrap(summary, target_width)
                summary = ('\n' + ' ' * (name_column_width + 3)).join(
                    summary_lines)

        # Pass the fields explicitly instead of expanding locals(), so the
        # output does not depend on what local names happen to exist.
        name_latest = '{} ({})'.format(name, latest)
        line = '{name_latest:{width}} - {summary}'.format(
            name_latest=name_latest,
            width=name_column_width,
            summary=summary,
        )
        try:
            write_output(line)
            if name in installed_packages:
                dist = get_distribution(name)
                assert dist is not None
                with indent_log():
                    if dist.version == latest:
                        write_output('INSTALLED: %s (latest)', dist.version)
                    else:
                        write_output('INSTALLED: %s', dist.version)
                        if parse_version(latest).pre:
                            write_output('LATEST: %s (pre-release; install'
                                         ' with "pip install --pre")', latest)
                        else:
                            write_output('LATEST: %s', latest)
        except UnicodeEncodeError:
            # Deliberate best effort: an entry whose text cannot be encoded
            # for output is skipped rather than aborting the whole listing.
            pass
| |
| |
def highest_version(versions):
    # type: (List[str]) -> str
    """Return the string in ``versions`` that ranks highest when the
    entries are compared through ``parse_version``."""
    newest = max(versions, key=parse_version)
    return newest