blob: cddba0a6ff926aa5f0e195a638aca44307c9f60c [file] [log] [blame]
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command-line utility for fetching/inspecting credentials.
oauth2l (pronounced "oauthtool") is a small utility for fetching
credentials, or inspecting existing credentials. Here we demonstrate
some sample use:
$ oauth2l fetch userinfo.email bigquery compute
Fetched credentials of type:
oauth2client.client.OAuth2Credentials
Access token:
ya29.abcdefghijklmnopqrstuvwxyz123yessirree
$ oauth2l header userinfo.email
Authorization: Bearer ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l validate thisisnotatoken
<exit status: 1>
$ oauth2l validate ya29.zyxwvutsrqpnmolkjihgfedcba
$ oauth2l scopes ya29.abcdefghijklmnopqrstuvwxyz123yessirree
https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/compute
https://www.googleapis.com/auth/userinfo.email
The `header` command is designed to be easy to use with `curl`:
$ curl -H "$(oauth2l header bigquery)" \\
'https://www.googleapis.com/bigquery/v2/projects'
... lists all projects ...
The token can also be printed in other formats, for easy chaining
into other programs:
$ oauth2l fetch -f json_compact userinfo.email
<one-line JSON object with credential information>
$ oauth2l fetch -f bare drive
ya29.suchT0kenManyCredentialsW0Wokyougetthepoint
"""
from __future__ import print_function
import argparse
import json
import logging
import os
import pkgutil
import sys
import textwrap
import oauth2client.client
from six.moves import http_client
import apitools.base.py as apitools_base
# We could use a generated client here, but it's used for precisely
# one URL, with one parameter and no worries about URL encoding. Let's
# go with simple.
_OAUTH2_TOKENINFO_TEMPLATE = (
'https://www.googleapis.com/oauth2/v2/tokeninfo'
'?access_token={access_token}'
)
def GetDefaultClientInfo():
client_secrets_json = pkgutil.get_data(
'apitools.data', 'apitools_client_secrets.json').decode('utf8')
client_secrets = json.loads(client_secrets_json)['installed']
return {
'client_id': client_secrets['client_id'],
'client_secret': client_secrets['client_secret'],
'user_agent': 'apitools/0.2 oauth2l/0.1',
}
def GetClientInfoFromFlags(client_secrets):
"""Fetch client info from args."""
if client_secrets:
client_secrets_path = os.path.expanduser(client_secrets)
if not os.path.exists(client_secrets_path):
raise ValueError(
'Cannot find file: {0}'.format(client_secrets))
with open(client_secrets_path) as client_secrets_file:
client_secrets = json.load(client_secrets_file)
if 'installed' not in client_secrets:
raise ValueError('Provided client ID must be for an installed app')
client_secrets = client_secrets['installed']
return {
'client_id': client_secrets['client_id'],
'client_secret': client_secrets['client_secret'],
'user_agent': 'apitools/0.2 oauth2l/0.1',
}
else:
return GetDefaultClientInfo()
def _ExpandScopes(scopes):
scope_prefix = 'https://www.googleapis.com/auth/'
return [s if s.startswith('https://') else scope_prefix + s
for s in scopes]
def _PrettyJson(data):
return json.dumps(data, sort_keys=True, indent=4, separators=(',', ': '))
def _CompactJson(data):
return json.dumps(data, sort_keys=True, separators=(',', ':'))
def _AsText(text_or_bytes):
if isinstance(text_or_bytes, bytes):
return text_or_bytes.decode('utf8')
return text_or_bytes
def _Format(fmt, credentials):
"""Format credentials according to fmt."""
if fmt == 'bare':
return credentials.access_token
elif fmt == 'header':
return 'Authorization: Bearer %s' % credentials.access_token
elif fmt == 'json':
return _PrettyJson(json.loads(_AsText(credentials.to_json())))
elif fmt == 'json_compact':
return _CompactJson(json.loads(_AsText(credentials.to_json())))
elif fmt == 'pretty':
format_str = textwrap.dedent('\n'.join([
'Fetched credentials of type:',
' {credentials_type.__module__}.{credentials_type.__name__}',
'Access token:',
' {credentials.access_token}',
]))
return format_str.format(credentials=credentials,
credentials_type=type(credentials))
raise ValueError('Unknown format: {0}'.format(fmt))
_FORMATS = set(('bare', 'header', 'json', 'json_compact', 'pretty'))
def _GetTokenScopes(access_token):
"""Return the list of valid scopes for the given token as a list."""
url = _OAUTH2_TOKENINFO_TEMPLATE.format(access_token=access_token)
response = apitools_base.MakeRequest(
apitools_base.GetHttp(), apitools_base.Request(url))
if response.status_code not in [http_client.OK, http_client.BAD_REQUEST]:
raise apitools_base.HttpError.FromResponse(response)
if response.status_code == http_client.BAD_REQUEST:
return []
return json.loads(_AsText(response.content))['scope'].split(' ')
def _ValidateToken(access_token):
"""Return True iff the provided access token is valid."""
return bool(_GetTokenScopes(access_token))
def _FetchCredentials(args, client_info=None, credentials_filename=None):
"""Fetch a credential for the given client_info and scopes."""
client_info = client_info or GetClientInfoFromFlags(args.client_secrets)
scopes = _ExpandScopes(args.scope)
if not scopes:
raise ValueError('No scopes provided')
credentials_filename = credentials_filename or args.credentials_filename
# TODO(craigcitro): Remove this logging nonsense once we quiet the
# spurious logging in oauth2client.
old_level = logging.getLogger().level
logging.getLogger().setLevel(logging.ERROR)
credentials = apitools_base.GetCredentials(
'oauth2l', scopes, credentials_filename=credentials_filename,
service_account_json_keyfile=args.service_account_json_keyfile,
oauth2client_args='', **client_info)
logging.getLogger().setLevel(old_level)
if not _ValidateToken(credentials.access_token):
credentials.refresh(apitools_base.GetHttp())
return credentials
def _Email(args):
"""Print the email address for this token, if possible."""
userinfo = apitools_base.GetUserinfo(
oauth2client.client.AccessTokenCredentials(args.access_token,
'oauth2l/1.0'))
user_email = userinfo.get('email')
if user_email:
print(user_email)
def _Fetch(args):
"""Fetch a valid access token and display it."""
credentials = _FetchCredentials(args)
print(_Format(args.credentials_format.lower(), credentials))
def _Header(args):
"""Fetch an access token and display it formatted as an HTTP header."""
print(_Format('header', _FetchCredentials(args)))
def _Scopes(args):
"""Print the list of scopes for a valid token."""
scopes = _GetTokenScopes(args.access_token)
if not scopes:
return 1
for scope in sorted(scopes):
print(scope)
def _Userinfo(args):
"""Print the userinfo for this token, if possible."""
userinfo = apitools_base.GetUserinfo(
oauth2client.client.AccessTokenCredentials(args.access_token,
'oauth2l/1.0'))
if args.format == 'json':
print(_PrettyJson(userinfo))
else:
print(_CompactJson(userinfo))
def _Validate(args):
"""Validate an access token. Exits with 0 if valid, 1 otherwise."""
return 1 - (_ValidateToken(args.access_token))
def _GetParser():
"""Returns argparse argument parser."""
shared_flags = argparse.ArgumentParser(add_help=False)
shared_flags.add_argument(
'--client_secrets',
default='',
help=('If specified, use the client ID/secret from the named '
'file, which should be a client_secrets.json file '
'downloaded from the Developer Console.'))
shared_flags.add_argument(
'--credentials_filename',
default='',
help='(optional) Filename for fetching/storing credentials.')
shared_flags.add_argument(
'--service_account_json_keyfile',
default='',
help=('Filename for a JSON service account key downloaded from '
'the Google Developer Console.'))
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
subparsers = parser.add_subparsers(dest='command')
# email
email = subparsers.add_parser('email', help=_Email.__doc__,
parents=[shared_flags])
email.set_defaults(func=_Email)
email.add_argument(
'access_token',
help=('Access token to print associated email address for. Must have '
'the userinfo.email scope.'))
# fetch
fetch = subparsers.add_parser('fetch', help=_Fetch.__doc__,
parents=[shared_flags])
fetch.set_defaults(func=_Fetch)
fetch.add_argument(
'-f', '--credentials_format',
default='pretty', choices=sorted(_FORMATS),
help='Output format for token.')
fetch.add_argument(
'scope',
nargs='*',
help='Scope to fetch. May be provided multiple times.')
# header
header = subparsers.add_parser('header', help=_Header.__doc__,
parents=[shared_flags])
header.set_defaults(func=_Header)
header.add_argument(
'scope',
nargs='*',
help='Scope to header. May be provided multiple times.')
# scopes
scopes = subparsers.add_parser('scopes', help=_Scopes.__doc__,
parents=[shared_flags])
scopes.set_defaults(func=_Scopes)
scopes.add_argument(
'access_token',
help=('Scopes associated with this token will be printed.'))
# userinfo
userinfo = subparsers.add_parser('userinfo', help=_Userinfo.__doc__,
parents=[shared_flags])
userinfo.set_defaults(func=_Userinfo)
userinfo.add_argument(
'-f', '--format',
default='json', choices=('json', 'json_compact'),
help='Output format for userinfo.')
userinfo.add_argument(
'access_token',
help=('Access token to print associated email address for. Must have '
'the userinfo.email scope.'))
# validate
validate = subparsers.add_parser('validate', help=_Validate.__doc__,
parents=[shared_flags])
validate.set_defaults(func=_Validate)
validate.add_argument(
'access_token',
help='Access token to validate.')
return parser
def main(argv=None):
argv = argv or sys.argv
# Invoke the newly created parser.
args = _GetParser().parse_args(argv[1:])
try:
exit_code = args.func(args)
except BaseException as e:
print('Error encountered in {0} operation: {1}'.format(
args.command, e))
return 1
return exit_code
if __name__ == '__main__':
sys.exit(main(sys.argv))