"""Command-line utility for fetching/inspecting credentials.
oauth2l (pronounced "oauthtool") is a small utility for fetching
credentials, or inspecting existing credentials. Here we demonstrate
some sample use:
$ oauth2l fetch bigquery compute
Fetched credentials of type:
Access token:
$ oauth2l header
Authorization: Bearer ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l validate thisisnotatoken
<exit status: 1>
$ oauth2l validate ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l scopes ya29.abcdefghijklmnopqrstuvwxyz123yessirree
The `header` command is designed to be easy to use with `curl`:
$ curl "$(oauth2l header bigquery)" \
The token can also be printed in other formats, for easy chaining
into other programs:
$ oauth2l fetch -f json_compact
<one-line JSON object with credential information>
$ oauth2l fetch -f bare drive
"""
import httplib
import json
import logging
import os
import pkgutil
import sys
import textwrap
import gflags as flags
from google.apputils import appcommands
import oauth2client.client
import as apitools_base
from import cli as apitools_cli
# Global flag registry read by the helpers and commands in this module.
FLAGS = flags.FLAGS

# We could use a generated client here, but it's used for precisely
# one URL, with one parameter and no worries about URL encoding. Let's
# go with simple.
# NOTE(review): the URL literal was absent from the reviewed text. This is
# Google's OAuth2 tokeninfo endpoint; confirm the API version before merging.
_OAUTH2_TOKENINFO_TEMPLATE = (
    'https://www.googleapis.com/oauth2/v2/tokeninfo'
    '?access_token={access_token}'
)

# NOTE(review): the `flags.DEFINE_string(` call heads were missing; string
# flags are assumed because every default below is an empty string.
flags.DEFINE_string(
    'client_secrets', '',
    'If specified, use the client ID/secret from the named '
    'file, which should be a client_secrets.json file as downloaded '
    'from the Developer Console.')
flags.DEFINE_string(
    'credentials_filename', '',
    '(optional) Filename for fetching/storing credentials.')
# NOTE(review): the help text was cut off after "Developer "; completed to
# match the wording of the client_secrets flag above.
flags.DEFINE_string(
    'service_account_json_keyfile', '',
    'Filename for a JSON service account key downloaded from the Developer '
    'Console.')
def GetDefaultClientInfo():
    """Return the client info shipped alongside this tool.

    Reads ``apitools_client_secrets.json`` through ``pkgutil.get_data`` and
    uses the entry stored under its ``'installed'`` key.

    Returns:
        A dict with ``client_id``, ``client_secret`` and ``user_agent`` keys.
    """
    # NOTE(review): the package argument is an empty string in the reviewed
    # text; it presumably should name the package that bundles the JSON
    # file -- confirm before relying on this default.
    raw_secrets = pkgutil.get_data('', 'apitools_client_secrets.json')
    installed = json.loads(raw_secrets)['installed']
    return {
        'client_id': installed['client_id'],
        'client_secret': installed['client_secret'],
        'user_agent': 'apitools/0.2 oauth2l/0.1',
    }
def GetClientInfoFromFlags():
    """Fetch client info from FLAGS.

    When ``--client_secrets`` is set, the named file is loaded and its
    ``'installed'`` entry is used; otherwise the default client info is
    returned.

    Returns:
        A dict with ``client_id``, ``client_secret`` and ``user_agent`` keys.

    Raises:
        ValueError: if the named file does not exist, or if it has no
            ``'installed'`` entry.
    """
    if not FLAGS.client_secrets:
        return GetDefaultClientInfo()

    # Expand "~" so that paths like ~/secrets.json work from the shell.
    secrets_path = os.path.expanduser(FLAGS.client_secrets)
    if not os.path.exists(secrets_path):
        raise ValueError('Cannot find file: %s' % FLAGS.client_secrets)
    with open(secrets_path) as secrets_file:
        client_secrets = json.load(secrets_file)
    if 'installed' not in client_secrets:
        raise ValueError('Provided client ID must be for an installed app')
    installed = client_secrets['installed']
    return {
        'client_id': installed['client_id'],
        'client_secret': installed['client_secret'],
        'user_agent': 'apitools/0.2 oauth2l/0.1',
    }
def _ExpandScopes(scopes):
    """Expand short scope names into full scope URLs.

    Args:
        scopes: iterable of scope strings; each is either a full
            ``https://`` URL or a short name such as ``bigquery``.

    Returns:
        A list in input order: full URLs are passed through untouched and
        short names are prefixed with the Google API scope base URL.
    """
    # NOTE(review): the prefix was an empty string in the reviewed text,
    # which made this function a no-op; restored to the standard Google API
    # scope prefix so that short names like "bigquery" resolve -- confirm.
    scope_prefix = 'https://www.googleapis.com/auth/'
    return [s if s.startswith('https://') else scope_prefix + s
            for s in scopes]
def _PrettyJson(data):
    """Serialize data as JSON with sorted keys and a 4-space indent."""
    return json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
def _CompactJson(data):
    """Serialize data as one-line JSON with sorted keys and no padding."""
    return json.dumps(data, sort_keys=True, separators=(',', ':'))
def _Format(fmt, credentials):
    """Format credentials according to fmt.

    Args:
        fmt: one of 'bare', 'header', 'json', 'json_compact' or 'pretty'.
        credentials: object exposing ``access_token``; the JSON formats
            also call its ``to_json()`` method.

    Returns:
        The formatted credentials as a string.

    Raises:
        ValueError: if fmt is not one of the known formats.
    """
    if fmt == 'bare':
        return credentials.access_token
    elif fmt == 'header':
        return 'Authorization: Bearer %s' % credentials.access_token
    elif fmt == 'json':
        return _PrettyJson(json.loads(credentials.to_json()))
    elif fmt == 'json_compact':
        return _CompactJson(json.loads(credentials.to_json()))
    elif fmt == 'pretty':
        format_str = textwrap.dedent('\n'.join([
            'Fetched credentials of type:',
            ' {credentials_type.__module__}.{credentials_type.__name__}',
            'Access token:',
            ' {credentials.access_token}',
        ]))
        # The template reads __module__/__name__ from credentials_type, so
        # the class of the credentials object is what gets passed in.
        return format_str.format(credentials=credentials,
                                 credentials_type=type(credentials))
    raise ValueError('Unknown format: {}'.format(fmt))
# Names of the output formats handled by _Format; the sorted set is also
# what is offered as the choices for the credentials_format flag.
_FORMATS = set(('bare', 'header', 'json', 'json_compact', 'pretty'))
def _GetTokenScopes(access_token):
    """Return the list of valid scopes for the given token as a list.

    Args:
        access_token: the access token string to look up.

    Returns:
        The scope strings from the tokeninfo response, or an empty list
        when the server answers BAD_REQUEST for the token.

    Raises:
        apitools_base.HttpError: for any status other than OK or
            BAD_REQUEST.
    """
    url = _OAUTH2_TOKENINFO_TEMPLATE.format(access_token=access_token)
    response = apitools_base.MakeRequest(
        apitools_base.GetHttp(), apitools_base.Request(url))
    status = response.status_code
    # BAD_REQUEST is an expected answer here: it means "no scopes", which
    # callers interpret as an invalid token rather than as a failure.
    if status == httplib.BAD_REQUEST:
        return []
    if status != httplib.OK:
        raise apitools_base.HttpError.FromResponse(response)
    # The 'scope' field is a single space-separated string.
    return json.loads(response.content)['scope'].split(' ')
def _ValidateToken(access_token):
    """Return True iff the provided access token is valid.

    A token counts as valid when the scope lookup returns at least one
    scope for it.
    """
    return bool(_GetTokenScopes(access_token))
def FetchCredentials(scopes, client_info=None, credentials_filename=None):
    """Fetch a credential for the given client_info and scopes.

    Args:
        scopes: iterable of scope names or URLs; short names are expanded.
        client_info: optional dict of keyword arguments (client ID, secret,
            user agent) for GetCredentials; defaults to the flag-derived
            client info.
        credentials_filename: optional path for storing/fetching
            credentials; defaults to the --credentials_filename flag.

    Returns:
        The credentials object produced by apitools_base.GetCredentials.

    Raises:
        ValueError: if no scopes are provided.
    """
    client_info = client_info or GetClientInfoFromFlags()
    scopes = _ExpandScopes(scopes)
    if not scopes:
        raise ValueError('No scopes provided')
    credentials_filename = credentials_filename or FLAGS.credentials_filename
    # TODO(craigcitro): Remove this logging nonsense once we quiet the
    # spurious logging in oauth2client.
    # NOTE(review): the reviewed text saved old_level but never changed or
    # restored the level; raising it to ERROR around the call is the
    # assumed intent -- confirm the level.
    root_logger = logging.getLogger()
    old_level = root_logger.level
    root_logger.setLevel(logging.ERROR)
    try:
        # NOTE(review): the service_account_json_keyfile flag is defined in
        # this module but not forwarded here -- confirm whether it should be.
        credentials = apitools_base.GetCredentials(
            'oauth2l', scopes, credentials_filename=credentials_filename,
            oauth2client_args='', **client_info)
    finally:
        # Always put the caller's log level back, even if the fetch fails.
        root_logger.setLevel(old_level)
    if not _ValidateToken(credentials.access_token):
        # NOTE(review): the body of this branch was missing from the
        # reviewed text; refreshing a stale token is the assumed intent.
        credentials.refresh(apitools_base.GetHttp())
    return credentials
class _Email(apitools_cli.NewCmd):
    """Get user email."""

    usage = 'email <access_token>'

    def RunWithArgs(self, access_token):
        """Print the email address for this token, if possible.

        Nothing is printed when the userinfo has no 'email' entry.
        """
        # NOTE(review): the argument to GetUserinfo was missing from the
        # reviewed text; wrapping the raw token in oauth2client's
        # AccessTokenCredentials (with this user agent) is assumed.
        userinfo = apitools_base.GetUserinfo(
            oauth2client.client.AccessTokenCredentials(access_token,
                                                       'oauth2l/1.0'))
        user_email = userinfo.get('email')
        if user_email:
            # Single-argument print(...) also parses as a Python 2 statement.
            print(user_email)
class _Fetch(apitools_cli.NewCmd):
    """Fetch credentials."""

    usage = 'fetch <scope> [<scope> ...]'

    def __init__(self, name, flag_values):
        """Register the -f/--credentials_format flag for this command."""
        super(_Fetch, self).__init__(name, flag_values)
        # NOTE(review): the call head was missing from the reviewed text;
        # an enum flag is assumed because a list of choices is passed.
        flags.DEFINE_enum(
            'credentials_format', 'pretty', sorted(_FORMATS),
            'Output format for token.',
            short_name='f', flag_values=flag_values)

    def RunWithArgs(self, *scopes):
        """Fetch a valid access token and display it."""
        credentials = FetchCredentials(scopes)
        # Single-argument print(...) also parses as a Python 2 statement.
        print(_Format(FLAGS.credentials_format.lower(), credentials))
class _Header(apitools_cli.NewCmd):
    """Print credentials for a header."""

    usage = 'header <scope> [<scope> ...]'

    def RunWithArgs(self, *scopes):
        """Fetch a valid access token and display it formatted for a header."""
        credentials = FetchCredentials(scopes)
        # Single-argument print(...) also parses as a Python 2 statement.
        print(_Format('header', credentials))
class _Scopes(apitools_cli.NewCmd):
    """Get the list of scopes for a token."""

    usage = 'scopes <access_token>'

    def RunWithArgs(self, access_token):
        """Print the list of scopes for a valid token.

        Returns:
            1 when no scopes are found for the token; otherwise None after
            printing one scope per line in sorted order.
        """
        scopes = _GetTokenScopes(access_token)
        if not scopes:
            return 1
        for scope in sorted(scopes):
            # Single-argument print(...) also parses as a Python 2 statement.
            print(scope)
class _Userinfo(apitools_cli.NewCmd):
    """Get userinfo."""

    usage = 'userinfo <access_token>'

    def __init__(self, name, flag_values):
        """Register the -f/--format flag for this command."""
        super(_Userinfo, self).__init__(name, flag_values)
        # NOTE(review): the call head was missing from the reviewed text;
        # an enum flag is assumed because a list of choices is passed.
        flags.DEFINE_enum(
            'format', 'json', sorted(('json', 'json_compact')),
            'Output format for userinfo.',
            short_name='f', flag_values=flag_values)

    def RunWithArgs(self, access_token):
        """Print the userinfo for this token (if we have the right scopes)."""
        # NOTE(review): the argument to GetUserinfo was missing from the
        # reviewed text; wrapping the raw token in oauth2client's
        # AccessTokenCredentials (with this user agent) is assumed.
        userinfo = apitools_base.GetUserinfo(
            oauth2client.client.AccessTokenCredentials(access_token,
                                                       'oauth2l/1.0'))
        # Exactly one rendering is printed; without the else branch both
        # the pretty and the compact form would be emitted for 'json'.
        if FLAGS.format == 'json':
            print(_PrettyJson(userinfo))
        else:
            print(_CompactJson(userinfo))
class _Validate(apitools_cli.NewCmd):
    """Validate a token."""

    usage = 'validate <access_token>'

    def RunWithArgs(self, access_token):
        """Validate an access token. Exits with 0 if valid, 1 otherwise."""
        return 0 if _ValidateToken(access_token) else 1
def run_main():  # pylint:disable=invalid-name
    """Function to be used as setuptools script entry point.

    Re-registers this module's flags under the name the flags module treats
    as the main module, temporarily installs this module as ``__main__``,
    and then puts the original ``__main__`` back.
    """
    # Put the flags for this module somewhere the flags module will look
    # for them.
    # pylint:disable=protected-access
    new_name = flags._GetMainModule()
    sys.modules[new_name] = sys.modules['__main__']
    for flag in FLAGS.FlagsByModuleDict().get(__name__, []):
        FLAGS._RegisterFlagByModule(new_name, flag)
    # NOTE(review): indentation was lost in the reviewed text, so whether
    # this loop was nested inside the one above cannot be determined; the
    # flat form is used here -- confirm.
    for key_flag in FLAGS.KeyFlagsByModuleDict().get(__name__, []):
        FLAGS._RegisterKeyFlagForModule(new_name, key_flag)
    # pylint:enable=protected-access
    # Now set __main__ appropriately so that appcommands will be
    # happy.
    sys.modules['__main__'] = sys.modules[__name__]
    # NOTE(review): no call appeared between swapping __main__ in and
    # swapping it back out; running appcommands here is the assumed intent.
    appcommands.Run()
    sys.modules['__main__'] = sys.modules.pop(new_name)
def main(unused_argv):
    """Register every oauth2l subcommand with appcommands.

    Args:
        unused_argv: command-line arguments; not used by this function.
    """
    commands = (
        ('email', _Email),
        ('fetch', _Fetch),
        ('header', _Header),
        ('scopes', _Scopes),
        ('userinfo', _Userinfo),
        ('validate', _Validate),
    )
    # Registration order matches the original sequence of AddCmd calls.
    for command_name, command_class in commands:
        appcommands.AddCmd(command_name, command_class)
if __name__ == '__main__':