# blob: 9a8f36ec8d48f0dbbaf61e405879a627cf4198a3 [file] [log] [blame]
#!/usr/bin/env python
"""Common credentials classes and constructors."""
from __future__ import print_function
import datetime
import json
import os
import threading
import httplib2
import oauth2client
import oauth2client.client
import oauth2client.gce
import oauth2client.locked_file
import oauth2client.multistore_file
import oauth2client.service_account
from oauth2client import tools # for gflags declarations
from six.moves import http_client
from six.moves import urllib
from apitools.base.py import exceptions
from apitools.base.py import util
try:
import gflags
FLAGS = gflags.FLAGS
except ImportError:
FLAGS = None
__all__ = [
'CredentialsFromFile',
'GaeAssertionCredentials',
'GceAssertionCredentials',
'GetCredentials',
'GetUserinfo',
'ServiceAccountCredentials',
'ServiceAccountCredentialsFromFile',
]
# Lock when accessing the cache file to avoid resource contention.
cache_file_lock = threading.Lock()
def SetCredentialsCacheFileLock(lock):
    """Replace the module-level lock that guards cache-file access.

    Args:
      lock: Object to install as `cache_file_lock`. The rest of this
          module uses that global in `with` statements.
    """
    # Rebinding through the module namespace has the same effect as a
    # `global` declaration followed by an assignment.
    globals()['cache_file_lock'] = lock
# List of additional methods we use when attempting to construct
# credentials. Users can register their own methods here, which we try
# before the defaults.
_CREDENTIALS_METHODS = []
def _RegisterCredentialsMethod(method, position=None):
    """Add a credential-fetching callable to the module's lookup list.

    Registered callables are expected to have the signature:
      client_info, **kwds -> Credentials or None
    and must *always* tolerate arbitrary keyword arguments.

    This can be applied as a bare decorator whenever the default
    position is acceptable.

    Args:
      method: New credential-fetching callable.
      position: (default: None) Index at which to insert the callable.
          None appends; a value past the end of the list is clamped to
          the end. In all but rare cases this should be 0 or None.

    Returns:
      method, unchanged, so that this works as a decorator.
    """
    end = len(_CREDENTIALS_METHODS)
    index = end if position is None else min(position, end)
    _CREDENTIALS_METHODS.insert(index, method)
    return method
def GetCredentials(package_name, scopes, client_id, client_secret, user_agent,
                   credentials_filename=None,
                   api_key=None,  # pylint: disable=unused-argument
                   client=None,  # pylint: disable=unused-argument
                   oauth2client_args=None,
                   **kwds):
    """Attempt to get credentials, using an oauth dance as the last resort.

    Every registered credentials method is tried in order; the first one
    returning something other than None wins. If none does, credentials
    are read from (or generated into) a credentials file.

    Args:
      package_name: Used to build a default user agent when `user_agent`
          is empty.
      scopes: Scopes to request; normalized with util.NormalizeScopes.
      client_id: OAuth2 client id.
      client_secret: OAuth2 client secret.
      user_agent: User agent string; if falsy,
          '<package_name>-generated/0.1' is used.
      credentials_filename: (default: None) Credentials file path. If
          falsy, '~/.apitools.token' (user-expanded) is used.
      api_key: Unused.
      client: Unused.
      oauth2client_args: (default: None) Forwarded to CredentialsFromFile.
      **kwds: Forwarded to every registered credentials method.

    Returns:
      The first non-None credentials object found.

    Raises:
      exceptions.CredentialsError: if no source produced credentials.
    """
    normalized_scopes = util.NormalizeScopes(scopes)
    if not user_agent:
        user_agent = '%s-generated/0.1' % package_name
    client_info = {
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': ' '.join(sorted(normalized_scopes)),
        'user_agent': user_agent,
    }

    for fetch in _CREDENTIALS_METHODS:
        found = fetch(client_info, **kwds)
        if found is not None:
            return found

    if not credentials_filename:
        credentials_filename = os.path.expanduser('~/.apitools.token')
    found = CredentialsFromFile(credentials_filename, client_info,
                                oauth2client_args=oauth2client_args)
    if found is None:
        raise exceptions.CredentialsError(
            'Could not create valid credentials')
    return found
def ServiceAccountCredentialsFromFile(
        service_account_name, private_key_filename, scopes,
        service_account_kwargs=None):
    """Build service account credentials from a private key file.

    Args:
      service_account_name: Name of the service account.
      private_key_filename: Path of the file holding the private key.
      scopes: Scopes to request.
      service_account_kwargs: (default: None) Extra keyword arguments
          forwarded to ServiceAccountCredentials.

    Returns:
      Whatever ServiceAccountCredentials returns for the key contents.
    """
    # Key files may be binary (e.g. PKCS12). Read raw bytes so that no
    # text decoding or newline translation is applied to the key.
    with open(private_key_filename, 'rb') as key_file:
        private_key = key_file.read()
    return ServiceAccountCredentials(
        service_account_name, private_key, scopes,
        service_account_kwargs=service_account_kwargs)
def ServiceAccountCredentials(service_account_name, private_key, scopes,
                              service_account_kwargs=None):
    """Build signed-JWT assertion credentials for a service account.

    Args:
      service_account_name: Name of the service account.
      private_key: Contents of the private key.
      scopes: Scopes to request; normalized with util.NormalizeScopes.
      service_account_kwargs: (default: None) Extra keyword arguments
          passed to SignedJwtAssertionCredentials.

    Returns:
      An oauth2client.client.SignedJwtAssertionCredentials instance.
    """
    extra_kwargs = service_account_kwargs if service_account_kwargs else {}
    normalized_scopes = util.NormalizeScopes(scopes)
    return oauth2client.client.SignedJwtAssertionCredentials(
        service_account_name, private_key, normalized_scopes, **extra_kwargs)
def _EnsureFileExists(filename):
    """Touches a file; returns False on error, True on success.

    An already existing file is left untouched. A missing file is created
    under a temporary umask of 0o177, and the previous umask is always
    restored afterwards.

    Args:
      filename: Path of the file that should exist.

    Returns:
      True if the file already existed or was created, False if creating
      it failed.
    """
    if os.path.exists(filename):
        return True
    old_umask = os.umask(0o177)
    try:
        open(filename, 'a+b').close()
    except (IOError, OSError):
        # On Python 2 open() raises IOError, which is not an OSError
        # subclass there; on Python 3 the two names are aliases.
        return False
    finally:
        os.umask(old_umask)
    return True
def _GceMetadataRequest(relative_url, use_metadata_ip=False):
    """Request the given url from the GCE metadata service.

    Args:
      relative_url: Path appended to the 'computeMetadata/v1/' root.
      use_metadata_ip: (default: False) If true, address the service by
          its IP (169.254.169.254) instead of metadata.google.internal.

    Returns:
      The response object returned by the urllib opener.

    Raises:
      exceptions.CommunicationError: if opening the URL raises URLError.
    """
    metadata_root = ('http://169.254.169.254/' if use_metadata_ip
                     else 'http://metadata.google.internal/')
    url = metadata_root + 'computeMetadata/v1/' + relative_url
    # Extra header requirement can be found here:
    # https://developers.google.com/compute/docs/metadata
    request = urllib.request.Request(
        url, headers={'Metadata-Flavor': 'Google'})
    # The opener is built with an empty proxy mapping.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        return opener.open(request)
    except urllib.error.URLError as e:
        raise exceptions.CommunicationError(
            'Could not reach metadata service: %s' % e.reason)
class GceAssertionCredentials(oauth2client.gce.AppAssertionCredentials):
    """Assertion credentials for GCE instances."""

    def __init__(self, scopes=None, service_account_name='default', **kwds):
        """Initializes the credentials instance.

        Args:
          scopes: The scopes to get. If None, whatever scopes that are
              available to the instance are used.
          service_account_name: The service account to retrieve the scopes
              from.
          **kwds: Additional keyword args. If `cache_filename` is given,
              it names a file used to cache the scope metadata. All
              keyword args are forwarded to the parent constructor.

        Raises:
          exceptions.ResourceUnavailableError: if the scopes have to be
              looked up but we are not on GCE or the service account
              does not exist.
          exceptions.CredentialsError: if the scope check against the
              instance scopes fails.
        """
        # If there is a connectivity issue with the metadata server,
        # detection calls may fail even if we've already successfully
        # identified these scopes in the same execution. However, the
        # available scopes don't change once an instance is created,
        # so there is no reason to perform more than one query.
        self.__service_account_name = service_account_name
        cached_scopes = None
        cache_filename = kwds.get('cache_filename')
        if cache_filename:
            cached_scopes = self._CheckCacheFileForMatch(
                cache_filename, scopes)

        scopes = cached_scopes or self._ScopesFromMetadataServer(scopes)

        if cache_filename and not cached_scopes:
            self._WriteCacheFile(cache_filename, scopes)

        super(GceAssertionCredentials, self).__init__(scopes, **kwds)

    @classmethod
    def Get(cls, *args, **kwds):
        """Construct an instance, returning None on any exceptions.Error."""
        try:
            return cls(*args, **kwds)
        except exceptions.Error:
            return None

    @staticmethod
    def _ToNativeStr(value):
        """Return value as `str`, decoding it as UTF-8 if it is not one.

        Data read from the metadata server or from the binary-mode cache
        file is bytes on Python 3, while the rest of this class works
        with `str`.
        """
        if isinstance(value, str):
            return value
        return value.decode('utf-8')

    def _CheckCacheFileForMatch(self, cache_filename, scopes):
        """Checks the cache file to see if it matches the given credentials.

        Args:
          cache_filename: Cache filename to check.
          scopes: Scopes for the desired credentials.

        Returns:
          List of scopes (if cache matches) or None.
        """
        creds = {  # Credentials metadata dict.
            'scopes': sorted(list(scopes)) if scopes else None,
            'svc_acct_name': self.__service_account_name,
        }
        with cache_file_lock:
            if not _EnsureFileExists(cache_filename):
                return None
            locked_file = oauth2client.locked_file.LockedFile(
                cache_filename, 'r+b', 'rb')
            try:
                locked_file.open_and_lock()
                cached_creds_str = locked_file.file_handle().read()
                if not cached_creds_str:
                    return None
                try:
                    # Cached credentials metadata dict.
                    cached_creds = json.loads(
                        self._ToNativeStr(cached_creds_str))
                    same_account = (creds['svc_acct_name'] ==
                                    cached_creds['svc_acct_name'])
                    cached_scopes = cached_creds['scopes']
                except (ValueError, KeyError, TypeError):
                    # Unparseable or malformed cache contents count as a
                    # miss; the caller then rewrites the cache file.
                    return None
                if same_account and creds['scopes'] in (None, cached_scopes):
                    return cached_scopes
            finally:
                locked_file.unlock_and_close()
        # No match. Returning the caller's scopes here would make
        # __init__ believe the cache had been hit and skip both the
        # metadata server lookup and the cache write.
        return None

    def _WriteCacheFile(self, cache_filename, scopes):
        """Writes the credential metadata to the cache file.

        This does not save the credentials themselves (CredentialStore class
        optionally handles that after this class is initialized).

        Args:
          cache_filename: Cache filename to check.
          scopes: Scopes for the desired credentials.
        """
        with cache_file_lock:
            if not _EnsureFileExists(cache_filename):
                return
            locked_file = oauth2client.locked_file.LockedFile(
                cache_filename, 'r+b', 'rb')
            try:
                locked_file.open_and_lock()
                if locked_file.is_locked():
                    creds = {  # Credentials metadata dict.
                        'scopes': sorted(list(scopes)),
                        'svc_acct_name': self.__service_account_name}
                    # json.dumps emits ASCII-only text by default; encode
                    # it because the handle is opened in binary mode.
                    payload = json.dumps(creds).encode('ascii')
                    handle = locked_file.file_handle()
                    # Drop any previous contents so that a shorter
                    # payload does not leave stale trailing bytes.
                    handle.seek(0)
                    handle.truncate()
                    handle.write(payload)
                # If it's not locked, the locking process will
                # write the same data to the file, so just
                # continue.
            finally:
                locked_file.unlock_and_close()

    def _ScopesFromMetadataServer(self, scopes):
        """Validate the requested scopes against the metadata server.

        Args:
          scopes: Requested scopes, or a falsy value to use every scope
              the instance has.

        Returns:
          `scopes` if it was truthy, else the instance scopes.

        Raises:
          exceptions.ResourceUnavailableError: if util.DetectGce() is
              false or the service account is not listed.
          exceptions.CredentialsError: if the scope check fails.
        """
        if not util.DetectGce():
            raise exceptions.ResourceUnavailableError(
                'GCE credentials requested outside a GCE instance')
        if not self.GetServiceAccount(self.__service_account_name):
            raise exceptions.ResourceUnavailableError(
                'GCE credentials requested but service account '
                '%s does not exist.' % self.__service_account_name)
        if scopes:
            scope_ls = util.NormalizeScopes(scopes)
            instance_scopes = self.GetInstanceScopes()
            # NOTE(review): `>` is a strict-superset test, so a request
            # that only partially overlaps the instance scopes is not
            # rejected here -- confirm that this leniency is intended.
            if scope_ls > instance_scopes:
                raise exceptions.CredentialsError(
                    'Instance did not have access to scopes %s' % (
                        sorted(list(scope_ls - instance_scopes)),))
        else:
            scopes = self.GetInstanceScopes()
        return scopes

    def GetServiceAccount(self, account):
        """Return whether `account` is listed by the metadata server."""
        relative_url = 'instance/service-accounts'
        response = _GceMetadataRequest(relative_url)
        # Each line has trailing '/', '\n' and '\r' characters stripped.
        response_lines = [self._ToNativeStr(line).rstrip('/\n\r')
                          for line in response.readlines()]
        return account in response_lines

    def GetInstanceScopes(self):
        """Return the normalized scopes listed for the service account."""
        relative_url = 'instance/service-accounts/{0}/scopes'.format(
            self.__service_account_name)
        response = _GceMetadataRequest(relative_url)
        return util.NormalizeScopes(self._ToNativeStr(scope).strip()
                                    for scope in response.readlines())

    def _refresh(self, do_request):
        """Refresh self.access_token.

        This function replaces AppAssertionCredentials._refresh, which
        does not use the credential store and is therefore poorly
        suited for multi-threaded scenarios.

        Args:
          do_request: A function matching httplib2.Http.request's signature.
        """
        # pylint: disable=protected-access
        oauth2client.client.OAuth2Credentials._refresh(self, do_request)
        # pylint: enable=protected-access

    def _do_refresh_request(self, unused_http_request):
        """Refresh self.access_token by querying the metadata server.

        If self.store is initialized, store acquired credentials there.

        Raises:
          exceptions.CommunicationError: if the metadata server cannot be
              reached; the credentials are marked invalid first.
          exceptions.CredentialsError: if the response is not valid JSON.
        """
        relative_url = 'instance/service-accounts/{0}/token'.format(
            self.__service_account_name)
        try:
            response = _GceMetadataRequest(relative_url)
        except exceptions.CommunicationError:
            self.invalid = True
            if self.store:
                self.store.locked_put(self)
            raise
        content = response.read()
        try:
            # Decode inside the try block: a UnicodeDecodeError is a
            # ValueError and is reported like any other parse failure.
            credential_info = json.loads(self._ToNativeStr(content))
        except ValueError:
            raise exceptions.CredentialsError(
                'Could not parse response as JSON: %s' % content)

        self.access_token = credential_info['access_token']
        if 'expires_in' in credential_info:
            expires_in = int(credential_info['expires_in'])
            self.token_expiry = (
                datetime.timedelta(seconds=expires_in) +
                datetime.datetime.utcnow())
        else:
            self.token_expiry = None
        self.invalid = False
        if self.store:
            self.store.locked_put(self)

    @classmethod
    def from_json(cls, json_data):
        """Rebuild credentials from a JSON string.

        Reads 'scope' (required) plus the optional 'kwargs' (only its
        'cache_filename' entry), 'access_token', 'token_expiry' and
        'invalid' keys of the decoded object.
        """
        data = json.loads(json_data)
        kwargs = {}
        if 'cache_filename' in data.get('kwargs', []):
            kwargs['cache_filename'] = data['kwargs']['cache_filename']
        credentials = GceAssertionCredentials(scopes=[data['scope']],
                                              **kwargs)
        if 'access_token' in data:
            credentials.access_token = data['access_token']
        if 'token_expiry' in data:
            credentials.token_expiry = datetime.datetime.strptime(
                data['token_expiry'], oauth2client.client.EXPIRY_FORMAT)
        if 'invalid' in data:
            credentials.invalid = data['invalid']
        return credentials

    @property
    def serialization_data(self):
        """Always raises NotImplementedError."""
        raise NotImplementedError(
            'Cannot serialize credentials for GCE service accounts.')
# TODO(craigcitro): Currently, we can't even *load*
# `oauth2client.appengine` without being on appengine, because of how
# it handles imports. Fix that by splitting that module into
# GAE-specific and GAE-independent bits, and guarding imports.
class GaeAssertionCredentials(oauth2client.client.AssertionCredentials):
    """Assertion credentials for Google App Engine apps."""

    def __init__(self, scopes, **kwds):
        """Initializes the credentials instance.

        Args:
          scopes: The scopes to get; normalized with util.NormalizeScopes.
          **kwds: Additional keyword args, forwarded to the parent
              constructor.

        Raises:
          exceptions.ResourceUnavailableError: if util.DetectGae() is false.
        """
        if not util.DetectGae():
            # The check is for App Engine, so the message says GAE.
            raise exceptions.ResourceUnavailableError(
                'GAE credentials requested outside a GAE instance')
        self._scopes = list(util.NormalizeScopes(scopes))
        super(GaeAssertionCredentials, self).__init__(None, **kwds)

    @classmethod
    def Get(cls, *args, **kwds):
        """Construct an instance, returning None on any exceptions.Error."""
        try:
            return cls(*args, **kwds)
        except exceptions.Error:
            return None

    @classmethod
    def from_json(cls, json_data):
        """Rebuild credentials from the '_scopes' key of a JSON string."""
        data = json.loads(json_data)
        return GaeAssertionCredentials(data['_scopes'])

    def _refresh(self, _):
        """Refresh self.access_token.

        Args:
          _: (ignored) A function matching httplib2.Http.request's signature.

        Raises:
          exceptions.CredentialsError: if app_identity raises an Error.
        """
        # Imported here rather than at module level; see the TODO above
        # the class about loading App Engine modules off App Engine.
        from google.appengine.api import app_identity
        try:
            token, _ = app_identity.get_access_token(self._scopes)
        except app_identity.Error as e:
            raise exceptions.CredentialsError(str(e))
        self.access_token = token
def _GetRunFlowFlags(args=None):
    """Build the flags object handed to oauth2client's run_flow.

    Args:
      args: (default: None) Argument list forwarded to the parser's
          parse_known_args.

    Returns:
      The argparse namespace parsed with `tools.argparser` as parent,
      overridden by any matching attributes present on FLAGS.
    """
    # There's one rare situation where gsutil will not have argparse
    # available, but doesn't need anything depending on argparse anyway,
    # since they're bringing their own credentials. So we just allow this
    # to fail with an ImportError in those cases.
    #
    # TODO(craigcitro): Move this import back to the top when we drop
    # python 2.6 support (eg when gsutil does).
    import argparse
    flow_parser = argparse.ArgumentParser(parents=[tools.argparser])
    # Get command line argparse flags; unknown arguments are discarded.
    flags = flow_parser.parse_known_args(args=args)[0]

    # Allow `gflags` and `argparse` to be used side-by-side.
    for name in ('auth_host_name', 'auth_host_port'):
        if hasattr(FLAGS, name):
            setattr(flags, name, getattr(FLAGS, name))
    if hasattr(FLAGS, 'auth_local_webserver'):
        # The argparse flag is the negation of the gflags one.
        flags.noauth_local_webserver = not FLAGS.auth_local_webserver
    return flags
# TODO(craigcitro): Switch this from taking a path to taking a stream.
def CredentialsFromFile(path, client_info, oauth2client_args=None):
    """Read credentials from a file, running an OAuth flow if needed.

    Args:
      path: Path of the multistore credentials file.
      client_info: Dict with at least 'client_id', 'user_agent' and
          'scope'; it is also expanded as keyword arguments for
          OAuth2WebServerFlow.
      oauth2client_args: (default: None) Argument list used when parsing
          the run_flow flags.

    Returns:
      The stored credentials if they are present and not invalid,
      otherwise the result of the flow (or the stored value, if every
      attempt failed with an authorization error).

    Raises:
      exceptions.CredentialsError: on an httplib2 error during the flow.
    """
    credential_store = oauth2client.multistore_file.get_credential_storage(
        path,
        client_info['client_id'],
        client_info['user_agent'],
        client_info['scope'])
    if hasattr(FLAGS, 'auth_local_webserver'):
        FLAGS.auth_local_webserver = False

    credentials = credential_store.get()
    if credentials is not None and not credentials.invalid:
        return credentials

    print('Generating new OAuth credentials ...')
    for _ in range(20):
        # If authorization fails, we want to retry, rather than let this
        # cascade up and get caught elsewhere. If users want out of the
        # retry loop, they can ^C.
        try:
            flow = oauth2client.client.OAuth2WebServerFlow(**client_info)
            flags = _GetRunFlowFlags(args=oauth2client_args)
            credentials = tools.run_flow(flow, credential_store, flags)
        except (oauth2client.client.FlowExchangeError, SystemExit) as e:
            # Here SystemExit is "no credential at all", and the
            # FlowExchangeError is "invalid" -- usually because
            # you reused a token.
            print('Invalid authorization: %s' % (e,))
        except httplib2.HttpLib2Error as e:
            print('Communication error: %s' % (e,))
            raise exceptions.CredentialsError(
                'Communication error creating credentials: %s' % e)
        else:
            # The flow succeeded; stop retrying.
            break
    return credentials
# TODO(craigcitro): Push this into oauth2client.
def GetUserinfo(credentials, http=None):  # pylint: disable=invalid-name
    """Get the tokeninfo associated with the given credentials.

    The token's access_token is sent to the OAuth2 v2 tokeninfo
    endpoint. If the server answers 400 Bad Request, the credentials are
    refreshed once and the request is repeated.

    Args:
      credentials: (oauth2client.client.Credentials) incoming credentials
      http: (httplib2.Http, optional) http instance to use

    Returns:
      The response body parsed from JSON; an empty dict if the body is
      empty.
    """
    if not http:
        http = httplib2.Http()
    encoded_query = urllib.parse.urlencode(
        {'access_token': credentials.access_token})
    url = '%s?%s' % (
        'https://www.googleapis.com/oauth2/v2/tokeninfo', encoded_query)
    # We ignore communication woes here (i.e. SSL errors, socket
    # timeout), as handling these should be done in a common location.
    response, content = http.request(url)
    if response.status == http_client.BAD_REQUEST:
        credentials.refresh(http)
        response, content = http.request(url)
    if not content:
        # Save ourselves from an empty reply.
        content = '{}'
    return json.loads(content)
@_RegisterCredentialsMethod
def _GetServiceAccountCredentials(
        client_info, service_account_name=None, service_account_keyfile=None,
        service_account_json_keyfile=None, **unused_kwds):
    """Build service account credentials from the supplied key material.

    Args:
      client_info: Dict providing 'scope' (space-separated) and
          'user_agent'.
      service_account_name: (default: None) Service account name; must be
          given together with service_account_keyfile.
      service_account_keyfile: (default: None) Path to the private key
          file for service_account_name.
      service_account_json_keyfile: (default: None) Path to a JSON service
          account key file; takes precedence when given.
      **unused_kwds: Ignored.

    Returns:
      Credentials, or None if no service account information was given.

    Raises:
      exceptions.CredentialsError: if only one of name/keyfile is given,
          or the JSON keyfile is not of the service account type.
    """
    if bool(service_account_name) != bool(service_account_keyfile):
        raise exceptions.CredentialsError(
            'Service account name or keyfile provided without the other')
    scopes = client_info['scope'].split()
    user_agent = client_info['user_agent']
    if service_account_json_keyfile:
        with open(service_account_json_keyfile) as keyfile:
            service_account_info = json.load(keyfile)
        account_type = service_account_info.get('type')
        if account_type != oauth2client.client.SERVICE_ACCOUNT:
            raise exceptions.CredentialsError(
                'Invalid service account credentials: %s' % (
                    service_account_json_keyfile,))
        # pylint: disable=protected-access
        credentials = oauth2client.service_account._ServiceAccountCredentials(
            service_account_id=service_account_info['client_id'],
            service_account_email=service_account_info['client_email'],
            private_key_id=service_account_info['private_key_id'],
            private_key_pkcs8_text=service_account_info['private_key'],
            scopes=scopes, user_agent=user_agent)
        # pylint: enable=protected-access
        return credentials
    # Use truthiness, like the validation above: an empty name (with an
    # empty keyfile) means "not provided" and must not reach open().
    if service_account_name:
        credentials = ServiceAccountCredentialsFromFile(
            service_account_name, service_account_keyfile, scopes,
            service_account_kwargs={'user_agent': user_agent})
        if credentials is not None:
            return credentials
    return None
@_RegisterCredentialsMethod
def _GetGaeServiceAccount(unused_client_info, scopes=None, **unused_kwds):
    """Return App Engine assertion credentials, or None if unavailable.

    Args:
      unused_client_info: Client info dict. Despite the name (kept for
          compatibility), its 'scope' entry is read when `scopes` is not
          supplied.
      scopes: (default: None) Scopes to request.
      **unused_kwds: Ignored.

    Returns:
      The result of GaeAssertionCredentials.Get.
    """
    if scopes is None:
        # GetCredentials() calls registered methods as
        # method(client_info, **kwds) and cannot forward `scopes`, so
        # recover them from the space-separated client_info entry.
        scopes = unused_client_info['scope'].split()
    return GaeAssertionCredentials.Get(scopes=scopes)
@_RegisterCredentialsMethod
def _GetGceServiceAccount(unused_client_info, scopes=None, **unused_kwds):
    """Return GCE assertion credentials, or None if unavailable.

    Args:
      unused_client_info: Client info dict. Despite the name (kept for
          compatibility), its 'scope' entry is read when `scopes` is not
          supplied.
      scopes: (default: None) Scopes to request.
      **unused_kwds: Ignored.

    Returns:
      The result of GceAssertionCredentials.Get.
    """
    if scopes is None:
        # GetCredentials() calls registered methods as
        # method(client_info, **kwds) and cannot forward `scopes`, so
        # recover them from the space-separated client_info entry.
        scopes = unused_client_info['scope'].split()
    return GceAssertionCredentials.Get(scopes=scopes)
@_RegisterCredentialsMethod
def _GetApplicationDefaultCredentials(
        unused_client_info, scopes=None,
        skip_application_default_credentials=False,
        **unused_kwds):
    """Return application default credentials if they look usable.

    Args:
      unused_client_info: Client info dict. Despite the name (kept for
          compatibility), its 'scope' entry is read when `scopes` is not
          supplied.
      scopes: (default: None) Scopes being requested.
      skip_application_default_credentials: (default: False) If true,
          return None without looking anything up.
      **unused_kwds: Ignored.

    Returns:
      The credentials loaded from files, or None if they are skipped,
      unavailable, or rejected by the scope heuristic below.
    """
    if skip_application_default_credentials:
        return None
    if scopes is None:
        # GetCredentials() calls registered methods as
        # method(client_info, **kwds) and cannot forward `scopes`, so
        # recover them from the space-separated client_info entry.
        scopes = unused_client_info['scope'].split()
    gc = oauth2client.client.GoogleCredentials
    with cache_file_lock:
        try:
            # pylint: disable=protected-access
            # We've already done our own check for GAE/GCE
            # credentials, we don't want to pay for checking again.
            credentials = gc._implicit_credentials_from_files()
        except oauth2client.client.ApplicationDefaultCredentialsError:
            return None
    # If we got back a non-service account credential, we need to use
    # a heuristic to decide whether or not the application default
    # credential will work for us. We assume that if we're requesting
    # cloud-platform, our scopes are a subset of cloud scopes, and the
    # ADC will work.
    cp = 'https://www.googleapis.com/auth/cloud-platform'
    if not isinstance(credentials, gc) or cp in scopes:
        return credentials
    return None