| #!/usr/bin/env python |
| """Assorted utilities shared between parts of apitools.""" |
| from __future__ import print_function |
| |
| import collections |
| import contextlib |
| import json |
| import keyword |
| import logging |
| import os |
| import re |
| |
| import six |
| import six.moves.urllib.error as urllib_error |
| import six.moves.urllib.request as urllib_request |
| |
| |
| class Error(Exception): |
| |
| """Base error for apitools generation.""" |
| |
| |
| class CommunicationError(Error): |
| |
| """Error in network communication.""" |
| |
| |
| def _SortLengthFirstKey(a): |
| return -len(a), a |
| |
| |
| class Names(object): |
| |
| """Utility class for cleaning and normalizing names in a fixed style.""" |
| DEFAULT_NAME_CONVENTION = 'LOWER_CAMEL' |
| NAME_CONVENTIONS = ['LOWER_CAMEL', 'LOWER_WITH_UNDER', 'NONE'] |
| |
| def __init__(self, strip_prefixes, |
| name_convention=None, |
| capitalize_enums=False): |
| self.__strip_prefixes = sorted(strip_prefixes, key=_SortLengthFirstKey) |
| self.__name_convention = ( |
| name_convention or self.DEFAULT_NAME_CONVENTION) |
| self.__capitalize_enums = capitalize_enums |
| |
| @staticmethod |
| def __FromCamel(name, separator='_'): |
| name = re.sub(r'([a-z0-9])([A-Z])', r'\1%s\2' % separator, name) |
| return name.lower() |
| |
| @staticmethod |
| def __ToCamel(name, separator='_'): |
| # TODO(craigcitro): Consider what to do about leading or trailing |
| # underscores (such as `_refValue` in discovery). |
| return ''.join(s[0:1].upper() + s[1:] for s in name.split(separator)) |
| |
| @staticmethod |
| def __ToLowerCamel(name, separator='_'): |
| name = Names.__ToCamel(name, separator=separator) |
| return name[0].lower() + name[1:] |
| |
| def __StripName(self, name): |
| """Strip strip_prefix entries from name.""" |
| if not name: |
| return name |
| for prefix in self.__strip_prefixes: |
| if name.startswith(prefix): |
| return name[len(prefix):] |
| return name |
| |
| @staticmethod |
| def CleanName(name): |
| """Perform generic name cleaning.""" |
| name = re.sub('[^_A-Za-z0-9]', '_', name) |
| if name[0].isdigit(): |
| name = '_%s' % name |
| while keyword.iskeyword(name): |
| name = '%s_' % name |
| # If we end up with __ as a prefix, we'll run afoul of python |
| # field renaming, so we manually correct for it. |
| if name.startswith('__'): |
| name = 'f%s' % name |
| return name |
| |
| @staticmethod |
| def NormalizeRelativePath(path): |
| """Normalize camelCase entries in path.""" |
| path_components = path.split('/') |
| normalized_components = [] |
| for component in path_components: |
| if re.match(r'{[A-Za-z0-9_]+}$', component): |
| normalized_components.append( |
| '{%s}' % Names.CleanName(component[1:-1])) |
| else: |
| normalized_components.append(component) |
| return '/'.join(normalized_components) |
| |
| def NormalizeEnumName(self, enum_name): |
| if self.__capitalize_enums: |
| enum_name = enum_name.upper() |
| return self.CleanName(enum_name) |
| |
| def ClassName(self, name, separator='_'): |
| """Generate a valid class name from name.""" |
| # TODO(craigcitro): Get rid of this case here and in MethodName. |
| if name is None: |
| return name |
| # TODO(craigcitro): This is a hack to handle the case of specific |
| # protorpc class names; clean this up. |
| if name.startswith('protorpc.') or name.startswith('message_types.'): |
| return name |
| name = self.__StripName(name) |
| name = self.__ToCamel(name, separator=separator) |
| return self.CleanName(name) |
| |
| def MethodName(self, name, separator='_'): |
| """Generate a valid method name from name.""" |
| if name is None: |
| return None |
| name = Names.__ToCamel(name, separator=separator) |
| return Names.CleanName(name) |
| |
| def FieldName(self, name): |
| """Generate a valid field name from name.""" |
| # TODO(craigcitro): We shouldn't need to strip this name, but some |
| # of the service names here are excessive. Fix the API and then |
| # remove this. |
| name = self.__StripName(name) |
| if self.__name_convention == 'LOWER_CAMEL': |
| name = Names.__ToLowerCamel(name) |
| elif self.__name_convention == 'LOWER_WITH_UNDER': |
| name = Names.__FromCamel(name) |
| return Names.CleanName(name) |
| |
| |
| @contextlib.contextmanager |
| def Chdir(dirname, create=True): |
| if not os.path.exists(dirname): |
| if not create: |
| raise OSError('Cannot find directory %s' % dirname) |
| else: |
| os.mkdir(dirname) |
| previous_directory = os.getcwd() |
| os.chdir(dirname) |
| yield |
| os.chdir(previous_directory) |
| |
| |
| def NormalizeVersion(version): |
| # Currently, '.' is the only character that might cause us trouble. |
| return version.replace('.', '_') |
| |
| |
| class ClientInfo(collections.namedtuple('ClientInfo', ( |
| 'package', 'scopes', 'version', 'client_id', 'client_secret', |
| 'user_agent', 'client_class_name', 'url_version', 'api_key'))): |
| |
| """Container for client-related info and names.""" |
| |
| @classmethod |
| def Create(cls, discovery_doc, |
| scope_ls, client_id, client_secret, user_agent, names, api_key): |
| """Create a new ClientInfo object from a discovery document.""" |
| scopes = set( |
| discovery_doc.get('auth', {}).get('oauth2', {}).get('scopes', {})) |
| scopes.update(scope_ls) |
| client_info = { |
| 'package': discovery_doc['name'], |
| 'version': NormalizeVersion(discovery_doc['version']), |
| 'url_version': discovery_doc['version'], |
| 'scopes': sorted(list(scopes)), |
| 'client_id': client_id, |
| 'client_secret': client_secret, |
| 'user_agent': user_agent, |
| 'api_key': api_key, |
| } |
| client_class_name = '%s%s' % ( |
| names.ClassName(client_info['package']), |
| names.ClassName(client_info['version'])) |
| client_info['client_class_name'] = client_class_name |
| return cls(**client_info) |
| |
| @property |
| def default_directory(self): |
| return self.package |
| |
| @property |
| def cli_rule_name(self): |
| return '%s_%s' % (self.package, self.version) |
| |
| @property |
| def cli_file_name(self): |
| return '%s.py' % self.cli_rule_name |
| |
| @property |
| def client_rule_name(self): |
| return '%s_%s_client' % (self.package, self.version) |
| |
| @property |
| def client_file_name(self): |
| return '%s.py' % self.client_rule_name |
| |
| @property |
| def messages_rule_name(self): |
| return '%s_%s_messages' % (self.package, self.version) |
| |
| @property |
| def services_rule_name(self): |
| return '%s_%s_services' % (self.package, self.version) |
| |
| @property |
| def messages_file_name(self): |
| return '%s.py' % self.messages_rule_name |
| |
| @property |
| def messages_proto_file_name(self): |
| return '%s.proto' % self.messages_rule_name |
| |
| @property |
| def services_proto_file_name(self): |
| return '%s.proto' % self.services_rule_name |
| |
| |
| def GetPackage(path): |
| path_components = path.split(os.path.sep) |
| return '.'.join(path_components) |
| |
| |
| def CleanDescription(description): |
| """Return a version of description safe for printing in a docstring.""" |
| if not isinstance(description, six.string_types): |
| return description |
| return description.replace('"""', '" " "') |
| |
| |
| class SimplePrettyPrinter(object): |
| |
| """Simple pretty-printer that supports an indent contextmanager.""" |
| |
| def __init__(self, out): |
| self.__out = out |
| self.__indent = '' |
| self.__skip = False |
| self.__comment_context = False |
| |
| @property |
| def indent(self): |
| return self.__indent |
| |
| def CalculateWidth(self, max_width=78): |
| return max_width - len(self.indent) |
| |
| @contextlib.contextmanager |
| def Indent(self, indent=' '): |
| previous_indent = self.__indent |
| self.__indent = '%s%s' % (previous_indent, indent) |
| yield |
| self.__indent = previous_indent |
| |
| @contextlib.contextmanager |
| def CommentContext(self): |
| """Print without any argument formatting.""" |
| old_context = self.__comment_context |
| self.__comment_context = True |
| yield |
| self.__comment_context = old_context |
| |
| def __call__(self, *args): |
| if self.__comment_context and args[1:]: |
| raise Error('Cannot do string interpolation in comment context') |
| if args and args[0]: |
| if not self.__comment_context: |
| line = (args[0] % args[1:]).rstrip() |
| else: |
| line = args[0].rstrip() |
| line = line.encode('ascii', 'backslashreplace') |
| print('%s%s' % (self.__indent, line), file=self.__out) |
| else: |
| print('', file=self.__out) |
| |
| |
| def NormalizeDiscoveryUrl(discovery_url): |
| """Expands a few abbreviations into full discovery urls.""" |
| if discovery_url.startswith('http'): |
| return discovery_url |
| elif '.' not in discovery_url: |
| raise ValueError('Unrecognized value "%s" for discovery url') |
| api_name, _, api_version = discovery_url.partition('.') |
| return 'https://www.googleapis.com/discovery/v1/apis/%s/%s/rest' % ( |
| api_name, api_version) |
| |
| |
| def FetchDiscoveryDoc(discovery_url, retries=5): |
| """Fetch the discovery document at the given url.""" |
| discovery_url = NormalizeDiscoveryUrl(discovery_url) |
| discovery_doc = None |
| last_exception = None |
| for _ in range(retries): |
| try: |
| discovery_doc = json.loads( |
| urllib_request.urlopen(discovery_url).read()) |
| break |
| except (urllib_error.HTTPError, |
| urllib_error.URLError) as last_exception: |
| logging.warning( |
| 'Attempting to fetch discovery doc again after "%s"', |
| last_exception) |
| if discovery_doc is None: |
| raise CommunicationError( |
| 'Could not find discovery doc at url "%s": %s' % ( |
| discovery_url, last_exception)) |
| return discovery_doc |