| # -*- coding: utf-8 -*- |
| # Copyright 2013 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """Integration tests for cp command.""" |
| |
| from __future__ import absolute_import |
| |
| import base64 |
| import binascii |
| import datetime |
| import httplib |
| import logging |
| import os |
| import pickle |
| import pkgutil |
| import random |
| import re |
| import string |
| import sys |
| |
| from apitools.base.py import exceptions as apitools_exceptions |
| import boto |
| from boto import storage_uri |
| from boto.exception import ResumableTransferDisposition |
| from boto.exception import ResumableUploadException |
| from boto.exception import StorageResponseError |
| from boto.storage_uri import BucketStorageUri |
| |
| from gslib.cloud_api import ResumableDownloadException |
| from gslib.cloud_api import ResumableUploadException |
| from gslib.cloud_api import ResumableUploadStartOverException |
| from gslib.copy_helper import GetTrackerFilePath |
| from gslib.copy_helper import TrackerFileType |
| from gslib.cs_api_map import ApiSelector |
| from gslib.gcs_json_api import GcsJsonApi |
| from gslib.hashing_helper import CalculateMd5FromContents |
| from gslib.storage_url import StorageUrlFromString |
| import gslib.tests.testcase as testcase |
| from gslib.tests.testcase.base import NotParallelizable |
| from gslib.tests.testcase.integration_testcase import SkipForS3 |
| from gslib.tests.util import GenerationFromURI as urigen |
| from gslib.tests.util import HAS_S3_CREDS |
| from gslib.tests.util import ObjectToURI as suri |
| from gslib.tests.util import PerformsFileToObjectUpload |
| from gslib.tests.util import SetBotoConfigForTest |
| from gslib.tests.util import unittest |
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| from gslib.tracker_file import DeleteTrackerFile |
| from gslib.tracker_file import GetRewriteTrackerFilePath |
| from gslib.util import EIGHT_MIB |
| from gslib.util import IS_WINDOWS |
| from gslib.util import MakeHumanReadable |
| from gslib.util import ONE_KIB |
| from gslib.util import ONE_MIB |
| from gslib.util import Retry |
| from gslib.util import START_CALLBACK_PER_BYTES |
| from gslib.util import UTF8 |
| |
| |
| # Custom test callbacks must be pickleable, and therefore at global scope. |
class _HaltingCopyCallbackHandler(object):
  """Progress callback that aborts a resumable copy at a chosen byte offset.

  Once reported progress reaches ``halt_at_byte`` a note is written to stderr
  and a gslib resumable-transfer exception matching the transfer direction is
  raised, stopping the transfer partway through.
  """

  def __init__(self, is_upload, halt_at_byte):
    """Initializes the handler.

    Args:
      is_upload: True to raise ResumableUploadException, False to raise
          ResumableDownloadException.
      halt_at_byte: Progress (in bytes) at or beyond which to halt.
    """
    self._is_upload = is_upload
    self._halt_at_byte = halt_at_byte

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, total_size):
    """Raises a resumable-transfer exception once the halt point is reached.

    Args:
      total_bytes_transferred: Bytes transferred so far.
      total_size: Total transfer size, used only in the log message.

    Raises:
      ResumableUploadException: for uploads at or past the halt point.
      ResumableDownloadException: for downloads at or past the halt point.
    """
    if total_bytes_transferred < self._halt_at_byte:
      return
    progress_note = (
        'Halting transfer after byte %s. %s/%s transferred.\r\n' %
        (self._halt_at_byte, MakeHumanReadable(total_bytes_transferred),
         MakeHumanReadable(total_size)))
    sys.stderr.write(progress_note)
    # ResumableUploadException resolves to the gslib.cloud_api class: that
    # import follows (and shadows) the boto one at the top of this module.
    # NOTE(review): the 'Artifically' spelling is kept verbatim; other tests
    # may match on this text, so verify before correcting it.
    if not self._is_upload:
      raise ResumableDownloadException('Artifically halting download.')
    raise ResumableUploadException('Artifically halting upload.')
| |
| |
class _JSONForceHTTPErrorCopyCallbackHandler(object):
  """Progress callback that injects a single apitools HttpError mid-transfer.

  The first time progress reaches ``startover_at_byte`` an
  apitools_exceptions.HttpError whose status is ``http_error_num`` is raised;
  every later call does nothing.
  """

  def __init__(self, startover_at_byte, http_error_num):
    """Initializes the handler.

    Args:
      startover_at_byte: Progress (in bytes) at or beyond which to raise.
      http_error_num: HTTP status code placed in the raised error.
    """
    self._startover_at_byte = startover_at_byte
    self._http_error_num = http_error_num
    # Becomes True once the error has been raised, so it fires only once.
    self.started_over_once = False

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, total_size):
    """Raises the configured HttpError the first time the threshold is hit.

    Args:
      total_bytes_transferred: Bytes transferred so far.
      total_size: Total transfer size, used only in the log message.

    Raises:
      apitools_exceptions.HttpError: on the first call at/after the threshold.
    """
    if self.started_over_once:
      return
    if total_bytes_transferred < self._startover_at_byte:
      return
    notice = ('Forcing HTTP error %s after byte %s. '
              '%s/%s transferred.\r\n' % (
                  self._http_error_num,
                  self._startover_at_byte,
                  MakeHumanReadable(total_bytes_transferred),
                  MakeHumanReadable(total_size)))
    sys.stderr.write(notice)
    self.started_over_once = True
    fake_response = {'status': self._http_error_num}
    raise apitools_exceptions.HttpError(fake_response, None, None)
| |
| |
class _XMLResumableUploadStartOverCopyCallbackHandler(object):
  """Progress callback that forces one boto resumable-upload start-over.

  The first time progress reaches ``startover_at_byte`` a
  boto.exception.ResumableUploadException with disposition START_OVER is
  raised; every later call does nothing.
  """

  def __init__(self, startover_at_byte):
    """Initializes the handler.

    Args:
      startover_at_byte: Progress (in bytes) at or beyond which to raise.
    """
    self._startover_at_byte = startover_at_byte
    # Becomes True once the start-over error has been raised (only once).
    self.started_over_once = False

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, total_size):
    """Raises a START_OVER ResumableUploadException the first time through.

    Args:
      total_bytes_transferred: Bytes transferred so far.
      total_size: Total transfer size, used only in the log message.

    Raises:
      boto.exception.ResumableUploadException: on the first call at/after the
          threshold.
    """
    already_forced = self.started_over_once
    if already_forced or total_bytes_transferred < self._startover_at_byte:
      return
    notice = ('Forcing ResumableUpload start over error after byte %s. '
              '%s/%s transferred.\r\n' % (
                  self._startover_at_byte,
                  MakeHumanReadable(total_bytes_transferred),
                  MakeHumanReadable(total_size)))
    sys.stderr.write(notice)
    self.started_over_once = True
    # Spelled out as boto's class: the bare name ResumableUploadException in
    # this module is bound to the gslib.cloud_api class by a later import.
    raise boto.exception.ResumableUploadException(
        'Forcing upload start over',
        ResumableTransferDisposition.START_OVER)
| |
| |
class _DeleteBucketThenStartOverCopyCallbackHandler(object):
  """Test callback handler that deletes bucket then raises start-over.

  The first time reported progress reaches ``startover_at_byte``, every object
  version in the bucket is deleted, then the bucket itself, and finally
  ResumableUploadStartOverException is raised. Later calls do nothing.
  """

  def __init__(self, startover_at_byte, bucket_uri):
    """Initializes the handler.

    Args:
      startover_at_byte: Progress (in bytes) at or beyond which to act.
      bucket_uri: URI of the bucket to delete; must provide bucket_name,
          list_bucket, get_bucket and delete_bucket (e.g. a boto
          BucketStorageUri).
    """
    self._startover_at_byte = startover_at_byte
    self._bucket_uri = bucket_uri
    # Becomes True once the bucket is deleted, so this happens only once.
    self.started_over_once = False

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, total_size):
    """Deletes the bucket and forces a start-over, once, at the threshold.

    Args:
      total_bytes_transferred: Bytes transferred so far.
      total_size: Total transfer size, used only in the log message.

    Raises:
      ResumableUploadStartOverException: on the first call at/after the
          threshold.
    """
    if (total_bytes_transferred >= self._startover_at_byte
        and not self.started_over_once):
      # Terminate the line like the other handlers' messages do; without it
      # the start-over message below is appended to this one on stderr.
      sys.stderr.write(
          'Deleting bucket (%s)\r\n' % self._bucket_uri.bucket_name)

      # Retried (up to 5 tries) whenever a StorageResponseError is raised.
      @Retry(StorageResponseError, tries=5, timeout_secs=1)
      def DeleteBucket():
        # Snapshot the listing before deleting anything, presumably so the
        # deletions cannot disturb an in-progress listing.
        bucket_list = list(self._bucket_uri.list_bucket(all_versions=True))
        for k in bucket_list:
          self._bucket_uri.get_bucket().delete_key(k.name,
                                                   version_id=k.version_id)
        self._bucket_uri.delete_bucket()

      DeleteBucket()
      sys.stderr.write(
          'Forcing ResumableUpload start over error after byte %s. '
          '%s/%s transferred.\r\n' % (
              self._startover_at_byte,
              MakeHumanReadable(total_bytes_transferred),
              MakeHumanReadable(total_size)))
      self.started_over_once = True
      raise ResumableUploadStartOverException(
          'Artificially forcing start-over')
| |
| |
class _RewriteHaltException(Exception):
  """Raised by the rewrite test callbacks to abort a rewrite operation."""
| |
| |
class _HaltingRewriteCallbackHandler(object):
  """Progress callback that aborts a rewrite once enough bytes are rewritten."""

  def __init__(self, halt_at_byte):
    """Initializes the handler.

    Args:
      halt_at_byte: Rewrite progress (in bytes) at or beyond which to abort.
    """
    self._halt_at_byte = halt_at_byte

  # pylint: disable=invalid-name
  def call(self, total_bytes_rewritten, unused_total_size):
    """Raises _RewriteHaltException once the halt point has been reached.

    Args:
      total_bytes_rewritten: Bytes rewritten so far.
      unused_total_size: Total object size (ignored).
    """
    reached_halt_point = total_bytes_rewritten >= self._halt_at_byte
    if reached_halt_point:
      raise _RewriteHaltException('Artificially halting rewrite')
| |
| |
class _EnsureRewriteResumeCallbackHandler(object):
  """Test callback handler for ensuring a rewrite operation resumed.

  Every progress report must be strictly greater than ``required_byte``. A
  report at or below it is treated as the rewrite not having resumed, and the
  operation is aborted with _RewriteHaltException.
  """

  def __init__(self, required_byte):
    """Initializes the handler.

    Args:
      required_byte: Number of bytes that should already have been rewritten;
          each progress report must exceed this value.
    """
    self._required_byte = required_byte

  # pylint: disable=invalid-name
  def call(self, total_bytes_rewritten, unused_total_size):
    """Aborts the rewrite if progress has not moved past required_byte.

    Args:
      total_bytes_rewritten: Bytes rewritten so far.
      unused_total_size: Total object size (ignored).

    Raises:
      _RewriteHaltException: if total_bytes_rewritten <= required_byte.
    """
    if total_bytes_rewritten <= self._required_byte:
      raise _RewriteHaltException(
          'Rewrite did not resume; %s bytes written, but %s bytes should '
          'have already been written.' % (total_bytes_rewritten,
                                          self._required_byte))
| |
| |
class _ResumableUploadRetryHandler(object):
  """Progress callback that raises a given exception a limited number of times.

  Once progress reaches ``retry_at_byte``, each call raises
  ``exception_to_raise(*exc_args)`` until it has done so ``num_retries``
  times; after that every call does nothing.
  """

  def __init__(self, retry_at_byte, exception_to_raise, exc_args,
               num_retries=1):
    """Initializes the handler.

    Args:
      retry_at_byte: Progress (in bytes) at or beyond which to raise.
      exception_to_raise: Exception class (or factory) to raise.
      exc_args: Positional arguments passed to exception_to_raise.
      num_retries: Maximum number of times to raise.
    """
    self._retry_at_byte = retry_at_byte
    self._exception_to_raise = exception_to_raise
    self._exception_args = exc_args
    self._num_retries = num_retries
    # Count of exceptions raised so far.
    self._retries_made = 0

  # pylint: disable=invalid-name
  def call(self, total_bytes_transferred, unused_total_size):
    """Raises the configured exception while retries remain past the threshold.

    Args:
      total_bytes_transferred: Bytes transferred so far.
      unused_total_size: Total transfer size (ignored).
    """
    if total_bytes_transferred < self._retry_at_byte:
      return
    if self._retries_made >= self._num_retries:
      return
    self._retries_made += 1
    raise self._exception_to_raise(*self._exception_args)
| |
| |
| class TestCp(testcase.GsUtilIntegrationTestCase): |
| """Integration tests for cp command.""" |
| |
  # For tests that artificially halt, we need to ensure at least one callback
  # occurs. NOTE(review): assumes START_CALLBACK_PER_BYTES is the byte
  # interval between progress callbacks, so a transfer of twice that size
  # reports progress at least once before it completes.
  halt_size = START_CALLBACK_PER_BYTES * 2
| |
| def _get_test_file(self, name): |
| contents = pkgutil.get_data('gslib', 'tests/test_data/%s' % name) |
| return self.CreateTempFile(file_name=name, contents=contents) |
| |
| @PerformsFileToObjectUpload |
| def test_noclobber(self): |
| key_uri = self.CreateObject(contents='foo') |
| fpath = self.CreateTempFile(contents='bar') |
| stderr = self.RunGsUtil(['cp', '-n', fpath, suri(key_uri)], |
| return_stderr=True) |
| self.assertIn('Skipping existing item: %s' % suri(key_uri), stderr) |
| self.assertEqual(key_uri.get_contents_as_string(), 'foo') |
| stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), fpath], |
| return_stderr=True) |
| with open(fpath, 'r') as f: |
| self.assertIn('Skipping existing item: %s' % suri(f), stderr) |
| self.assertEqual(f.read(), 'bar') |
| |
| def test_dest_bucket_not_exist(self): |
| fpath = self.CreateTempFile(contents='foo') |
| invalid_bucket_uri = ( |
| '%s://%s' % (self.default_provider, self.nonexistent_bucket_name)) |
| stderr = self.RunGsUtil(['cp', fpath, invalid_bucket_uri], |
| expected_status=1, return_stderr=True) |
| self.assertIn('does not exist.', stderr) |
| |
| def test_copy_in_cloud_noclobber(self): |
| bucket1_uri = self.CreateBucket() |
| bucket2_uri = self.CreateBucket() |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| stderr = self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)], |
| return_stderr=True) |
| # Rewrite API may output an additional 'Copying' progress notification. |
| self.assertGreaterEqual(stderr.count('Copying'), 1) |
| self.assertLessEqual(stderr.count('Copying'), 2) |
| stderr = self.RunGsUtil(['cp', '-n', suri(key_uri), suri(bucket2_uri)], |
| return_stderr=True) |
| self.assertIn('Skipping existing item: %s' % |
| suri(bucket2_uri, key_uri.object_name), stderr) |
| |
| @PerformsFileToObjectUpload |
| def test_streaming(self): |
| bucket_uri = self.CreateBucket() |
| stderr = self.RunGsUtil(['cp', '-', '%s' % suri(bucket_uri, 'foo')], |
| stdin='bar', return_stderr=True) |
| self.assertIn('Copying from <STDIN>', stderr) |
| key_uri = bucket_uri.clone_replace_name('foo') |
| self.assertEqual(key_uri.get_contents_as_string(), 'bar') |
| |
| def test_streaming_multiple_arguments(self): |
| bucket_uri = self.CreateBucket() |
| stderr = self.RunGsUtil(['cp', '-', '-', suri(bucket_uri)], |
| stdin='bar', return_stderr=True, expected_status=1) |
| self.assertIn('Multiple URL strings are not supported with streaming', |
| stderr) |
| |
| # TODO: Implement a way to test both with and without using magic file. |
| |
| @PerformsFileToObjectUpload |
| def test_detect_content_type(self): |
| """Tests local detection of content type.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| |
| self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| if IS_WINDOWS: |
| self.assertTrue( |
| re.search(r'Content-Type:\s+audio/x-mpg', stdout) or |
| re.search(r'Content-Type:\s+audio/mpeg', stdout)) |
| else: |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+audio/mpeg') |
| _Check1() |
| |
| self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| _Check2() |
| |
| def test_content_type_override_default(self): |
| """Tests overriding content type with the default value.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| |
| self.RunGsUtil(['-h', 'Content-Type:', 'cp', |
| self._get_test_file('test.mp3'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, |
| r'Content-Type:\s+application/octet-stream') |
| _Check1() |
| |
| self.RunGsUtil(['-h', 'Content-Type:', 'cp', |
| self._get_test_file('test.gif'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, |
| r'Content-Type:\s+application/octet-stream') |
| _Check2() |
| |
| def test_content_type_override(self): |
| """Tests overriding content type with a value.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| |
| self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', |
| self._get_test_file('test.mp3'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| _Check1() |
| |
| self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp', |
| self._get_test_file('test.gif'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| _Check2() |
| |
| @unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.') |
| @PerformsFileToObjectUpload |
| def test_magicfile_override(self): |
| """Tests content type override with magicfile value.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| fpath = self.CreateTempFile(contents='foo/bar\n') |
| self.RunGsUtil(['cp', fpath, dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| use_magicfile = boto.config.getbool('GSUtil', 'use_magicfile', False) |
| content_type = ('text/plain' if use_magicfile |
| else 'application/octet-stream') |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+%s' % content_type) |
| _Check1() |
| |
| @PerformsFileToObjectUpload |
| def test_content_type_mismatches(self): |
| """Tests overriding content type when it does not match the file type.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| fpath = self.CreateTempFile(contents='foo/bar\n') |
| |
| self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', |
| self._get_test_file('test.mp3'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| _Check1() |
| |
| self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', |
| self._get_test_file('test.gif'), dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| _Check2() |
| |
| self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', fpath, dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check3(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| _Check3() |
| |
| @PerformsFileToObjectUpload |
| def test_content_type_header_case_insensitive(self): |
| """Tests that content type header is treated with case insensitivity.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, 'foo') |
| fpath = self._get_test_file('test.gif') |
| |
| self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', |
| fpath, dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+text/plain') |
| self.assertNotRegexpMatches(stdout, r'image/gif') |
| _Check1() |
| |
| self.RunGsUtil(['-h', 'CONTENT-TYPE:image/gif', |
| '-h', 'content-type:image/gif', |
| 'cp', fpath, dsturi]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| stdout = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| self.assertNotRegexpMatches(stdout, r'image/gif,\s*image/gif') |
| _Check2() |
| |
| @PerformsFileToObjectUpload |
| def test_other_headers(self): |
| """Tests that non-content-type headers are applied successfully on copy.""" |
| bucket_uri = self.CreateBucket() |
| dst_uri = suri(bucket_uri, 'foo') |
| fpath = self._get_test_file('test.gif') |
| |
| self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12', |
| '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, 'cp', |
| fpath, dst_uri]) |
| |
| stdout = self.RunGsUtil(['ls', '-L', dst_uri], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') |
| self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') |
| |
| dst_uri2 = suri(bucket_uri, 'bar') |
| self.RunGsUtil(['cp', dst_uri, dst_uri2]) |
| # Ensure metadata was preserved across copy. |
| stdout = self.RunGsUtil(['ls', '-L', dst_uri2], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Cache-Control\s*:\s*public,max-age=12') |
| self.assertRegexpMatches(stdout, r'Metadata:\s*1:\s*abcd') |
| |
| @PerformsFileToObjectUpload |
| def test_versioning(self): |
| """Tests copy with versioning.""" |
| bucket_uri = self.CreateVersionedBucket() |
| k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') |
| k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| g1 = urigen(k2_uri) |
| self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)]) |
| k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name) |
| k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key()) |
| g2 = urigen(k2_uri) |
| k2_uri.set_contents_from_string('data3') |
| g3 = urigen(k2_uri) |
| |
| fpath = self.CreateTempFile() |
| # Check to make sure current version is data3. |
| self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'data3') |
| |
| # Check contents of all three versions |
| self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'data1') |
| self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g2), fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'data2') |
| self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g3), fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'data3') |
| |
| # Copy first version to current and verify. |
| self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1), |
| k2_uri.versionless_uri]) |
| self.RunGsUtil(['cp', k2_uri.versionless_uri, fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'data1') |
| |
| # Attempt to specify a version-specific URI for destination. |
| stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True, |
| expected_status=1) |
| self.assertIn('cannot be the destination for gsutil cp', stderr) |
| |
| @SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
| def test_recursive_copying_versioned_bucket(self): |
| """Tests that cp -R with versioned buckets copies all versions in order.""" |
| bucket1_uri = self.CreateVersionedBucket() |
| bucket2_uri = self.CreateVersionedBucket() |
| |
| # Write two versions of an object to the bucket1. |
| self.CreateObject(bucket_uri=bucket1_uri, object_name='k', contents='data0') |
| self.CreateObject(bucket_uri=bucket1_uri, object_name='k', |
| contents='longer_data1') |
| |
| self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True) |
| self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True) |
| |
| # Recursively copy to second versioned bucket. |
| self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check2(): |
| """Validates the results of the cp -R.""" |
| listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)], |
| return_stdout=True).split('\n') |
| listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)], |
| return_stdout=True).split('\n') |
| # 2 lines of listing output, 1 summary line, 1 empty line from \n split. |
| self.assertEquals(len(listing1), 4) |
| self.assertEquals(len(listing2), 4) |
| |
| # First object in each bucket should match in size and version-less name. |
| size1, _, uri_str1, _ = listing1[0].split() |
| self.assertEquals(size1, str(len('data0'))) |
| self.assertEquals(storage_uri(uri_str1).object_name, 'k') |
| size2, _, uri_str2, _ = listing2[0].split() |
| self.assertEquals(size2, str(len('data0'))) |
| self.assertEquals(storage_uri(uri_str2).object_name, 'k') |
| |
| # Similarly for second object in each bucket. |
| size1, _, uri_str1, _ = listing1[1].split() |
| self.assertEquals(size1, str(len('longer_data1'))) |
| self.assertEquals(storage_uri(uri_str1).object_name, 'k') |
| size2, _, uri_str2, _ = listing2[1].split() |
| self.assertEquals(size2, str(len('longer_data1'))) |
| self.assertEquals(storage_uri(uri_str2).object_name, 'k') |
| _Check2() |
| |
| @PerformsFileToObjectUpload |
| @SkipForS3('Preconditions not supported for S3.') |
| def test_cp_generation_zero_match(self): |
| """Tests that cp handles an object-not-exists precondition header.""" |
| bucket_uri = self.CreateBucket() |
| fpath1 = self.CreateTempFile(contents='data1') |
| # Match 0 means only write the object if it doesn't already exist. |
| gen_match_header = 'x-goog-if-generation-match:0' |
| |
| # First copy should succeed. |
| # TODO: This can fail (rarely) if the server returns a 5xx but actually |
| # commits the bytes. If we add restarts on small uploads, handle this |
| # case. |
| self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(bucket_uri)]) |
| |
| # Second copy should fail with a precondition error. |
| stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, |
| suri(bucket_uri)], |
| return_stderr=True, expected_status=1) |
| self.assertIn('PreconditionException', stderr) |
| |
| @PerformsFileToObjectUpload |
| @SkipForS3('Preconditions not supported for S3.') |
| def test_cp_v_generation_match(self): |
| """Tests that cp -v option handles the if-generation-match header.""" |
| bucket_uri = self.CreateVersionedBucket() |
| k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| g1 = k1_uri.generation |
| |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| |
| gen_match_header = 'x-goog-if-generation-match:%s' % g1 |
| # First copy should succeed. |
| self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, suri(k1_uri)]) |
| |
| # Second copy should fail the precondition. |
| stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', fpath1, |
| suri(k1_uri)], |
| return_stderr=True, expected_status=1) |
| |
| self.assertIn('PreconditionException', stderr) |
| |
| # Specifiying a generation with -n should fail before the request hits the |
| # server. |
| stderr = self.RunGsUtil(['-h', gen_match_header, 'cp', '-n', fpath1, |
| suri(k1_uri)], |
| return_stderr=True, expected_status=1) |
| |
| self.assertIn('ArgumentException', stderr) |
| self.assertIn('Specifying x-goog-if-generation-match is not supported ' |
| 'with cp -n', stderr) |
| |
| @PerformsFileToObjectUpload |
| def test_cp_nv(self): |
| """Tests that cp -nv works when skipping existing file.""" |
| bucket_uri = self.CreateVersionedBucket() |
| k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| |
| # First copy should succeed. |
| self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)]) |
| |
| # Second copy should skip copying. |
| stderr = self.RunGsUtil(['cp', '-nv', fpath1, suri(k1_uri)], |
| return_stderr=True) |
| self.assertIn('Skipping existing item:', stderr) |
| |
| @PerformsFileToObjectUpload |
| @SkipForS3('S3 lists versioned objects in reverse timestamp order.') |
| def test_cp_v_option(self): |
| """"Tests that cp -v returns the created object's version-specific URI.""" |
| bucket_uri = self.CreateVersionedBucket() |
| k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1') |
| k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2') |
| |
| # Case 1: Upload file to object using one-shot PUT. |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') |
| self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) |
| |
| # Case 2: Upload file to object using resumable upload. |
| size_threshold = ONE_KIB |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', |
| str(size_threshold)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| file_as_string = os.urandom(size_threshold) |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents=file_as_string) |
| self._run_cp_minus_v_test('-v', fpath1, k2_uri.uri) |
| |
| # Case 3: Upload stream to object. |
| self._run_cp_minus_v_test('-v', '-', k2_uri.uri) |
| |
| # Case 4: Download object to file. For this case we just expect output of |
| # gsutil cp -v to be the URI of the file. |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir) |
| dst_uri = storage_uri(fpath1) |
| stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)], |
| return_stderr=True) |
| self.assertIn('Created: %s' % dst_uri.uri, stderr.split('\n')[-2]) |
| |
| # Case 5: Daisy-chain from object to object. |
| self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri) |
| |
| # Case 6: Copy object to object in-the-cloud. |
| self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri) |
| |
| def _run_cp_minus_v_test(self, opt, src_str, dst_str): |
| """Runs cp -v with the options and validates the results.""" |
| stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True) |
| match = re.search(r'Created: (.*)\n', stderr) |
| self.assertIsNotNone(match) |
| created_uri = match.group(1) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True) |
| lines = stdout.split('\n') |
| # Final (most recent) object should match the "Created:" URI. This is |
| # in second-to-last line (last line is '\n'). |
| self.assertGreater(len(lines), 2) |
| self.assertEqual(created_uri, lines[-2]) |
| _Check1() |
| |
| @PerformsFileToObjectUpload |
| def test_stdin_args(self): |
| """Tests cp with the -I option.""" |
| tmpdir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=tmpdir, contents='data1') |
| fpath2 = self.CreateTempFile(tmpdir=tmpdir, contents='data2') |
| bucket_uri = self.CreateBucket() |
| self.RunGsUtil(['cp', '-I', suri(bucket_uri)], |
| stdin='\n'.join((fpath1, fpath2))) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True) |
| self.assertIn(os.path.basename(fpath1), stdout) |
| self.assertIn(os.path.basename(fpath2), stdout) |
| self.assertNumLines(stdout, 2) |
| _Check1() |
| |
| def test_cross_storage_class_cloud_cp(self): |
| bucket1_uri = self.CreateBucket(storage_class='STANDARD') |
| bucket2_uri = self.CreateBucket( |
| storage_class='DURABLE_REDUCED_AVAILABILITY') |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| # Server now allows copy-in-the-cloud across storage classes. |
| self.RunGsUtil(['cp', suri(key_uri), suri(bucket2_uri)]) |
| |
| @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') |
| def test_cross_provider_cp(self): |
| s3_bucket = self.CreateBucket(provider='s3') |
| gs_bucket = self.CreateBucket(provider='gs') |
| s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo') |
| gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar') |
| self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) |
| |
| @unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials') |
| @unittest.skip('This test performs a large copy but remains here for ' |
| 'debugging purposes.') |
| def test_cross_provider_large_cp(self): |
| s3_bucket = self.CreateBucket(provider='s3') |
| gs_bucket = self.CreateBucket(provider='gs') |
| s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f'*1024*1024) |
| gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b'*1024*1024) |
| self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)]) |
| with SetBotoConfigForTest([ |
| ('GSUtil', 'resumable_threshold', str(ONE_KIB)), |
| ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]): |
| # Ensure copy also works across json upload chunk boundaries. |
| self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)]) |
| |
| @unittest.skip('This test is slow due to creating many objects, ' |
| 'but remains here for debugging purposes.') |
| def test_daisy_chain_cp_file_sizes(self): |
| """Ensure daisy chain cp works with a wide of file sizes.""" |
| bucket_uri = self.CreateBucket() |
| bucket2_uri = self.CreateBucket() |
| exponent_cap = 28 # Up to 256 MiB in size. |
| for i in range(exponent_cap): |
| one_byte_smaller = 2**i - 1 |
| normal = 2**i |
| one_byte_larger = 2**i + 1 |
| self.CreateObject(bucket_uri=bucket_uri, contents='a'*one_byte_smaller) |
| self.CreateObject(bucket_uri=bucket_uri, contents='b'*normal) |
| self.CreateObject(bucket_uri=bucket_uri, contents='c'*one_byte_larger) |
| |
| self.AssertNObjectsInBucket(bucket_uri, exponent_cap*3) |
| self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'), |
| suri(bucket2_uri)]) |
| |
| self.AssertNObjectsInBucket(bucket2_uri, exponent_cap*3) |
| |
| def test_daisy_chain_cp(self): |
| """Tests cp with the -D option.""" |
| bucket1_uri = self.CreateBucket(storage_class='STANDARD') |
| bucket2_uri = self.CreateBucket( |
| storage_class='DURABLE_REDUCED_AVAILABILITY') |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| # Set some headers on source object so we can verify that headers are |
| # presereved by daisy-chain copy. |
| self.RunGsUtil(['setmeta', '-h', 'Cache-Control:public,max-age=12', |
| '-h', 'Content-Type:image/gif', |
| '-h', 'x-%s-meta-1:abcd' % self.provider_custom_meta, |
| suri(key_uri)]) |
| # Set public-read (non-default) ACL so we can verify that cp -D -p works. |
| self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)], return_stdout=True) |
| # Perform daisy-chain copy and verify that source object headers and ACL |
| # were preserved. Also specify -n option to test that gsutil correctly |
| # removes the x-goog-if-generation-match:0 header that was set at uploading |
| # time when updating the ACL. |
| stderr = self.RunGsUtil(['cp', '-Dpn', suri(key_uri), suri(bucket2_uri)], |
| return_stderr=True) |
| self.assertNotIn('Copy-in-the-cloud disallowed', stderr) |
| |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check(): |
| uri = suri(bucket2_uri, key_uri.object_name) |
| stdout = self.RunGsUtil(['ls', '-L', uri], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Cache-Control:\s+public,max-age=12') |
| self.assertRegexpMatches(stdout, r'Content-Type:\s+image/gif') |
| self.assertRegexpMatches(stdout, r'Metadata:\s+1:\s+abcd') |
| new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) |
| self.assertEqual(acl_json, new_acl_json) |
| _Check() |
| |
| def test_daisy_chain_cp_download_failure(self): |
| """Tests cp with the -D option when the download thread dies.""" |
| bucket1_uri = self.CreateBucket() |
| bucket2_uri = self.CreateBucket() |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, |
| contents='a' * self.halt_size) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| '-D', suri(key_uri), suri(bucket2_uri)], |
| expected_status=1, return_stderr=True) |
| # Should have two exception traces; one from the download thread and |
| # one from the upload thread. |
| self.assertEqual(stderr.count( |
| 'ResumableDownloadException: Artifically halting download'), 2) |
| |
| def test_canned_acl_cp(self): |
| """Tests copying with a canned ACL.""" |
| bucket1_uri = self.CreateBucket() |
| bucket2_uri = self.CreateBucket() |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| self.RunGsUtil(['cp', '-a', 'public-read', suri(key_uri), |
| suri(bucket2_uri)]) |
| # Set public-read on the original key after the copy so we can compare |
| # the ACLs. |
| self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], |
| return_stdout=True) |
| |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check(): |
| uri = suri(bucket2_uri, key_uri.object_name) |
| new_acl_json = self.RunGsUtil(['acl', 'get', uri], return_stdout=True) |
| self.assertEqual(public_read_acl, new_acl_json) |
| _Check() |
| |
| @PerformsFileToObjectUpload |
| def test_canned_acl_upload(self): |
| """Tests uploading a file with a canned ACL.""" |
| bucket1_uri = self.CreateBucket() |
| key_uri = self.CreateObject(bucket_uri=bucket1_uri, contents='foo') |
| # Set public-read on the object so we can compare the ACLs. |
| self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)]) |
| public_read_acl = self.RunGsUtil(['acl', 'get', suri(key_uri)], |
| return_stdout=True) |
| |
| file_name = 'bar' |
| fpath = self.CreateTempFile(file_name=file_name, contents='foo') |
| self.RunGsUtil(['cp', '-a', 'public-read', fpath, suri(bucket1_uri)]) |
| new_acl_json = self.RunGsUtil(['acl', 'get', suri(bucket1_uri, file_name)], |
| return_stdout=True) |
| self.assertEqual(public_read_acl, new_acl_json) |
| |
| resumable_size = ONE_KIB |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', |
| str(resumable_size)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| resumable_file_name = 'resumable_bar' |
| resumable_contents = os.urandom(resumable_size) |
| resumable_fpath = self.CreateTempFile( |
| file_name=resumable_file_name, contents=resumable_contents) |
| self.RunGsUtil(['cp', '-a', 'public-read', resumable_fpath, |
| suri(bucket1_uri)]) |
| new_resumable_acl_json = self.RunGsUtil( |
| ['acl', 'get', suri(bucket1_uri, resumable_file_name)], |
| return_stdout=True) |
| self.assertEqual(public_read_acl, new_resumable_acl_json) |
| |
| def test_cp_key_to_local_stream(self): |
| bucket_uri = self.CreateBucket() |
| contents = 'foo' |
| key_uri = self.CreateObject(bucket_uri=bucket_uri, contents=contents) |
| stdout = self.RunGsUtil(['cp', suri(key_uri), '-'], return_stdout=True) |
| self.assertIn(contents, stdout) |
| |
| def test_cp_local_file_to_local_stream(self): |
| contents = 'content' |
| fpath = self.CreateTempFile(contents=contents) |
| stdout = self.RunGsUtil(['cp', fpath, '-'], return_stdout=True) |
| self.assertIn(contents, stdout) |
| |
| @PerformsFileToObjectUpload |
| def test_cp_zero_byte_file(self): |
| dst_bucket_uri = self.CreateBucket() |
| src_dir = self.CreateTempDir() |
| fpath = os.path.join(src_dir, 'zero_byte') |
| with open(fpath, 'w') as unused_out_file: |
| pass # Write a zero byte file |
| self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)]) |
| |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True) |
| self.assertIn(os.path.basename(fpath), stdout) |
| _Check1() |
| |
| download_path = os.path.join(src_dir, 'zero_byte_download') |
| self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path]) |
| self.assertTrue(os.stat(download_path)) |
| |
| def test_copy_bucket_to_bucket(self): |
| """Tests that recursively copying from bucket to bucket. |
| |
| This should produce identically named objects (and not, in particular, |
| destination objects named by the version-specific URI from source objects). |
| """ |
| src_bucket_uri = self.CreateVersionedBucket() |
| dst_bucket_uri = self.CreateVersionedBucket() |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| contents='abc') |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| contents='def') |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _CopyAndCheck(): |
| self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), |
| suri(dst_bucket_uri)]) |
| stdout = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri], |
| return_stdout=True) |
| self.assertIn('%s%s/obj0\n' % (dst_bucket_uri, |
| src_bucket_uri.bucket_name), stdout) |
| self.assertIn('%s%s/obj1\n' % (dst_bucket_uri, |
| src_bucket_uri.bucket_name), stdout) |
| _CopyAndCheck() |
| |
| def test_copy_bucket_to_dir(self): |
| """Tests recursively copying from bucket to a directory. |
| |
| This should produce identically named objects (and not, in particular, |
| destination objects named by the version- specific URI from source objects). |
| """ |
| src_bucket_uri = self.CreateBucket() |
| dst_dir = self.CreateTempDir() |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| contents='abc') |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| contents='def') |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _CopyAndCheck(): |
| """Copies the bucket recursively and validates the results.""" |
| self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir]) |
| dir_list = [] |
| for dirname, _, filenames in os.walk(dst_dir): |
| for filename in filenames: |
| dir_list.append(os.path.join(dirname, filename)) |
| dir_list = sorted(dir_list) |
| self.assertEqual(len(dir_list), 2) |
| self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 'obj0'), dir_list[0]) |
| self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 'obj1'), dir_list[1]) |
| _CopyAndCheck() |
| |
| def test_recursive_download_with_leftover_dir_placeholder(self): |
| """Tests that we correctly handle leftover dir placeholders.""" |
| src_bucket_uri = self.CreateBucket() |
| dst_dir = self.CreateTempDir() |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj0', |
| contents='abc') |
| self.CreateObject(bucket_uri=src_bucket_uri, object_name='obj1', |
| contents='def') |
| |
| # Create a placeholder like what can be left over by web GUI tools. |
| key_uri = src_bucket_uri.clone_replace_name('/') |
| key_uri.set_contents_from_string('') |
| self.AssertNObjectsInBucket(src_bucket_uri, 3) |
| |
| stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir], |
| return_stderr=True) |
| self.assertIn('Skipping cloud sub-directory placeholder object', stderr) |
| dir_list = [] |
| for dirname, _, filenames in os.walk(dst_dir): |
| for filename in filenames: |
| dir_list.append(os.path.join(dirname, filename)) |
| dir_list = sorted(dir_list) |
| self.assertEqual(len(dir_list), 2) |
| self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 'obj0'), dir_list[0]) |
| self.assertEqual(os.path.join(dst_dir, src_bucket_uri.bucket_name, |
| 'obj1'), dir_list[1]) |
| |
| def test_copy_quiet(self): |
| bucket_uri = self.CreateBucket() |
| key_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| stderr = self.RunGsUtil(['-q', 'cp', suri(key_uri), |
| suri(bucket_uri.clone_replace_name('o2'))], |
| return_stderr=True) |
| self.assertEqual(stderr.count('Copying '), 0) |
| |
| def test_cp_md5_match(self): |
| """Tests that the uploaded object has the expected MD5. |
| |
| Note that while this does perform a file to object upload, MD5's are |
| not supported for composite objects so we don't use the decorator in this |
| case. |
| """ |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='bar') |
| with open(fpath, 'r') as f_in: |
| file_md5 = base64.encodestring(binascii.unhexlify( |
| CalculateMd5FromContents(f_in))).rstrip('\n') |
| self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _Check1(): |
| stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)], |
| return_stdout=True) |
| self.assertRegexpMatches(stdout, |
| r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5)) |
| _Check1() |
| |
| @unittest.skipIf(IS_WINDOWS, |
| 'Unicode handling on Windows requires mods to site-packages') |
| @PerformsFileToObjectUpload |
| def test_cp_manifest_upload_unicode(self): |
| return self._ManifestUpload('foo-unicöde', 'bar-unicöde', |
| 'manifest-unicöde') |
| |
| @PerformsFileToObjectUpload |
| def test_cp_manifest_upload(self): |
| """Tests uploading with a mnifest file.""" |
| return self._ManifestUpload('foo', 'bar', 'manifest') |
| |
| def _ManifestUpload(self, file_name, object_name, manifest_name): |
| """Tests uploading with a manifest file.""" |
| bucket_uri = self.CreateBucket() |
| dsturi = suri(bucket_uri, object_name) |
| |
| fpath = self.CreateTempFile(file_name=file_name, contents='bar') |
| logpath = self.CreateTempFile(file_name=manifest_name, contents='') |
| # Ensure the file is empty. |
| open(logpath, 'w').close() |
| self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi]) |
| with open(logpath, 'r') as f: |
| lines = f.readlines() |
| self.assertEqual(len(lines), 2) |
| |
| expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', |
| 'UploadId', 'Source Size', 'Bytes Transferred', |
| 'Result', 'Description'] |
| self.assertEqual(expected_headers, lines[0].strip().split(',')) |
| results = lines[1].strip().split(',') |
| self.assertEqual(results[0][:7], 'file://') # source |
| self.assertEqual(results[1][:5], '%s://' % |
| self.default_provider) # destination |
| date_format = '%Y-%m-%dT%H:%M:%S.%fZ' |
| start_date = datetime.datetime.strptime(results[2], date_format) |
| end_date = datetime.datetime.strptime(results[3], date_format) |
| self.assertEqual(end_date > start_date, True) |
| if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil: |
| # Check that we didn't do automatic parallel uploads - compose doesn't |
| # calculate the MD5 hash. Since RunGsUtil is overriden in |
| # TestCpParallelUploads to force parallel uploads, we can check which |
| # method was used. |
| self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 |
| self.assertEqual(int(results[6]), 3) # Source Size |
| self.assertEqual(int(results[7]), 3) # Bytes Transferred |
| self.assertEqual(results[8], 'OK') # Result |
| |
| @PerformsFileToObjectUpload |
| def test_cp_manifest_download(self): |
| """Tests downloading with a manifest file.""" |
| key_uri = self.CreateObject(contents='foo') |
| fpath = self.CreateTempFile(contents='') |
| logpath = self.CreateTempFile(contents='') |
| # Ensure the file is empty. |
| open(logpath, 'w').close() |
| self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath], |
| return_stdout=True) |
| with open(logpath, 'r') as f: |
| lines = f.readlines() |
| self.assertEqual(len(lines), 2) |
| |
| expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5', |
| 'UploadId', 'Source Size', 'Bytes Transferred', |
| 'Result', 'Description'] |
| self.assertEqual(expected_headers, lines[0].strip().split(',')) |
| results = lines[1].strip().split(',') |
| self.assertEqual(results[0][:5], '%s://' % |
| self.default_provider) # source |
| self.assertEqual(results[1][:7], 'file://') # destination |
| date_format = '%Y-%m-%dT%H:%M:%S.%fZ' |
| start_date = datetime.datetime.strptime(results[2], date_format) |
| end_date = datetime.datetime.strptime(results[3], date_format) |
| self.assertEqual(end_date > start_date, True) |
| self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==') # md5 |
| self.assertEqual(int(results[6]), 3) # Source Size |
| # Bytes transferred might be more than 3 if the file was gzipped, since |
| # the minimum gzip header is 10 bytes. |
| self.assertGreaterEqual(int(results[7]), 3) # Bytes Transferred |
| self.assertEqual(results[8], 'OK') # Result |
| |
| @PerformsFileToObjectUpload |
| def test_copy_unicode_non_ascii_filename(self): |
| key_uri = self.CreateObject(contents='foo') |
| # Make file large enough to cause a resumable upload (which hashes filename |
| # to construct tracker filename). |
| fpath = self.CreateTempFile(file_name=u'Аудиоархив', |
| contents='x' * 3 * 1024 * 1024) |
| fpath_bytes = fpath.encode(UTF8) |
| stderr = self.RunGsUtil(['cp', fpath_bytes, suri(key_uri)], |
| return_stderr=True) |
| self.assertIn('Copying file:', stderr) |
| |
  # Note: We once had a test (test_copy_invalid_unicode_filename) checking
  # that files with invalid unicode filenames were skipped, but it turns out
  # os.walk() on MacOS has no problems with such files, so the test failed
  # there. Given that, we decided to remove the test.
| |
| def test_gzip_upload_and_download(self): |
| bucket_uri = self.CreateBucket() |
| contents = 'x' * 10000 |
| tmpdir = self.CreateTempDir() |
| self.CreateTempFile(file_name='test.html', tmpdir=tmpdir, contents=contents) |
| self.CreateTempFile(file_name='test.js', tmpdir=tmpdir, contents=contents) |
| self.CreateTempFile(file_name='test.txt', tmpdir=tmpdir, contents=contents) |
| # Test that copying specifying only 2 of the 3 prefixes gzips the correct |
| # files, and test that including whitespace in the extension list works. |
| self.RunGsUtil(['cp', '-z', 'js, html', |
| os.path.join(tmpdir, 'test.*'), suri(bucket_uri)]) |
| self.AssertNObjectsInBucket(bucket_uri, 3) |
| uri1 = suri(bucket_uri, 'test.html') |
| uri2 = suri(bucket_uri, 'test.js') |
| uri3 = suri(bucket_uri, 'test.txt') |
| stdout = self.RunGsUtil(['stat', uri1], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| stdout = self.RunGsUtil(['stat', uri2], return_stdout=True) |
| self.assertRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| stdout = self.RunGsUtil(['stat', uri3], return_stdout=True) |
| self.assertNotRegexpMatches(stdout, r'Content-Encoding:\s+gzip') |
| fpath4 = self.CreateTempFile() |
| for uri in (uri1, uri2, uri3): |
| self.RunGsUtil(['cp', uri, suri(fpath4)]) |
| with open(fpath4, 'r') as f: |
| self.assertEqual(f.read(), contents) |
| |
| def test_upload_with_subdir_and_unexpanded_wildcard(self): |
| fpath1 = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z')) |
| bucket_uri = self.CreateBucket() |
| wildcard_uri = '%s*' % fpath1[:-5] |
| stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)], |
| return_stderr=True) |
| self.assertIn('Copying file:', stderr) |
| self.AssertNObjectsInBucket(bucket_uri, 1) |
| |
| def test_cp_object_ending_with_slash(self): |
| """Tests that cp works with object names ending with slash.""" |
| tmpdir = self.CreateTempDir() |
| bucket_uri = self.CreateBucket() |
| self.CreateObject(bucket_uri=bucket_uri, |
| object_name='abc/', |
| contents='dir') |
| self.CreateObject(bucket_uri=bucket_uri, |
| object_name='abc/def', |
| contents='def') |
| self.AssertNObjectsInBucket(bucket_uri, 2) |
| self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir]) |
| # Check that files in the subdir got copied even though subdir object |
| # download was skipped. |
| with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f: |
| self.assertEquals('def', '\n'.join(f.readlines())) |
| |
| def test_cp_without_read_access(self): |
| """Tests that cp fails without read access to the object.""" |
| # TODO: With 401's triggering retries in apitools, this test will take |
| # a long time. Ideally, make apitools accept a num_retries config for this |
| # until we stop retrying the 401's. |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| self.AssertNObjectsInBucket(bucket_uri, 1) |
| |
| with self.SetAnonymousBotoCreds(): |
| stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'], |
| return_stderr=True, expected_status=1) |
| self.assertIn('AccessDenied', stderr) |
| |
| @unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.') |
| def test_cp_minus_e(self): |
| fpath_dir = self.CreateTempDir() |
| fpath1 = self.CreateTempFile(tmpdir=fpath_dir) |
| fpath2 = os.path.join(fpath_dir, 'cp_minus_e') |
| bucket_uri = self.CreateBucket() |
| os.symlink(fpath1, fpath2) |
| stderr = self.RunGsUtil( |
| ['cp', '-e', '%s%s*' % (fpath_dir, os.path.sep), |
| suri(bucket_uri, 'files')], |
| return_stderr=True) |
| self.assertIn('Copying file', stderr) |
| self.assertIn('Skipping symbolic link file', stderr) |
| |
| def test_cp_multithreaded_wildcard(self): |
| """Tests that cp -m works with a wildcard.""" |
| num_test_files = 5 |
| tmp_dir = self.CreateTempDir(test_files=num_test_files) |
| bucket_uri = self.CreateBucket() |
| wildcard_uri = '%s%s*' % (tmp_dir, os.sep) |
| self.RunGsUtil(['-m', 'cp', wildcard_uri, suri(bucket_uri)]) |
| self.AssertNObjectsInBucket(bucket_uri, num_test_files) |
| |
| def test_cp_duplicate_source_args(self): |
| """Tests that cp -m works when a source argument is provided twice.""" |
| object_contents = 'edge' |
| object_uri = self.CreateObject(object_name='foo', contents=object_contents) |
| tmp_dir = self.CreateTempDir() |
| self.RunGsUtil(['-m', 'cp', suri(object_uri), suri(object_uri), tmp_dir]) |
| with open(os.path.join(tmp_dir, 'foo'), 'r') as in_fp: |
| contents = in_fp.read() |
| # Contents should be not duplicated. |
| self.assertEqual(contents, object_contents) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_break(self): |
| """Tests that an upload can be resumed after a connection break.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting upload', stderr) |
| stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| return_stderr=True) |
| self.assertIn('Resuming upload', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_retry(self): |
| """Tests that a resumable upload completes with one retry.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| # TODO: Raising an httplib or socket error blocks bucket teardown |
| # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why; |
| # until then, raise an apitools retryable exception. |
| if self.test_api == ApiSelector.XML: |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 5, httplib.BadStatusLine, ('unused',)))) |
| else: |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 5, apitools_exceptions.BadStatusCodeError, |
| ('unused', 'unused', 'unused')))) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile', |
| test_callback_file, fpath, suri(bucket_uri)], |
| return_stderr=1) |
| if self.test_api == ApiSelector.XML: |
| self.assertIn('Got retryable failure', stderr) |
| else: |
| self.assertIn('Retrying', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_streaming_upload_retry(self): |
| """Tests that a streaming resumable upload completes with one retry.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip('XML does not support resumable streaming uploads.') |
| bucket_uri = self.CreateBucket() |
| |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_ResumableUploadRetryHandler( |
| 5, apitools_exceptions.BadStatusCodeError, |
| ('unused', 'unused', 'unused')))) |
| # Need to reduce the JSON chunk size since streaming uploads buffer a |
| # full chunk. |
| boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size', |
| str(256 * ONE_KIB)), |
| ('Boto', 'num_retries', '2')] |
| with SetBotoConfigForTest(boto_configs_for_test): |
| stderr = self.RunGsUtil( |
| ['-D', 'cp', '--testcallbackfile', test_callback_file, '-', |
| suri(bucket_uri, 'foo')], |
| stdin='a' * 512 * ONE_KIB, return_stderr=1) |
| self.assertIn('Retrying', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload(self): |
| """Tests that a basic resumable upload completes successfully.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='a' * self.halt_size) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| self.RunGsUtil(['cp', fpath, suri(bucket_uri)]) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_resumable_upload_break_leaves_tracker(self): |
| """Tests that a tracker file is created with a resumable upload.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(file_name='foo', |
| contents='a' * self.halt_size) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(suri(bucket_uri, 'foo')), |
| TrackerFileType.UPLOAD, self.test_api) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| try: |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri, 'foo')], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting upload', stderr) |
| self.assertTrue(os.path.exists(tracker_filename), |
| 'Tracker file %s not present.' % tracker_filename) |
| finally: |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_break_file_size_change(self): |
| """Tests a resumable upload where the uploaded file changes size. |
| |
| This should fail when we read the tracker data. |
| """ |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='a' * self.halt_size) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(True, 5))) |
| |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting upload', stderr) |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='a' * self.halt_size * 2) |
| stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('ResumableUploadAbortException', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_break_file_content_change(self): |
| """Tests a resumable upload where the uploaded file changes content.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip( |
| 'XML doesn\'t make separate HTTP calls at fixed-size boundaries for ' |
| 'resumable uploads, so we can\'t guarantee that the server saves a ' |
| 'specific part of the upload.') |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='a' * ONE_KIB * 512) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(True, |
| int(ONE_KIB) * 384))) |
| resumable_threshold_for_test = ( |
| 'GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| resumable_chunk_size_for_test = ( |
| 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) |
| with SetBotoConfigForTest([resumable_threshold_for_test, |
| resumable_chunk_size_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting upload', stderr) |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='b' * ONE_KIB * 512) |
| stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_break_file_smaller_size(self): |
| """Tests a resumable upload where the uploaded file changes content. |
| |
| This should fail hash validation. |
| """ |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='a' * ONE_KIB * 512) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(True, |
| int(ONE_KIB) * 384))) |
| resumable_threshold_for_test = ( |
| 'GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| resumable_chunk_size_for_test = ( |
| 'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)) |
| with SetBotoConfigForTest([resumable_threshold_for_test, |
| resumable_chunk_size_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting upload', stderr) |
| fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir, |
| contents='a' * ONE_KIB) |
| stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('ResumableUploadAbortException', stderr) |
| |
| # This temporarily changes the tracker directory to unwritable which |
| # interferes with any parallel running tests that use the tracker directory. |
| @NotParallelizable |
| @SkipForS3('No resumable upload support for S3.') |
| @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
| @PerformsFileToObjectUpload |
| def test_cp_unwritable_tracker_file(self): |
| """Tests a resumable upload with an unwritable tracker file.""" |
| bucket_uri = self.CreateBucket() |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(suri(bucket_uri, 'foo')), |
| TrackerFileType.UPLOAD, self.test_api) |
| tracker_dir = os.path.dirname(tracker_filename) |
| fpath = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| save_mod = os.stat(tracker_dir).st_mode |
| |
| try: |
| os.chmod(tracker_dir, 0) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Couldn\'t write tracker file', stderr) |
| finally: |
| os.chmod(tracker_dir, save_mod) |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| # This temporarily changes the tracker directory to unwritable which |
| # interferes with any parallel running tests that use the tracker directory. |
| @NotParallelizable |
| @unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.') |
| def test_cp_unwritable_tracker_file_download(self): |
| """Tests downloads with an unwritable tracker file.""" |
| object_uri = self.CreateObject(contents='foo' * ONE_KIB) |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(suri(object_uri)), |
| TrackerFileType.DOWNLOAD, self.test_api) |
| tracker_dir = os.path.dirname(tracker_filename) |
| fpath = self.CreateTempFile() |
| save_mod = os.stat(tracker_dir).st_mode |
| |
| try: |
| os.chmod(tracker_dir, 0) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| # Should succeed because we are below the threshold. |
| self.RunGsUtil(['cp', suri(object_uri), fpath]) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Couldn\'t write tracker file', stderr) |
| finally: |
| os.chmod(tracker_dir, save_mod) |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| def test_cp_resumable_download_break(self): |
| """Tests that a download can be resumed after a connection break.""" |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='a' * self.halt_size) |
| fpath = self.CreateTempFile() |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| suri(object_uri), fpath], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting download.', stderr) |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| self.assertTrue(os.path.isfile(tracker_filename)) |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| return_stderr=True) |
| self.assertIn('Resuming download', stderr) |
| |
| def test_cp_resumable_download_etag_differs(self): |
| """Tests that download restarts the file when the source object changes. |
| |
| This causes the etag not to match. |
| """ |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='a' * self.halt_size) |
| fpath = self.CreateTempFile() |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| # This will create a tracker file with an ETag. |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| suri(object_uri), fpath], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting download.', stderr) |
| # Create a new object with different contents - it should have a |
| # different ETag since the content has changed. |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='b' * self.halt_size) |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| return_stderr=True) |
| self.assertNotIn('Resuming download', stderr) |
| |
| def test_cp_resumable_download_file_larger(self): |
| """Tests download deletes the tracker file when existing file is larger.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='a' * self.halt_size) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| suri(object_uri), fpath], |
| expected_status=1, return_stderr=True) |
| self.assertIn('Artifically halting download.', stderr) |
| with open(fpath, 'w') as larger_file: |
| for _ in range(self.halt_size * 2): |
| larger_file.write('a') |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| expected_status=1, return_stderr=True) |
| self.assertNotIn('Resuming download', stderr) |
| self.assertIn('is larger', stderr) |
| self.assertIn('Deleting tracker file', stderr) |
| |
| def test_cp_resumable_download_content_differs(self): |
| """Tests that we do not re-download when tracker file matches existing file. |
| |
| We only compare size, not contents, so re-download should not occur even |
| though the contents are technically different. However, hash validation on |
| the file should still occur and we will delete the file then because |
| the hashes differ. |
| """ |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='efgh' * ONE_KIB) |
| stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| self.assertEqual(len(etag_match.groups()), 1, |
| 'Did not match expected single ETag') |
| etag = etag_match.group(1) |
| |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| try: |
| with open(tracker_filename, 'w') as tracker_fp: |
| tracker_fp.write(etag) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| return_stderr=True, expected_status=1) |
| self.assertIn('Download already complete for file', stderr) |
| self.assertIn('doesn\'t match cloud-supplied digest', stderr) |
| # File and tracker file should be deleted. |
| self.assertFalse(os.path.isfile(fpath)) |
| self.assertFalse(os.path.isfile(tracker_filename)) |
| finally: |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| def test_cp_resumable_download_content_matches(self): |
| """Tests download no-ops when tracker file matches existing file.""" |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| matching_contents = 'abcd' * ONE_KIB |
| fpath = self.CreateTempFile(tmpdir=tmp_dir, contents=matching_contents) |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents=matching_contents) |
| stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| self.assertEqual(len(etag_match.groups()), 1, |
| 'Did not match expected single ETag') |
| etag = etag_match.group(1) |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| with open(tracker_filename, 'w') as tracker_fp: |
| tracker_fp.write(etag) |
| try: |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| return_stderr=True) |
| self.assertIn('Download already complete for file', stderr) |
| # Tracker file should be removed after successful hash validation. |
| self.assertFalse(os.path.isfile(tracker_filename)) |
| finally: |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| def test_cp_resumable_download_tracker_file_not_matches(self): |
| """Tests that download overwrites when tracker file etag does not match.""" |
| bucket_uri = self.CreateBucket() |
| tmp_dir = self.CreateTempDir() |
| fpath = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB) |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='efgh' * ONE_KIB) |
| stdout = self.RunGsUtil(['ls', '-L', suri(object_uri)], return_stdout=True) |
| etag_match = re.search(r'\s*ETag:\s*(.*)', stdout) |
| self.assertIsNotNone(etag_match, 'Could not get object ETag') |
| self.assertEqual(len(etag_match.groups()), 1, |
| 'Did not match regex for exactly one object ETag') |
| etag = etag_match.group(1) |
| etag += 'nonmatching' |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(fpath), TrackerFileType.DOWNLOAD, self.test_api) |
| with open(tracker_filename, 'w') as tracker_fp: |
| tracker_fp.write(etag) |
| try: |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', suri(object_uri), fpath], |
| return_stderr=True) |
| self.assertNotIn('Resuming download', stderr) |
| # Ensure the file was overwritten. |
| with open(fpath, 'r') as in_fp: |
| contents = in_fp.read() |
| self.assertEqual(contents, 'efgh' * ONE_KIB, |
| 'File not overwritten when it should have been ' |
| 'due to a non-matching tracker file.') |
| self.assertFalse(os.path.isfile(tracker_filename)) |
| finally: |
| if os.path.exists(tracker_filename): |
| os.unlink(tracker_filename) |
| |
| def test_cp_resumable_download_gzip(self): |
| """Tests that download can be resumed successfully with a gzipped file.""" |
| # Generate some reasonably incompressible data. This compresses to a bit |
| # around 128K in practice, but we assert specifically below that it is |
| # larger than self.halt_size to guarantee that we can halt the download |
| # partway through. |
| object_uri = self.CreateObject() |
| random.seed(0) |
| contents = str([random.choice(string.ascii_letters) |
| for _ in xrange(ONE_KIB * 128)]) |
| random.seed() # Reset the seed for any other tests. |
| fpath1 = self.CreateTempFile(file_name='unzipped.txt', contents=contents) |
| self.RunGsUtil(['cp', '-z', 'txt', suri(fpath1), suri(object_uri)]) |
| |
| # Use @Retry as hedge against bucket listing eventual consistency. |
| @Retry(AssertionError, tries=3, timeout_secs=1) |
| def _GetObjectSize(): |
| stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True) |
| size_match = re.search(r'(\d+)\s+.*', stdout) |
| self.assertIsNotNone(size_match, 'Could not get object size') |
| self.assertEqual(len(size_match.groups()), 1, |
| 'Did not match regex for exactly one object size.') |
| return long(size_match.group(1)) |
| |
| object_size = _GetObjectSize() |
| self.assertGreaterEqual(object_size, self.halt_size, |
| 'Compresed object size was not large enough to ' |
| 'allow for a halted download, so the test results ' |
| 'would be invalid. Please increase the compressed ' |
| 'object size in the test.') |
| fpath2 = self.CreateTempFile() |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_HaltingCopyCallbackHandler(False, 5))) |
| |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| suri(object_uri), suri(fpath2)], |
| return_stderr=True, expected_status=1) |
| self.assertIn('Artifically halting download.', stderr) |
| tracker_filename = GetTrackerFilePath( |
| StorageUrlFromString(fpath2), TrackerFileType.DOWNLOAD, self.test_api) |
| self.assertTrue(os.path.isfile(tracker_filename)) |
| self.assertIn('Downloading to temp gzip filename', stderr) |
| # We should have a temporary gzipped file, a tracker file, and no |
| # final file yet. |
| self.assertTrue(os.path.isfile('%s_.gztmp' % fpath2)) |
| stderr = self.RunGsUtil(['cp', suri(object_uri), suri(fpath2)], |
| return_stderr=True) |
| self.assertIn('Resuming download', stderr) |
| with open(fpath2, 'r') as f: |
| self.assertEqual(f.read(), contents, 'File contents did not match.') |
| self.assertFalse(os.path.isfile(tracker_filename)) |
| self.assertFalse(os.path.isfile('%s_.gztmp' % fpath2)) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_bucket_deleted(self): |
| """Tests that a not found exception is raised if bucket no longer exists.""" |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps( |
| _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))) |
| |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], return_stderr=True, |
| expected_status=1) |
| self.assertIn('Deleting bucket', stderr) |
| self.assertIn('bucket does not exist', stderr) |
| |
| @SkipForS3('No resumable upload support for S3.') |
| def test_cp_resumable_upload_start_over_http_error(self): |
| for start_over_error in (404, 410): |
| self.start_over_error_test_helper(start_over_error) |
| |
| def start_over_error_test_helper(self, http_error_num): |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB) |
| boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB)) |
| if self.test_api == ApiSelector.JSON: |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps(_JSONForceHTTPErrorCopyCallbackHandler(5, 404))) |
| elif self.test_api == ApiSelector.XML: |
| test_callback_file = self.CreateTempFile( |
| contents=pickle.dumps( |
| _XMLResumableUploadStartOverCopyCallbackHandler(5))) |
| |
| with SetBotoConfigForTest([boto_config_for_test]): |
| stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file, |
| fpath, suri(bucket_uri)], return_stderr=True) |
| self.assertIn('Restarting upload from scratch', stderr) |
| |
| def test_cp_minus_c(self): |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='foo') |
| self.RunGsUtil( |
| ['cp', '-c', suri(bucket_uri) + '/foo2', suri(object_uri), |
| suri(bucket_uri) + '/dir/'], |
| expected_status=1) |
| self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)]) |
| |
| def test_rewrite_cp(self): |
| """Tests the JSON Rewrite API.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip('Rewrite API is only supported in JSON.') |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='bar') |
| gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| self.default_provider) |
| key = object_uri.get_key() |
| src_obj_metadata = apitools_messages.Object( |
| name=key.name, bucket=key.bucket.name, contentType=key.content_type) |
| dst_obj_metadata = apitools_messages.Object( |
| bucket=src_obj_metadata.bucket, |
| name=self.MakeTempName('object'), |
| contentType=src_obj_metadata.contentType) |
| gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata) |
| self.assertEqual( |
| gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| src_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| dst_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| |
| def test_rewrite_cp_resume(self): |
| """Tests the JSON Rewrite API, breaking and resuming via a tracker file.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip('Rewrite API is only supported in JSON.') |
| bucket_uri = self.CreateBucket() |
| # Second bucket needs to be a different storage class so the service |
| # actually rewrites the bytes. |
| bucket_uri2 = self.CreateBucket( |
| storage_class='DURABLE_REDUCED_AVAILABILITY') |
| # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| # need 2 response from the service: 1 success, 1 failure prior to |
| # completion. |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents=('12'*ONE_MIB) + 'bar', |
| prefer_json_api=True) |
| gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| self.default_provider) |
| key = object_uri.get_key() |
| src_obj_metadata = apitools_messages.Object( |
| name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| etag=key.etag.strip('"\'')) |
| dst_obj_name = self.MakeTempName('object') |
| dst_obj_metadata = apitools_messages.Object( |
| bucket=bucket_uri2.bucket_name, |
| name=dst_obj_name, |
| contentType=src_obj_metadata.contentType) |
| tracker_file_name = GetRewriteTrackerFilePath( |
| src_obj_metadata.bucket, src_obj_metadata.name, |
| dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
| try: |
| try: |
| gsutil_api.CopyObject( |
| src_obj_metadata, dst_obj_metadata, |
| progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| max_bytes_per_call=ONE_MIB) |
| self.fail('Expected _RewriteHaltException.') |
| except _RewriteHaltException: |
| pass |
| |
| # Tracker file should be left over. |
| self.assertTrue(os.path.exists(tracker_file_name)) |
| |
| # Now resume. Callback ensures we didn't start over. |
| gsutil_api.CopyObject( |
| src_obj_metadata, dst_obj_metadata, |
| progress_callback=_EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call, |
| max_bytes_per_call=ONE_MIB) |
| |
| # Copy completed; tracker file should be deleted. |
| self.assertFalse(os.path.exists(tracker_file_name)) |
| |
| self.assertEqual( |
| gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| src_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| dst_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| finally: |
| # Clean up if something went wrong. |
| DeleteTrackerFile(tracker_file_name) |
| |
| def test_rewrite_cp_resume_source_changed(self): |
| """Tests that Rewrite starts over when the source object has changed.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip('Rewrite API is only supported in JSON.') |
| bucket_uri = self.CreateBucket() |
| # Second bucket needs to be a different storage class so the service |
| # actually rewrites the bytes. |
| bucket_uri2 = self.CreateBucket( |
| storage_class='DURABLE_REDUCED_AVAILABILITY') |
| # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| # need 2 response from the service: 1 success, 1 failure prior to |
| # completion. |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents=('12'*ONE_MIB) + 'bar', |
| prefer_json_api=True) |
| gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| self.default_provider) |
| key = object_uri.get_key() |
| src_obj_metadata = apitools_messages.Object( |
| name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| etag=key.etag.strip('"\'')) |
| dst_obj_name = self.MakeTempName('object') |
| dst_obj_metadata = apitools_messages.Object( |
| bucket=bucket_uri2.bucket_name, |
| name=dst_obj_name, |
| contentType=src_obj_metadata.contentType) |
| tracker_file_name = GetRewriteTrackerFilePath( |
| src_obj_metadata.bucket, src_obj_metadata.name, |
| dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
| try: |
| try: |
| gsutil_api.CopyObject( |
| src_obj_metadata, dst_obj_metadata, |
| progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| max_bytes_per_call=ONE_MIB) |
| self.fail('Expected _RewriteHaltException.') |
| except _RewriteHaltException: |
| pass |
| # Overwrite the original object. |
| object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='bar', prefer_json_api=True) |
| key2 = object_uri2.get_key() |
| src_obj_metadata2 = apitools_messages.Object( |
| name=key2.name, bucket=key2.bucket.name, |
| contentType=key2.content_type, etag=key2.etag.strip('"\'')) |
| |
| # Tracker file for original object should still exist. |
| self.assertTrue(os.path.exists(tracker_file_name)) |
| |
| # Copy the new object. |
| gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata, |
| max_bytes_per_call=ONE_MIB) |
| |
| # Copy completed; original tracker file should be deleted. |
| self.assertFalse(os.path.exists(tracker_file_name)) |
| |
| self.assertEqual( |
| gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket, |
| src_obj_metadata2.name, |
| fields=['md5Hash']).md5Hash, |
| gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket, |
| dst_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| finally: |
| # Clean up if something went wrong. |
| DeleteTrackerFile(tracker_file_name) |
| |
| def test_rewrite_cp_resume_command_changed(self): |
| """Tests that Rewrite starts over when the arguments changed.""" |
| if self.test_api == ApiSelector.XML: |
| return unittest.skip('Rewrite API is only supported in JSON.') |
| bucket_uri = self.CreateBucket() |
| # Second bucket needs to be a different storage class so the service |
| # actually rewrites the bytes. |
| bucket_uri2 = self.CreateBucket( |
| storage_class='DURABLE_REDUCED_AVAILABILITY') |
| # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we |
| # need 2 response from the service: 1 success, 1 failure prior to |
| # completion. |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents=('12'*ONE_MIB) + 'bar', |
| prefer_json_api=True) |
| gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(), |
| self.default_provider) |
| key = object_uri.get_key() |
| src_obj_metadata = apitools_messages.Object( |
| name=key.name, bucket=key.bucket.name, contentType=key.content_type, |
| etag=key.etag.strip('"\'')) |
| dst_obj_name = self.MakeTempName('object') |
| dst_obj_metadata = apitools_messages.Object( |
| bucket=bucket_uri2.bucket_name, |
| name=dst_obj_name, |
| contentType=src_obj_metadata.contentType) |
| tracker_file_name = GetRewriteTrackerFilePath( |
| src_obj_metadata.bucket, src_obj_metadata.name, |
| dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api) |
| try: |
| try: |
| gsutil_api.CopyObject( |
| src_obj_metadata, dst_obj_metadata, canned_acl='private', |
| progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call, |
| max_bytes_per_call=ONE_MIB) |
| self.fail('Expected _RewriteHaltException.') |
| except _RewriteHaltException: |
| pass |
| |
| # Tracker file for original object should still exist. |
| self.assertTrue(os.path.exists(tracker_file_name)) |
| |
| # Copy the same object but with different call parameters. |
| gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata, |
| canned_acl='public-read', |
| max_bytes_per_call=ONE_MIB) |
| |
| # Copy completed; original tracker file should be deleted. |
| self.assertFalse(os.path.exists(tracker_file_name)) |
| |
| new_obj_metadata = gsutil_api.GetObjectMetadata( |
| dst_obj_metadata.bucket, dst_obj_metadata.name, |
| fields=['acl,md5Hash']) |
| self.assertEqual( |
| gsutil_api.GetObjectMetadata(src_obj_metadata.bucket, |
| src_obj_metadata.name, |
| fields=['md5Hash']).md5Hash, |
| new_obj_metadata.md5Hash, |
| 'Error: Rewritten object\'s hash doesn\'t match source object.') |
| # New object should have a public-read ACL from the second command. |
| found_public_acl = False |
| for acl_entry in new_obj_metadata.acl: |
| if acl_entry.entity == 'allUsers': |
| found_public_acl = True |
| self.assertTrue(found_public_acl, |
| 'New object was not written with a public ACL.') |
| finally: |
| # Clean up if something went wrong. |
| DeleteTrackerFile(tracker_file_name) |
| |
| |
| class TestCpUnitTests(testcase.GsUtilUnitTestCase): |
| """Unit tests for gsutil cp.""" |
| |
| def testDownloadWithNoHashAvailable(self): |
| """Tests a download with no valid server-supplied hash.""" |
| # S3 should have a special message for non-MD5 etags. |
| bucket_uri = self.CreateBucket(provider='s3') |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo') |
| object_uri.get_key().etag = '12345' # Not an MD5 |
| dst_dir = self.CreateTempDir() |
| |
| log_handler = self.RunCommand( |
| 'cp', [suri(object_uri), dst_dir], return_log_handler=True) |
| warning_messages = log_handler.messages['warning'] |
| self.assertEquals(2, len(warning_messages)) |
| self.assertRegexpMatches( |
| warning_messages[0], |
| r'Non-MD5 etag \(12345\) present for key .*, ' |
| r'data integrity checks are not possible') |
| self.assertIn('Integrity cannot be assured', warning_messages[1]) |
| |
| def test_object_and_prefix_same_name(self): |
| bucket_uri = self.CreateBucket() |
| object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo', |
| contents='foo') |
| self.CreateObject(bucket_uri=bucket_uri, |
| object_name='foo/bar', contents='bar') |
| fpath = self.CreateTempFile() |
| # MockKey doesn't support hash_algs, so the MD5 will not match. |
| with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): |
| self.RunCommand('cp', [suri(object_uri), fpath]) |
| with open(fpath, 'r') as f: |
| self.assertEqual(f.read(), 'foo') |
| |
| def test_cp_upload_respects_no_hashes(self): |
| bucket_uri = self.CreateBucket() |
| fpath = self.CreateTempFile(contents='abcd') |
| with SetBotoConfigForTest([('GSUtil', 'check_hashes', 'never')]): |
| log_handler = self.RunCommand('cp', [fpath, suri(bucket_uri)], |
| return_log_handler=True) |
| warning_messages = log_handler.messages['warning'] |
| self.assertEquals(1, len(warning_messages)) |
| self.assertIn('Found no hashes to validate object upload', |
| warning_messages[0]) |