# blob: ba5d9830833e1284f7fed9c497304a175bb43e1b [file] [log] [blame]
# Copyright 2010 Google Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""
Tests of resumable downloads.
"""
import errno
import os
import re
import boto
from boto.s3.resumable_download_handler import get_cur_file_size
from boto.s3.resumable_download_handler import ResumableDownloadHandler
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableDownloadException
from cb_test_harness import CallbackTestHarness
from tests.integration.gs.testcase import GSTestCase
# Sizes, in bytes, of the random source objects the tests below create.
SMALL_KEY_SIZE = 2 * 1024 # 2 KB.
LARGE_KEY_SIZE = 500 * 1024 # 500 KB.
class ResumableDownloadTests(GSTestCase):
    """Resumable download test suite."""

    def make_small_key(self):
        """
        Creates a source key holding SMALL_KEY_SIZE random bytes.

        Returns a (contents, key) tuple: the random bytes and the key that
        self._MakeKey built from them.
        """
        small_src_key_as_string = os.urandom(SMALL_KEY_SIZE)
        small_src_key = self._MakeKey(data=small_src_key_as_string)
        return small_src_key_as_string, small_src_key

    def make_tracker_file(self, tmpdir=None):
        """
        Returns the path of a file named 'tracker' inside tmpdir.

        Only the path is built; the file itself is not created here. When
        tmpdir is not given, a new one is obtained from self._MakeTempDir().
        """
        if not tmpdir:
            tmpdir = self._MakeTempDir()
        return os.path.join(tmpdir, 'tracker')

    def make_dst_fp(self, tmpdir=None):
        """
        Opens a file named 'dstfile' inside tmpdir for writing and returns it.

        When tmpdir is not given, a new one is obtained from
        self._MakeTempDir().
        """
        if not tmpdir:
            tmpdir = self._MakeTempDir()
        dst_file = os.path.join(tmpdir, 'dstfile')
        # Binary mode: the source objects are os.urandom() bytes, so a
        # text-mode handle could translate newline bytes on some platforms
        # and make the size checks below fail.
        return open(dst_file, 'wb')

    def test_non_resumable_download(self):
        """
        Tests that non-resumable downloads work
        """
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(dst_fp)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_without_persistent_tracker(self):
        """
        Tests a single resumable download, with no tracker persistence
        """
        res_download_handler = ResumableDownloadHandler()
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_failed_download_with_persistent_tracker(self):
        """
        Tests that failed resumable download leaves a correct tracker file
        """
        harness = CallbackTestHarness()
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=0)
        small_src_key_as_string, small_src_key = self.make_small_key()
        try:
            small_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
        except ResumableDownloadException as e:
            # We'll get a ResumableDownloadException at this point because
            # of CallbackTestHarness (above). Check that the tracker file was
            # created correctly.
            self.assertEqual(e.disposition,
                             ResumableTransferDisposition.ABORT_CUR_PROCESS)
            self.assertTrue(os.path.exists(tracker_file_name))
            # Close the tracker file deterministically after reading it.
            with open(tracker_file_name) as f:
                etag_line = f.readline()
            self.assertEqual(etag_line.rstrip('\n'),
                             small_src_key.etag.strip('"\''))
        else:
            self.fail('Did not get expected ResumableDownloadException')

    def test_retryable_exception_recovery(self):
        """
        Tests handling of a retryable exception
        """
        # Test one of the RETRYABLE_EXCEPTIONS.
        exception = ResumableDownloadHandler.RETRYABLE_EXCEPTIONS[0]
        harness = CallbackTestHarness(exception=exception)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_broken_pipe_recovery(self):
        """
        Tests handling of a Broken Pipe (which interacts with an httplib bug)
        """
        exception = IOError(errno.EPIPE, "Broken pipe")
        harness = CallbackTestHarness(exception=exception)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_non_retryable_exception_handling(self):
        """
        Tests resumable download that fails with a non-retryable exception
        """
        harness = CallbackTestHarness(
            exception=OSError(errno.EACCES, 'Permission denied'))
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        try:
            small_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
        except OSError as e:
            # Ensure the error was re-raised with the errno injected above.
            self.assertEqual(e.errno, errno.EACCES)
        else:
            self.fail('Did not get expected OSError')

    def test_failed_and_restarted_download_with_persistent_tracker(self):
        """
        Tests resumable download that fails once and then completes,
        with tracker file
        """
        harness = CallbackTestHarness()
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=1)
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())
        # Ensure tracker file deleted.
        self.assertFalse(os.path.exists(tracker_file_name))

    def test_multiple_in_process_failures_then_succeed(self):
        """
        Tests resumable download that fails twice in one process, then completes
        """
        # NOTE(review): no CallbackTestHarness is passed to the download
        # here, so nothing in this test injects the failures the description
        # mentions -- confirm whether that is intended.
        res_download_handler = ResumableDownloadHandler(num_retries=3)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_multiple_in_process_failures_then_succeed_with_tracker_file(self):
        """
        Tests resumable download that fails completely in one process,
        then when restarted completes, using a tracker file
        """
        # Set up test harness that causes more failures than a single
        # ResumableDownloadHandler instance will handle, writing enough data
        # before the first failure that some of it survives that process run.
        # Floor division keeps the byte threshold an integer.
        harness = CallbackTestHarness(
            fail_after_n_bytes=LARGE_KEY_SIZE // 2, num_times_to_fail=2)
        larger_src_key_as_string = os.urandom(LARGE_KEY_SIZE)
        larger_src_key = self._MakeKey(data=larger_src_key_as_string)
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=0)
        try:
            larger_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
        except ResumableDownloadException as e:
            self.assertEqual(e.disposition,
                             ResumableTransferDisposition.ABORT_CUR_PROCESS)
            # Ensure a tracker file survived.
            self.assertTrue(os.path.exists(tracker_file_name))
        else:
            self.fail('Did not get expected ResumableDownloadException')
        # Try it one more time; this time should succeed.
        larger_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        self.assertEqual(LARGE_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(larger_src_key_as_string,
                         larger_src_key.get_contents_as_string())
        self.assertFalse(os.path.exists(tracker_file_name))
        # Ensure some of the file was downloaded both before and after failure.
        self.assertTrue(
            len(harness.transferred_seq_before_first_failure) > 1 and
            len(harness.transferred_seq_after_first_failure) > 1)

    def test_download_with_inital_partial_download_before_failure(self):
        """
        Tests resumable download that successfully downloads some content
        before it fails, then restarts and completes
        """
        # Set up harness to fail download after several hundred KB so download
        # server will have saved something before we retry.
        # Floor division keeps the byte threshold an integer.
        harness = CallbackTestHarness(
            fail_after_n_bytes=LARGE_KEY_SIZE // 2)
        larger_src_key_as_string = os.urandom(LARGE_KEY_SIZE)
        larger_src_key = self._MakeKey(data=larger_src_key_as_string)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        larger_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(LARGE_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(larger_src_key_as_string,
                         larger_src_key.get_contents_as_string())
        # Ensure some of the file was downloaded both before and after failure.
        self.assertTrue(
            len(harness.transferred_seq_before_first_failure) > 1 and
            len(harness.transferred_seq_after_first_failure) > 1)

    def test_zero_length_object_download(self):
        """
        Tests downloading a zero-length object (exercises boundary conditions).
        """
        res_download_handler = ResumableDownloadHandler()
        dst_fp = self.make_dst_fp()
        k = self._MakeKey()
        k.get_contents_to_file(dst_fp,
                               res_download_handler=res_download_handler)
        self.assertEqual(0, get_cur_file_size(dst_fp))

    def test_download_with_invalid_tracker_etag(self):
        """
        Tests resumable download with a tracker file containing an invalid etag
        """
        tmp_dir = self._MakeTempDir()
        dst_fp = self.make_dst_fp(tmp_dir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        invalid_etag_tracker_file_name = os.path.join(tmp_dir,
                                                      'invalid_etag_tracker')
        # Seed the tracker with a value that is not the key's etag.
        with open(invalid_etag_tracker_file_name, 'w') as f:
            f.write('3.14159\n')
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=invalid_etag_tracker_file_name)
        # An error should be printed about the invalid tracker, but then it
        # should run the update successfully.
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE, get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_with_inconsistent_etag_in_tracker(self):
        """
        Tests resumable download with an inconsistent etag in tracker file
        """
        tmp_dir = self._MakeTempDir()
        dst_fp = self.make_dst_fp(tmp_dir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        inconsistent_etag_tracker_file_name = os.path.join(
            tmp_dir, 'inconsistent_etag_tracker')
        # Write the key's real etag reversed, so the tracker holds a value
        # derived from, but (normally) different to, the real one.
        good_etag = small_src_key.etag.strip('"\'')
        with open(inconsistent_etag_tracker_file_name, 'w') as f:
            f.write('%s\n' % good_etag[::-1])
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=inconsistent_etag_tracker_file_name)
        # An error should be printed about the expired tracker, but then it
        # should run the update successfully.
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_with_unwritable_tracker_file(self):
        """
        Tests resumable download with an unwritable tracker file
        """
        # Make dir where tracker_file lives temporarily unwritable.
        tmp_dir = self._MakeTempDir()
        tracker_file_name = os.path.join(tmp_dir, 'tracker')
        save_mod = os.stat(tmp_dir).st_mode
        try:
            os.chmod(tmp_dir, 0)
            # NOTE(review): the test also passes when constructing the
            # handler raises nothing -- confirm whether a failure is
            # actually expected at this point.
            ResumableDownloadHandler(tracker_file_name=tracker_file_name)
        except ResumableDownloadException as e:
            self.assertEqual(e.disposition, ResumableTransferDisposition.ABORT)
            self.assertNotEqual(
                e.message.find('Couldn\'t write URI tracker file'), -1)
        finally:
            # Restore original protection of dir where tracker_file lives.
            os.chmod(tmp_dir, save_mod)