| # Copyright 2015 Google Inc. All rights reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Unit tests for oauth2client.multistore_file.""" |
| |
| import datetime |
| import errno |
| import os |
| import stat |
| import tempfile |
| |
| import mock |
| import unittest2 |
| |
| from oauth2client import client |
| from oauth2client import util |
| from oauth2client.contrib import locked_file |
| from oauth2client.contrib import multistore_file |
| |
| _filehandle, FILENAME = tempfile.mkstemp('oauth2client_test.data') |
| os.close(_filehandle) |
| |
| |
| class _MockLockedFile(object): |
| |
| def __init__(self, filename_str, error_class, error_code): |
| self.filename_str = filename_str |
| self.error_class = error_class |
| self.error_code = error_code |
| self.open_and_lock_called = False |
| |
| def open_and_lock(self): |
| self.open_and_lock_called = True |
| raise self.error_class(self.error_code, '') |
| |
| def is_locked(self): |
| return False |
| |
| def filename(self): |
| return self.filename_str |
| |
| |
| class Test__dict_to_tuple_key(unittest2.TestCase): |
| |
| def test_key_conversions(self): |
| key1, val1 = 'somekey', 'some value' |
| key2, val2 = 'another', 'something else' |
| key3, val3 = 'onemore', 'foo' |
| test_dict = { |
| key1: val1, |
| key2: val2, |
| key3: val3, |
| } |
| tuple_key = multistore_file._dict_to_tuple_key(test_dict) |
| |
| # the resulting key should be naturally sorted |
| expected_output = ( |
| (key2, val2), |
| (key3, val3), |
| (key1, val1), |
| ) |
| self.assertTupleEqual(expected_output, tuple_key) |
| # check we get the original dictionary back |
| self.assertDictEqual(test_dict, dict(tuple_key)) |
| |
| |
| class MultistoreFileTests(unittest2.TestCase): |
| |
| def tearDown(self): |
| try: |
| os.unlink(FILENAME) |
| except OSError: |
| pass |
| |
| def setUp(self): |
| try: |
| os.unlink(FILENAME) |
| except OSError: |
| pass |
| |
| def _create_test_credentials(self, client_id='some_client_id', |
| expiration=None): |
| access_token = 'foo' |
| client_secret = 'cOuDdkfjxxnv+' |
| refresh_token = '1/0/a.df219fjls0' |
| token_expiry = expiration or datetime.datetime.utcnow() |
| token_uri = 'https://www.google.com/accounts/o8/oauth2/token' |
| user_agent = 'refresh_checker/1.0' |
| |
| credentials = client.OAuth2Credentials( |
| access_token, client_id, client_secret, |
| refresh_token, token_expiry, token_uri, |
| user_agent) |
| return credentials |
| |
| def test_lock_file_raises_ioerror(self): |
| filehandle, filename = tempfile.mkstemp() |
| os.close(filehandle) |
| |
| try: |
| for error_code in (errno.EDEADLK, errno.ENOSYS, errno.ENOLCK, |
| errno.EACCES): |
| for error_class in (IOError, OSError): |
| multistore = multistore_file._MultiStore(filename) |
| multistore._file = _MockLockedFile( |
| filename, error_class, error_code) |
| # Should not raise though the underlying file class did. |
| multistore._lock() |
| self.assertTrue(multistore._file.open_and_lock_called) |
| finally: |
| os.unlink(filename) |
| |
| def test_lock_file_raise_unexpected_error(self): |
| filehandle, filename = tempfile.mkstemp() |
| os.close(filehandle) |
| |
| try: |
| multistore = multistore_file._MultiStore(filename) |
| multistore._file = _MockLockedFile(filename, IOError, errno.EBUSY) |
| with self.assertRaises(IOError): |
| multistore._lock() |
| self.assertTrue(multistore._file.open_and_lock_called) |
| finally: |
| os.unlink(filename) |
| |
| def test_read_only_file_fail_lock(self): |
| credentials = self._create_test_credentials() |
| |
| open(FILENAME, 'a+b').close() |
| os.chmod(FILENAME, 0o400) |
| |
| store = multistore_file.get_credential_storage( |
| FILENAME, |
| credentials.client_id, |
| credentials.user_agent, |
| ['some-scope', 'some-other-scope']) |
| |
| store.put(credentials) |
| if os.name == 'posix': # pragma: NO COVER |
| self.assertTrue(store._multistore._read_only) |
| os.chmod(FILENAME, 0o600) |
| |
| def test_read_only_file_fail_lock_no_warning(self): |
| open(FILENAME, 'a+b').close() |
| os.chmod(FILENAME, 0o400) |
| |
| multistore = multistore_file._MultiStore(FILENAME) |
| |
| with mock.patch.object(multistore_file.logger, 'warn') as mock_warn: |
| multistore._warn_on_readonly = False |
| multistore._lock() |
| self.assertFalse(mock_warn.called) |
| |
| def test_lock_skip_refresh(self): |
| with open(FILENAME, 'w') as f: |
| f.write('123') |
| os.chmod(FILENAME, 0o400) |
| |
| multistore = multistore_file._MultiStore(FILENAME) |
| |
| refresh_patch = mock.patch.object( |
| multistore, '_refresh_data_cache') |
| |
| with refresh_patch as refresh_mock: |
| multistore._data = {} |
| multistore._lock() |
| self.assertFalse(refresh_mock.called) |
| |
| @unittest2.skipIf(not hasattr(os, 'symlink'), 'No symlink available') |
| def test_multistore_no_symbolic_link_files(self): |
| SYMFILENAME = FILENAME + 'sym' |
| os.symlink(FILENAME, SYMFILENAME) |
| store = multistore_file.get_credential_storage( |
| SYMFILENAME, |
| 'some_client_id', |
| 'user-agent/1.0', |
| ['some-scope', 'some-other-scope']) |
| try: |
| with self.assertRaises( |
| locked_file.CredentialsFileSymbolicLinkError): |
| store.get() |
| finally: |
| os.unlink(SYMFILENAME) |
| |
| def test_multistore_non_existent_file(self): |
| store = multistore_file.get_credential_storage( |
| FILENAME, |
| 'some_client_id', |
| 'user-agent/1.0', |
| ['some-scope', 'some-other-scope']) |
| |
| credentials = store.get() |
| self.assertEquals(None, credentials) |
| |
| def test_multistore_file(self): |
| credentials = self._create_test_credentials() |
| |
| store = multistore_file.get_credential_storage( |
| FILENAME, |
| credentials.client_id, |
| credentials.user_agent, |
| ['some-scope', 'some-other-scope']) |
| |
| # Save credentials |
| store.put(credentials) |
| credentials = store.get() |
| |
| self.assertNotEquals(None, credentials) |
| self.assertEquals('foo', credentials.access_token) |
| |
| # Delete credentials |
| store.delete() |
| credentials = store.get() |
| |
| self.assertEquals(None, credentials) |
| |
| if os.name == 'posix': # pragma: NO COVER |
| self.assertEquals( |
| 0o600, stat.S_IMODE(os.stat(FILENAME).st_mode)) |
| |
| def test_multistore_file_custom_key(self): |
| credentials = self._create_test_credentials() |
| |
| custom_key = {'myapp': 'testing', 'clientid': 'some client'} |
| store = multistore_file.get_credential_storage_custom_key( |
| FILENAME, custom_key) |
| |
| store.put(credentials) |
| stored_credentials = store.get() |
| |
| self.assertNotEquals(None, stored_credentials) |
| self.assertEqual(credentials.access_token, |
| stored_credentials.access_token) |
| |
| store.delete() |
| stored_credentials = store.get() |
| |
| self.assertEquals(None, stored_credentials) |
| |
| def test_multistore_file_custom_string_key(self): |
| credentials = self._create_test_credentials() |
| |
| # store with string key |
| store = multistore_file.get_credential_storage_custom_string_key( |
| FILENAME, 'mykey') |
| |
| store.put(credentials) |
| stored_credentials = store.get() |
| |
| self.assertNotEquals(None, stored_credentials) |
| self.assertEqual(credentials.access_token, |
| stored_credentials.access_token) |
| |
| # try retrieving with a dictionary |
| multistore_file.get_credential_storage_custom_string_key( |
| FILENAME, {'key': 'mykey'}) |
| stored_credentials = store.get() |
| self.assertNotEquals(None, stored_credentials) |
| self.assertEqual(credentials.access_token, |
| stored_credentials.access_token) |
| |
| store.delete() |
| stored_credentials = store.get() |
| |
| self.assertEquals(None, stored_credentials) |
| |
| def test_multistore_file_backwards_compatibility(self): |
| credentials = self._create_test_credentials() |
| scopes = ['scope1', 'scope2'] |
| |
| # store the credentials using the legacy key method |
| store = multistore_file.get_credential_storage( |
| FILENAME, 'client_id', 'user_agent', scopes) |
| store.put(credentials) |
| |
| # retrieve the credentials using a custom key that matches the |
| # legacy key |
| key = {'clientId': 'client_id', 'userAgent': 'user_agent', |
| 'scope': util.scopes_to_string(scopes)} |
| store = multistore_file.get_credential_storage_custom_key( |
| FILENAME, key) |
| stored_credentials = store.get() |
| |
| self.assertEqual(credentials.access_token, |
| stored_credentials.access_token) |
| |
| def test_multistore_file_get_all_keys(self): |
| # start with no keys |
| keys = multistore_file.get_all_credential_keys(FILENAME) |
| self.assertEquals([], keys) |
| |
| # store credentials |
| credentials = self._create_test_credentials(client_id='client1') |
| custom_key = {'myapp': 'testing', 'clientid': 'client1'} |
| store1 = multistore_file.get_credential_storage_custom_key( |
| FILENAME, custom_key) |
| store1.put(credentials) |
| |
| keys = multistore_file.get_all_credential_keys(FILENAME) |
| self.assertEquals([custom_key], keys) |
| |
| # store more credentials |
| credentials = self._create_test_credentials(client_id='client2') |
| string_key = 'string_key' |
| store2 = multistore_file.get_credential_storage_custom_string_key( |
| FILENAME, string_key) |
| store2.put(credentials) |
| |
| keys = multistore_file.get_all_credential_keys(FILENAME) |
| self.assertEquals(2, len(keys)) |
| self.assertTrue(custom_key in keys) |
| self.assertTrue({'key': string_key} in keys) |
| |
| # back to no keys |
| store1.delete() |
| store2.delete() |
| keys = multistore_file.get_all_credential_keys(FILENAME) |
| self.assertEquals([], keys) |
| |
| def _refresh_data_cache_helper(self): |
| multistore = multistore_file._MultiStore(FILENAME) |
| json_patch = mock.patch.object(multistore, '_locked_json_read') |
| |
| return multistore, json_patch |
| |
| def test__refresh_data_cache_bad_json(self): |
| multistore, json_patch = self._refresh_data_cache_helper() |
| |
| with json_patch as json_mock: |
| json_mock.side_effect = ValueError('') |
| multistore._refresh_data_cache() |
| self.assertTrue(json_mock.called) |
| self.assertEqual(multistore._data, {}) |
| |
| def test__refresh_data_cache_bad_version(self): |
| multistore, json_patch = self._refresh_data_cache_helper() |
| |
| with json_patch as json_mock: |
| json_mock.return_value = {} |
| multistore._refresh_data_cache() |
| self.assertTrue(json_mock.called) |
| self.assertEqual(multistore._data, {}) |
| |
| def test__refresh_data_cache_newer_version(self): |
| multistore, json_patch = self._refresh_data_cache_helper() |
| |
| with json_patch as json_mock: |
| json_mock.return_value = {'file_version': 5} |
| with self.assertRaises(multistore_file.NewerCredentialStoreError): |
| multistore._refresh_data_cache() |
| self.assertTrue(json_mock.called) |
| |
| def test__refresh_data_cache_bad_credentials(self): |
| multistore, json_patch = self._refresh_data_cache_helper() |
| |
| with json_patch as json_mock: |
| json_mock.return_value = { |
| 'file_version': 1, |
| 'data': [ |
| {'lol': 'this is a bad credential object.'} |
| ]} |
| multistore._refresh_data_cache() |
| self.assertTrue(json_mock.called) |
| self.assertEqual(multistore._data, {}) |
| |
| def test__delete_credential_nonexistent(self): |
| multistore = multistore_file._MultiStore(FILENAME) |
| |
| with mock.patch.object(multistore, '_write') as write_mock: |
| multistore._data = {} |
| multistore._delete_credential('nonexistent_key') |
| self.assertTrue(write_mock.called) |