blob: 26e2fc82c943303e938bd3122908957756365ffb [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
from tests.compat import mock, unittest
from tests.unit import AWSMockServiceTestCase
from boto.compat import StringIO
from boto.exception import BotoServerError
from boto.s3.connection import S3Connection
from boto.s3.bucket import Bucket
from boto.s3.key import Key
class TestS3Key(AWSMockServiceTestCase):
connection_class = S3Connection
def setUp(self):
super(TestS3Key, self).setUp()
def default_body(self):
return "default body"
def test_unicode_name(self):
k = Key()
k.name = u'Österreich'
print(repr(k))
def test_when_no_restore_header_present(self):
self.set_http_response(status_code=200)
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('myglacierkey')
self.assertIsNone(k.ongoing_restore)
self.assertIsNone(k.expiry_date)
def test_restore_header_with_ongoing_restore(self):
self.set_http_response(
status_code=200,
header=[('x-amz-restore', 'ongoing-request="true"')])
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('myglacierkey')
self.assertTrue(k.ongoing_restore)
self.assertIsNone(k.expiry_date)
def test_restore_completed(self):
self.set_http_response(
status_code=200,
header=[('x-amz-restore',
'ongoing-request="false", '
'expiry-date="Fri, 21 Dec 2012 00:00:00 GMT"')])
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('myglacierkey')
self.assertFalse(k.ongoing_restore)
self.assertEqual(k.expiry_date, 'Fri, 21 Dec 2012 00:00:00 GMT')
def test_delete_key_return_key(self):
self.set_http_response(status_code=204, body='')
b = Bucket(self.service_connection, 'mybucket')
key = b.delete_key('fookey')
self.assertIsNotNone(key)
def test_storage_class(self):
self.set_http_response(status_code=200)
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('fookey')
# Mock out the bucket object - we really only care about calls
# to list.
k.bucket = mock.MagicMock()
# Default behavior doesn't call list
k.set_contents_from_string('test')
k.bucket.list.assert_not_called()
# Direct access calls list to get the real value if unset,
# and still defaults to STANDARD if unavailable.
sc_value = k.storage_class
self.assertEqual(sc_value, 'STANDARD')
k.bucket.list.assert_called_with(k.name.encode('utf-8'))
k.bucket.list.reset_mock()
# Setting manually doesn't call list
k.storage_class = 'GLACIER'
k.set_contents_from_string('test')
k.bucket.list.assert_not_called()
def test_change_storage_class(self):
self.set_http_response(status_code=200)
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('fookey')
# Mock out Key.copy so we can record calls to it
k.copy = mock.MagicMock()
# Mock out the bucket so we don't actually need to have fake responses
k.bucket = mock.MagicMock()
k.bucket.name = 'mybucket'
self.assertEqual(k.storage_class, 'STANDARD')
# The default change_storage_class call should result in a copy to our
# bucket
k.change_storage_class('REDUCED_REDUNDANCY')
k.copy.assert_called_with(
'mybucket',
'fookey',
reduced_redundancy=True,
preserve_acl=True,
validate_dst_bucket=True,
)
def test_change_storage_class_new_bucket(self):
self.set_http_response(status_code=200)
b = Bucket(self.service_connection, 'mybucket')
k = b.get_key('fookey')
# Mock out Key.copy so we can record calls to it
k.copy = mock.MagicMock()
# Mock out the bucket so we don't actually need to have fake responses
k.bucket = mock.MagicMock()
k.bucket.name = 'mybucket'
self.assertEqual(k.storage_class, 'STANDARD')
# Specifying a different dst_bucket should result in a copy to the new
# bucket
k.copy.reset_mock()
k.change_storage_class('REDUCED_REDUNDANCY', dst_bucket='yourbucket')
k.copy.assert_called_with(
'yourbucket',
'fookey',
reduced_redundancy=True,
preserve_acl=True,
validate_dst_bucket=True,
)
def counter(fn):
def _wrapper(*args, **kwargs):
_wrapper.count += 1
return fn(*args, **kwargs)
_wrapper.count = 0
return _wrapper
class TestS3KeyRetries(AWSMockServiceTestCase):
connection_class = S3Connection
@mock.patch('time.sleep')
def test_500_retry(self, sleep_mock):
self.set_http_response(status_code=500)
b = Bucket(self.service_connection, 'mybucket')
k = b.new_key('test_failure')
fail_file = StringIO('This will attempt to retry.')
with self.assertRaises(BotoServerError):
k.send_file(fail_file)
@mock.patch('time.sleep')
def test_400_timeout(self, sleep_mock):
weird_timeout_body = "<Error><Code>RequestTimeout</Code></Error>"
self.set_http_response(status_code=400, body=weird_timeout_body)
b = Bucket(self.service_connection, 'mybucket')
k = b.new_key('test_failure')
fail_file = StringIO('This will pretend to be chunk-able.')
k.should_retry = counter(k.should_retry)
self.assertEqual(k.should_retry.count, 0)
with self.assertRaises(BotoServerError):
k.send_file(fail_file)
self.assertTrue(k.should_retry.count, 1)
@mock.patch('time.sleep')
def test_502_bad_gateway(self, sleep_mock):
weird_timeout_body = "<Error><Code>BadGateway</Code></Error>"
self.set_http_response(status_code=502, body=weird_timeout_body)
b = Bucket(self.service_connection, 'mybucket')
k = b.new_key('test_failure')
fail_file = StringIO('This will pretend to be chunk-able.')
k.should_retry = counter(k.should_retry)
self.assertEqual(k.should_retry.count, 0)
with self.assertRaises(BotoServerError):
k.send_file(fail_file)
self.assertTrue(k.should_retry.count, 1)
@mock.patch('time.sleep')
def test_504_gateway_timeout(self, sleep_mock):
weird_timeout_body = "<Error><Code>GatewayTimeout</Code></Error>"
self.set_http_response(status_code=504, body=weird_timeout_body)
b = Bucket(self.service_connection, 'mybucket')
k = b.new_key('test_failure')
fail_file = StringIO('This will pretend to be chunk-able.')
k.should_retry = counter(k.should_retry)
self.assertEqual(k.should_retry.count, 0)
with self.assertRaises(BotoServerError):
k.send_file(fail_file)
self.assertTrue(k.should_retry.count, 1)
class TestFileError(unittest.TestCase):
def test_file_error(self):
key = Key()
class CustomException(Exception): pass
key.get_contents_to_file = mock.Mock(
side_effect=CustomException('File blew up!'))
# Ensure our exception gets raised instead of a file or IO error
with self.assertRaises(CustomException):
key.get_contents_to_filename('foo.txt')
if __name__ == '__main__':
unittest.main()