blob: 54b8fd106108eb43bafd4b1a4a3ca4e4f522c687 [file] [log] [blame]
# Copyright 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import json
import logging
import os
from telemetry.core import util
from telemetry.internal.backends import codepen_credentials_backend
from telemetry.internal.backends import facebook_credentials_backend
from telemetry.internal.backends import google_credentials_backend
from telemetry.testing import options_for_unittests
class CredentialsError(Exception):
  """Signals a credentials/login failure.

  Raised in this module when a caller names a credentials type for which no
  backend has been registered.
  """
  pass
class BrowserCredentials(object):
  """Stores login credentials and routes login queries to per-type backends.

  Credentials are keyed by credentials type and merged from three sources, in
  increasing precedence: the JSON file at credentials_path, the JSON file at
  ~/.telemetry-credentials, and entries registered through Add(). Each
  credentials type is served by the backend whose credentials_type attribute
  equals it.
  """

  _HOMEDIR_CREDENTIALS = '~/.telemetry-credentials'

  # Warning emitted by WarnIfMissingCredentials. Placeholders, in order:
  # credentials type, page, candidate credentials files, example file path.
  _MISSING_CREDENTIALS_MESSAGE = (
      '\n'
      'Credentials for %s were not found. page %s will not be tested.\n'
      'To fix this, either follow the instructions to authenticate to gsutil\n'
      'here:\n'
      'http://www.chromium.org/developers/telemetry/upload_to_cloud_storage,\n'
      'or add your own credentials to:\n'
      '%s\n'
      'An example credentials file you can copy from is here:\n'
      '%s\n')

  def __init__(self, backends=None):
    """Creates an empty credentials store.

    Args:
      backends: optional iterable of backend objects, each exposing a
          credentials_type attribute. When None, the default CodePen,
          Facebook and Google backends are instantiated.
    """
    self._credentials = {}
    self._credentials_path = None
    self._extra_credentials = {}

    if backends is None:
      backends = [
          codepen_credentials_backend.CodePenCredentialsBackend(),
          facebook_credentials_backend.FacebookCredentialsBackend(),
          facebook_credentials_backend.FacebookCredentialsBackend2(),
          google_credentials_backend.GoogleCredentialsBackend(),
          google_credentials_backend.GoogleCredentialsBackend2()]

    # If two backends share a credentials_type, the later one wins.
    self._backends = {
        backend.credentials_type: backend for backend in backends}

  def _RaiseIfUnrecognized(self, credentials_type):
    """Raises CredentialsError unless a backend serves credentials_type."""
    if credentials_type not in self._backends:
      # Format the message here: passing credentials_type as a second
      # exception argument would leave the '%s' placeholder unexpanded.
      raise CredentialsError(
          'Unrecognized credentials type: %s' % credentials_type)

  @staticmethod
  def _LoadJsonFile(path):
    """Returns the parsed JSON content of the file at path."""
    with open(path, 'r') as f:
      return json.load(f)

  def AddBackend(self, backend):
    """Registers an additional backend under its credentials_type.

    The type must not already be registered (enforced with an assert).
    """
    assert backend.credentials_type not in self._backends
    self._backends[backend.credentials_type] = backend

  def IsLoggedIn(self, credentials_type):
    """Returns whether the backend reports being logged in.

    Returns False without consulting the backend when no credentials are
    stored for credentials_type.

    Raises:
      CredentialsError: if no backend serves credentials_type.
    """
    self._RaiseIfUnrecognized(credentials_type)
    if credentials_type not in self._credentials:
      return False
    return self._backends[credentials_type].IsLoggedIn()

  def CanLogin(self, credentials_type):
    """Returns True if credentials are stored for credentials_type.

    Raises:
      CredentialsError: if no backend serves credentials_type.
    """
    self._RaiseIfUnrecognized(credentials_type)
    return credentials_type in self._credentials

  def LoginNeeded(self, tab, credentials_type):
    """Forwards a LoginNeeded request for tab to the matching backend.

    The backend receives the tab, an ActionRunner built for it, and the stored
    credentials for credentials_type; its return value is passed through.
    Returns False when no credentials are stored for credentials_type.

    Raises:
      CredentialsError: if no backend serves credentials_type.
    """
    self._RaiseIfUnrecognized(credentials_type)
    if credentials_type not in self._credentials:
      return False

    # Imported at call time rather than at module load.
    # NOTE(review): presumably this avoids a circular import -- confirm.
    from telemetry.page import action_runner
    runner = action_runner.ActionRunner(tab)
    return self._backends[credentials_type].LoginNeeded(
        tab, runner, self._credentials[credentials_type])

  def LoginNoLongerNeeded(self, tab, credentials_type):
    """Forwards a LoginNoLongerNeeded notification for tab to the backend.

    credentials_type must have a registered backend (enforced with an assert).
    """
    assert credentials_type in self._backends
    self._backends[credentials_type].LoginNoLongerNeeded(tab)

  @property
  def credentials_path(self):
    """Path of the JSON credentials file, or None if none has been set."""
    return self._credentials_path

  @credentials_path.setter
  def credentials_path(self, credentials_path):
    # Changing the file changes the merged view, so rebuild it right away.
    self._credentials_path = credentials_path
    self._RebuildCredentials()

  def Add(self, credentials_type, data):
    """Registers extra credential fields for credentials_type.

    Args:
      credentials_type: the credentials type the fields belong to.
      data: dict of fields to add. No key may already have been added for
          this type (enforced with an assert).
    """
    extra = self._extra_credentials.setdefault(credentials_type, {})
    for key, value in data.items():
      assert key not in extra
      extra[key] = value
    self._RebuildCredentials()

  def _ResetLoggedInState(self):
    """Makes the backends think we're not logged in even though we are.

    Should only be used in unit tests to simulate --dont-override-profile.
    """
    for backend in self._backends.values():
      # pylint: disable=protected-access
      backend._ResetLoggedInState()

  def _RebuildCredentials(self):
    """Recomputes the merged credentials from all sources.

    Sources are applied in increasing precedence: the credentials_path file,
    ~/.telemetry-credentials (skipped when options_for_unittests.GetCopy()
    is truthy), then the entries registered through Add().
    """
    file_credentials = {}
    if (self._credentials_path is not None and
        os.path.exists(self._credentials_path)):
      file_credentials = self._LoadJsonFile(self._credentials_path)

    # TODO(nduca): use system keychain, if possible.
    homedir_credentials_path = os.path.expanduser(self._HOMEDIR_CREDENTIALS)
    homedir_credentials = {}
    if (not options_for_unittests.GetCopy() and
        os.path.exists(homedir_credentials_path)):
      logging.info("Found ~/.telemetry-credentials. Its contents will be used "
                   "when no other credentials can be found.")
      homedir_credentials = self._LoadJsonFile(homedir_credentials_path)

    for credentials_type in homedir_credentials:
      logging.info("Will use ~/.telemetry-credentials for %s logins.",
                   credentials_type)

    # NOTE(review): the log message above describes the home-directory file as
    # a fallback, yet it overrides the credentials_path file here. The existing
    # precedence is kept; confirm which behavior is intended.
    merged = {}
    merged.update(file_credentials)
    merged.update(homedir_credentials)
    merged.update(self._extra_credentials)
    self._credentials = merged

  def WarnIfMissingCredentials(self, page):
    """Logs a warning when page needs credentials that are not available.

    Does nothing if page.credentials is falsy or credentials of that type are
    stored.

    Raises:
      CredentialsError: if no backend serves page.credentials.
    """
    if not page.credentials or self.CanLogin(page.credentials):
      return

    files_to_tweak = []
    if page.credentials_path:
      files_to_tweak.append(page.credentials_path)
    files_to_tweak.append(self._HOMEDIR_CREDENTIALS)

    example_credentials_file = os.path.join(
        util.GetTelemetryDir(), 'examples', 'credentials_example.json')

    logging.warning(
        self._MISSING_CREDENTIALS_MESSAGE, page.credentials, page,
        ' or '.join(files_to_tweak), example_credentials_file)