blob: 44bb9bc254476615f165531411cfa385191c7ea2 [file] [log] [blame]
"""Command-line utility for fetching/inspecting credentials.
oauth2l (pronounced "oauthtool") is a small utility for fetching
credentials, or inspecting existing credentials. Here we demonstrate
some sample use:
$ oauth2l fetch userinfo.email bigquery compute
Fetched credentials of type:
oauth2client.client.OAuth2Credentials
Access token:
ya29.abcdefghijklmnopqrstuvwxyz123yessirree
$ oauth2l header userinfo.email
Authorization: Bearer ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l validate thisisnotatoken
<exit status: 1>
$ oauth2l validate ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l scopes ya29.abcdefghijklmnopqrstuvwxyz123yessirree
https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/compute
https://www.googleapis.com/auth/userinfo.email
The `header` command is designed to be easy to use with `curl`:
$ curl "$(oauth2l header bigquery)" \
'https://www.googleapis.com/bigquery/v2/projects'
The token can also be printed in other formats, for easy chaining
into other programs:
$ oauth2l fetch -f json_compact userinfo.email
<one-line JSON object with credential information>
$ oauth2l fetch -f bare drive
ya29.suchT0kenManyCredentialsW0Wokyougetthepoint
"""
import httplib
import json
import logging
import os
import pkgutil
import sys
import textwrap
import gflags as flags
from google.apputils import appcommands
import oauth2client.client
import apitools.base.py as apitools_base
from apitools.base.py import cli as apitools_cli
FLAGS = flags.FLAGS
# We could use a generated client here, but it's used for precisely
# one URL, with one parameter and no worries about URL encoding. Let's
# go with simple.
_OAUTH2_TOKENINFO_TEMPLATE = (
'https://www.googleapis.com/oauth2/v2/tokeninfo'
'?access_token={access_token}'
)
flags.DEFINE_string(
'client_secrets', '',
'If specified, use the client ID/secret from the named '
'file, which should be a client_secrets.json file as downloaded '
'from the Developer Console.')
flags.DEFINE_string(
'credentials_filename', '',
'(optional) Filename for fetching/storing credentials.')
flags.DEFINE_string(
'service_account_json_keyfile', '',
'Filename for a JSON service account key downloaded from the Developer '
'Console.')
def GetDefaultClientInfo():
client_secrets = json.loads(pkgutil.get_data(
'apitools.data', 'apitools_client_secrets.json'))['installed']
return {
'client_id': client_secrets['client_id'],
'client_secret': client_secrets['client_secret'],
'user_agent': 'apitools/0.2 oauth2l/0.1',
}
def GetClientInfoFromFlags():
"""Fetch client info from FLAGS."""
if FLAGS.client_secrets:
client_secrets_path = os.path.expanduser(FLAGS.client_secrets)
if not os.path.exists(client_secrets_path):
raise ValueError('Cannot find file: %s' % FLAGS.client_secrets)
with open(client_secrets_path) as client_secrets_file:
client_secrets = json.load(client_secrets_file)
if 'installed' not in client_secrets:
raise ValueError('Provided client ID must be for an installed app')
client_secrets = client_secrets['installed']
return {
'client_id': client_secrets['client_id'],
'client_secret': client_secrets['client_secret'],
'user_agent': 'apitools/0.2 oauth2l/0.1',
}
else:
return GetDefaultClientInfo()
def _ExpandScopes(scopes):
scope_prefix = 'https://www.googleapis.com/auth/'
return [s if s.startswith('https://') else scope_prefix + s
for s in scopes]
def _PrettyJson(data):
return json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
def _CompactJson(data):
return json.dumps(data, sort_keys=True, separators=(',', ':'))
def _Format(fmt, credentials):
"""Format credentials according to fmt."""
if fmt == 'bare':
return credentials.access_token
elif fmt == 'header':
return 'Authorization: Bearer %s' % credentials.access_token
elif fmt == 'json':
return _PrettyJson(json.loads(credentials.to_json()))
elif fmt == 'json_compact':
return _CompactJson(json.loads(credentials.to_json()))
elif fmt == 'pretty':
format_str = textwrap.dedent('\n'.join([
'Fetched credentials of type:',
' {credentials_type.__module__}.{credentials_type.__name__}',
'Access token:',
' {credentials.access_token}',
]))
return format_str.format(credentials=credentials,
credentials_type=type(credentials))
raise ValueError('Unknown format: {}'.format(fmt))
_FORMATS = set(('bare', 'header', 'json', 'json_compact', 'pretty'))
def _GetTokenScopes(access_token):
"""Return the list of valid scopes for the given token as a list."""
url = _OAUTH2_TOKENINFO_TEMPLATE.format(access_token=access_token)
response = apitools_base.MakeRequest(
apitools_base.GetHttp(), apitools_base.Request(url))
if response.status_code not in [httplib.OK, httplib.BAD_REQUEST]:
raise apitools_base.HttpError.FromResponse(response)
if response.status_code == httplib.BAD_REQUEST:
return []
return json.loads(response.content)['scope'].split(' ')
def _ValidateToken(access_token):
"""Return True iff the provided access token is valid."""
return bool(_GetTokenScopes(access_token))
def FetchCredentials(scopes, client_info=None, credentials_filename=None):
"""Fetch a credential for the given client_info and scopes."""
client_info = client_info or GetClientInfoFromFlags()
scopes = _ExpandScopes(scopes)
if not scopes:
raise ValueError('No scopes provided')
credentials_filename = credentials_filename or FLAGS.credentials_filename
# TODO(craigcitro): Remove this logging nonsense once we quiet the
# spurious logging in oauth2client.
old_level = logging.getLogger().level
logging.getLogger().setLevel(logging.ERROR)
credentials = apitools_base.GetCredentials(
'oauth2l', scopes, credentials_filename=credentials_filename,
service_account_json_keyfile=FLAGS.service_account_json_keyfile,
oauth2client_args='', **client_info)
logging.getLogger().setLevel(old_level)
if not _ValidateToken(credentials.access_token):
credentials.refresh(apitools_base.GetHttp())
return credentials
class _Email(apitools_cli.NewCmd):
"""Get user email."""
usage = 'email <access_token>'
def RunWithArgs(self, access_token):
"""Print the email address for this token, if possible."""
userinfo = apitools_base.GetUserinfo(
oauth2client.client.AccessTokenCredentials(access_token,
'oauth2l/1.0'))
user_email = userinfo.get('email')
if user_email:
print user_email
class _Fetch(apitools_cli.NewCmd):
"""Fetch credentials."""
usage = 'fetch <scope> [<scope> ...]'
def __init__(self, name, flag_values):
super(_Fetch, self).__init__(name, flag_values)
flags.DEFINE_enum(
'credentials_format', 'pretty', sorted(_FORMATS),
'Output format for token.',
short_name='f', flag_values=flag_values)
def RunWithArgs(self, *scopes):
"""Fetch a valid access token and display it."""
credentials = FetchCredentials(scopes)
print _Format(FLAGS.credentials_format.lower(), credentials)
class _Header(apitools_cli.NewCmd):
"""Print credentials for a header."""
usage = 'header <scope> [<scope> ...]'
def RunWithArgs(self, *scopes):
"""Fetch a valid access token and display it formatted for a header."""
print _Format('header', FetchCredentials(scopes))
class _Scopes(apitools_cli.NewCmd):
"""Get the list of scopes for a token."""
usage = 'scopes <access_token>'
def RunWithArgs(self, access_token):
"""Print the list of scopes for a valid token."""
scopes = _GetTokenScopes(access_token)
if not scopes:
return 1
for scope in sorted(scopes):
print scope
class _Userinfo(apitools_cli.NewCmd):
"""Get userinfo."""
usage = 'userinfo <access_token>'
def __init__(self, name, flag_values):
super(_Userinfo, self).__init__(name, flag_values)
flags.DEFINE_enum(
'format', 'json', sorted(('json', 'json_compact')),
'Output format for userinfo.',
short_name='f', flag_values=flag_values)
def RunWithArgs(self, access_token):
"""Print the userinfo for this token (if we have the right scopes)."""
userinfo = apitools_base.GetUserinfo(
oauth2client.client.AccessTokenCredentials(access_token,
'oauth2l/1.0'))
if FLAGS.format == 'json':
print _PrettyJson(userinfo)
else:
print _CompactJson(userinfo)
class _Validate(apitools_cli.NewCmd):
"""Validate a token."""
usage = 'validate <access_token>'
def RunWithArgs(self, access_token):
"""Validate an access token. Exits with 0 if valid, 1 otherwise."""
return 1 - (_ValidateToken(access_token))
def run_main(): # pylint:disable=invalid-name
"""Function to be used as setuptools script entry point."""
# Put the flags for this module somewhere the flags module will look
# for them.
# pylint:disable=protected-access
new_name = flags._GetMainModule()
sys.modules[new_name] = sys.modules['__main__']
for flag in FLAGS.FlagsByModuleDict().get(__name__, []):
FLAGS._RegisterFlagByModule(new_name, flag)
for key_flag in FLAGS.KeyFlagsByModuleDict().get(__name__, []):
FLAGS._RegisterKeyFlagForModule(new_name, key_flag)
# pylint:enable=protected-access
# Now set __main__ appropriately so that appcommands will be
# happy.
sys.modules['__main__'] = sys.modules[__name__]
appcommands.Run()
sys.modules['__main__'] = sys.modules.pop(new_name)
def main(unused_argv):
appcommands.AddCmd('email', _Email)
appcommands.AddCmd('fetch', _Fetch)
appcommands.AddCmd('header', _Header)
appcommands.AddCmd('scopes', _Scopes)
appcommands.AddCmd('userinfo', _Userinfo)
appcommands.AddCmd('validate', _Validate)
if __name__ == '__main__':
appcommands.Run()