| # -*- coding: utf-8 -*- |
| # Copyright 2013 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """XML/boto gsutil Cloud API implementation for GCS and Amazon S3.""" |
| |
| from __future__ import absolute_import |
| |
| import base64 |
| import binascii |
| import datetime |
| import errno |
| import httplib |
| import json |
| import multiprocessing |
| import os |
| import pickle |
| import random |
| import re |
| import socket |
| import tempfile |
| import textwrap |
| import threading |
| import time |
| import xml |
| from xml.dom.minidom import parseString as XmlParseString |
| from xml.sax import _exceptions as SaxExceptions |
| |
| import boto |
| from boto import handler |
| from boto.exception import ResumableDownloadException as BotoResumableDownloadException |
| from boto.exception import ResumableTransferDisposition |
| from boto.gs.cors import Cors |
| from boto.gs.lifecycle import LifecycleConfig |
| from boto.s3.cors import CORSConfiguration as S3Cors |
| from boto.s3.deletemarker import DeleteMarker |
| from boto.s3.lifecycle import Lifecycle as S3Lifecycle |
| from boto.s3.prefix import Prefix |
| |
| from gslib.boto_resumable_upload import BotoResumableUpload |
| from gslib.cloud_api import AccessDeniedException |
| from gslib.cloud_api import ArgumentException |
| from gslib.cloud_api import BadRequestException |
| from gslib.cloud_api import CloudApi |
| from gslib.cloud_api import NotEmptyException |
| from gslib.cloud_api import NotFoundException |
| from gslib.cloud_api import PreconditionException |
| from gslib.cloud_api import ResumableDownloadException |
| from gslib.cloud_api import ResumableUploadAbortException |
| from gslib.cloud_api import ResumableUploadException |
| from gslib.cloud_api import ResumableUploadStartOverException |
| from gslib.cloud_api import ServiceException |
| from gslib.cloud_api_helper import ValidateDstObjectMetadata |
| # Imported for boto AuthHandler purposes. |
| import gslib.devshell_auth_plugin # pylint: disable=unused-import |
| from gslib.exception import CommandException |
| from gslib.exception import InvalidUrlError |
| from gslib.hashing_helper import Base64EncodeHash |
| from gslib.hashing_helper import Base64ToHexHash |
| from gslib.project_id import GOOG_PROJ_ID_HDR |
| from gslib.project_id import PopulateProjectId |
| from gslib.storage_url import StorageUrlFromString |
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| from gslib.translation_helper import AclTranslation |
| from gslib.translation_helper import AddS3MarkerAclToObjectMetadata |
| from gslib.translation_helper import CorsTranslation |
| from gslib.translation_helper import CreateBucketNotFoundException |
| from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite |
| from gslib.translation_helper import CreateObjectNotFoundException |
| from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
| from gslib.translation_helper import EncodeStringAsLong |
| from gslib.translation_helper import GenerationFromUrlAndString |
| from gslib.translation_helper import HeadersFromObjectMetadata |
| from gslib.translation_helper import LifecycleTranslation |
| from gslib.translation_helper import REMOVE_CORS_CONFIG |
| from gslib.translation_helper import S3MarkerAclFromObjectMetadata |
| from gslib.util import ConfigureNoOpAuthIfNeeded |
| from gslib.util import DEFAULT_FILE_BUFFER_SIZE |
| from gslib.util import GetMaxRetryDelay |
| from gslib.util import GetNumRetries |
| from gslib.util import S3_DELETE_MARKER_GUID |
| from gslib.util import TWO_MIB |
| from gslib.util import UnaryDictToXml |
| from gslib.util import UTF8 |
| from gslib.util import XML_PROGRESS_CALLBACKS |
| |
| TRANSLATABLE_BOTO_EXCEPTIONS = (boto.exception.BotoServerError, |
| boto.exception.InvalidUriError, |
| boto.exception.ResumableDownloadException, |
| boto.exception.ResumableUploadException, |
| boto.exception.StorageCreateError, |
| boto.exception.StorageResponseError) |
| |
| # pylint: disable=global-at-module-level |
| global boto_auth_initialized, boto_auth_initialized_lock |
| # If multiprocessing is available, these will be overridden to process-safe |
| # variables in InitializeMultiprocessingVariables. |
| boto_auth_initialized_lock = threading.Lock() |
| boto_auth_initialized = False |
| |
| NON_EXISTENT_OBJECT_REGEX = re.compile(r'.*non-\s*existent\s*object', |
| flags=re.DOTALL) |
| # Determines whether an etag is a valid MD5. |
| MD5_REGEX = re.compile(r'^"*[a-fA-F0-9]{32}"*$') |
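
# For illustration (hypothetical etag values), MD5_REGEX accepts a plain or
# quoted 32-character hex digest and rejects S3 multipart etags:
#
#   MD5_REGEX.search('"9e107d9d372bb6826bd81d3542a419d6"')   # matches
#   MD5_REGEX.search('9e107d9d372bb6826bd81d3542a419d6-2')   # None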
| |
| |
| def InitializeMultiprocessingVariables(): # pylint: disable=invalid-name |
| """Perform necessary initialization for multiprocessing. |
| |
| See gslib.command.InitializeMultiprocessingVariables for an explanation |
| of why this is necessary. |
| """ |
| # pylint: disable=global-variable-undefined |
| global boto_auth_initialized, boto_auth_initialized_lock |
| boto_auth_initialized_lock = gslib.util.CreateLock() |
| boto_auth_initialized = multiprocessing.Value('i', 0) |
| |
| |
| class DownloadProxyCallbackHandler(object): |
| """Intermediary callback to keep track of the number of bytes downloaded.""" |
| |
| def __init__(self, start_byte, callback): |
| self._start_byte = start_byte |
| self._callback = callback |
| |
| def call(self, bytes_downloaded, total_size): |
| """Saves necessary data and then calls the given Cloud API callback. |
| |
| Args: |
| bytes_downloaded: Number of bytes processed so far. |
| total_size: Total size of the ongoing operation. |
| """ |
| if self._callback: |
| self._callback(self._start_byte + bytes_downloaded, total_size) |
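
  # A minimal illustration (hypothetical values): for a download resumed at
  # start_byte=1000, a boto progress report of call(500, 2000) invokes the
  # Cloud API callback as callback(1500, 2000), so reported progress includes
  # the bytes downloaded before the resume.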
| |
| |
| class BotoTranslation(CloudApi): |
| """Boto-based XML translation implementation of gsutil Cloud API. |
| |
| This class takes gsutil Cloud API objects, translates them to XML service |
| calls, and translates the results back into gsutil Cloud API objects for |
| use by the caller. |
| """ |
| |
| def __init__(self, bucket_storage_uri_class, logger, provider=None, |
| credentials=None, debug=0, trace_token=None): |
| """Performs necessary setup for interacting with the cloud storage provider. |
| |
| Args: |
| bucket_storage_uri_class: boto storage_uri class, used by APIs that |
| provide boto translation or mocking. |
| logger: logging.logger for outputting log messages. |
| provider: Provider prefix describing cloud storage provider to connect to. |
| 'gs' and 's3' are supported. Function implementations ignore |
| the provider argument and use this one instead. |
| credentials: Unused. |
| debug: Debug level for the API implementation (0..3). |
| trace_token: Unused in this subclass. |
| """ |
| super(BotoTranslation, self).__init__(bucket_storage_uri_class, logger, |
| provider=provider, debug=debug) |
| _ = credentials |
| # pylint: disable=global-variable-undefined, global-variable-not-assigned |
| global boto_auth_initialized, boto_auth_initialized_lock |
| with boto_auth_initialized_lock: |
| ConfigureNoOpAuthIfNeeded() |
| if isinstance(boto_auth_initialized, bool): |
| boto_auth_initialized = True |
| else: |
| boto_auth_initialized.value = 1 |
| self.api_version = boto.config.get_value( |
| 'GSUtil', 'default_api_version', '1') |
| |
| def GetBucket(self, bucket_name, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| bucket_uri = self._StorageUriForBucket(bucket_name) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| try: |
| return self._BotoBucketToBucket(bucket_uri.get_bucket(validate=True, |
| headers=headers), |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def ListBuckets(self, project_id=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| get_fields = self._ListToGetFields(list_fields=fields) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| if self.provider == 'gs': |
| headers[GOOG_PROJ_ID_HDR] = PopulateProjectId(project_id) |
| try: |
| provider_uri = boto.storage_uri( |
| '%s://' % self.provider, |
| suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| |
| buckets_iter = provider_uri.get_all_buckets(headers=headers) |
| for bucket in buckets_iter: |
| if self.provider == 's3' and bucket.name.lower() != bucket.name: |
          # S3 listings can return buckets with upper-case names, but boto
          # cannot successfully address such buckets in subsequent requests.
| continue |
| yield self._BotoBucketToBucket(bucket, fields=get_fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| def PatchBucket(self, bucket_name, metadata, canned_acl=None, |
| canned_def_acl=None, preconditions=None, provider=None, |
| fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| bucket_uri = self._StorageUriForBucket(bucket_name) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| try: |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| if metadata.acl: |
| boto_acl = AclTranslation.BotoAclFromMessage(metadata.acl) |
| bucket_uri.set_xml_acl(boto_acl.to_xml(), headers=headers) |
| if canned_acl: |
| canned_acls = bucket_uri.canned_acls() |
| if canned_acl not in canned_acls: |
| raise CommandException('Invalid canned ACL "%s".' % canned_acl) |
| bucket_uri.set_acl(canned_acl, bucket_uri.object_name) |
| if canned_def_acl: |
| canned_acls = bucket_uri.canned_acls() |
| if canned_def_acl not in canned_acls: |
| raise CommandException('Invalid canned ACL "%s".' % canned_def_acl) |
| bucket_uri.set_def_acl(canned_def_acl, bucket_uri.object_name) |
| if metadata.cors: |
| if metadata.cors == REMOVE_CORS_CONFIG: |
| metadata.cors = [] |
| boto_cors = CorsTranslation.BotoCorsFromMessage(metadata.cors) |
| bucket_uri.set_cors(boto_cors, False) |
| if metadata.defaultObjectAcl: |
| boto_acl = AclTranslation.BotoAclFromMessage( |
| metadata.defaultObjectAcl) |
| bucket_uri.set_def_xml_acl(boto_acl.to_xml(), headers=headers) |
| if metadata.lifecycle: |
| boto_lifecycle = LifecycleTranslation.BotoLifecycleFromMessage( |
| metadata.lifecycle) |
| bucket_uri.configure_lifecycle(boto_lifecycle, False) |
| if metadata.logging: |
| if self.provider == 'gs': |
| headers[GOOG_PROJ_ID_HDR] = PopulateProjectId(None) |
| if metadata.logging.logBucket and metadata.logging.logObjectPrefix: |
| bucket_uri.enable_logging(metadata.logging.logBucket, |
| metadata.logging.logObjectPrefix, |
| False, headers) |
| else: # Logging field is present and empty. Disable logging. |
| bucket_uri.disable_logging(False, headers) |
| if metadata.versioning: |
| bucket_uri.configure_versioning(metadata.versioning.enabled, |
| headers=headers) |
| if metadata.website: |
| main_page_suffix = metadata.website.mainPageSuffix |
| error_page = metadata.website.notFoundPage |
| bucket_uri.set_website_config(main_page_suffix, error_page) |
| return self.GetBucket(bucket_name, fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def CreateBucket(self, bucket_name, project_id=None, metadata=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| bucket_uri = self._StorageUriForBucket(bucket_name) |
| location = '' |
| if metadata and metadata.location: |
| location = metadata.location |
| # Pass storage_class param only if this is a GCS bucket. (In S3 the |
| # storage class is specified on the key object.) |
| headers = {} |
| if bucket_uri.scheme == 'gs': |
| self._AddApiVersionToHeaders(headers) |
| headers[GOOG_PROJ_ID_HDR] = PopulateProjectId(project_id) |
| storage_class = '' |
| if metadata and metadata.storageClass: |
| storage_class = metadata.storageClass |
| try: |
| bucket_uri.create_bucket(headers=headers, location=location, |
| storage_class=storage_class) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| else: |
| try: |
| bucket_uri.create_bucket(headers=headers, location=location) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| return self.GetBucket(bucket_name, fields=fields) |
| |
| def DeleteBucket(self, bucket_name, preconditions=None, provider=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider, preconditions |
| bucket_uri = self._StorageUriForBucket(bucket_name) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| try: |
| bucket_uri.delete_bucket(headers=headers) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| translated_exception = self._TranslateBotoException( |
| e, bucket_name=bucket_name) |
| if (translated_exception and |
| 'BucketNotEmpty' in translated_exception.reason): |
| try: |
| if bucket_uri.get_versioning_config(): |
| if self.provider == 's3': |
| raise NotEmptyException( |
| 'VersionedBucketNotEmpty (%s). Currently, gsutil does not ' |
| 'support listing or removing S3 DeleteMarkers, so you may ' |
| 'need to delete these using another tool to successfully ' |
| 'delete this bucket.' % bucket_name, status=e.status) |
| raise NotEmptyException( |
| 'VersionedBucketNotEmpty (%s)' % bucket_name, status=e.status) |
| else: |
| raise NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| status=e.status) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e2: |
| self._TranslateExceptionAndRaise(e2, bucket_name=bucket_name) |
| elif translated_exception and translated_exception.status == 404: |
| raise NotFoundException('Bucket %s does not exist.' % bucket_name) |
| else: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def ListObjects(self, bucket_name, prefix=None, delimiter=None, |
| all_versions=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| get_fields = self._ListToGetFields(list_fields=fields) |
| bucket_uri = self._StorageUriForBucket(bucket_name) |
| headers = {} |
| yield_prefixes = fields is None or 'prefixes' in fields |
| yield_objects = fields is None or any( |
| field.startswith('items/') for field in fields) |
| self._AddApiVersionToHeaders(headers) |
| try: |
| objects_iter = bucket_uri.list_bucket(prefix=prefix or '', |
| delimiter=delimiter or '', |
| all_versions=all_versions, |
| headers=headers) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| try: |
| for key in objects_iter: |
| if yield_prefixes and isinstance(key, Prefix): |
| yield CloudApi.CsObjectOrPrefix(key.name, |
| CloudApi.CsObjectOrPrefixType.PREFIX) |
| elif yield_objects: |
| key_to_convert = key |
| |
| # Listed keys are populated with these fields during bucket listing. |
| key_http_fields = set(['bucket', 'etag', 'name', 'updated', |
| 'generation', 'metageneration', 'size']) |
| |
| # When fields == None, the caller is requesting all possible fields. |
| # If the caller requested any fields that are not populated by bucket |
| # listing, we'll need to make a separate HTTP call for each object to |
| # get its metadata and populate the remaining fields with the result. |
          if not get_fields or not get_fields.issubset(key_http_fields):
| |
| generation = None |
| if getattr(key, 'generation', None): |
| generation = key.generation |
| if getattr(key, 'version_id', None): |
| generation = key.version_id |
| key_to_convert = self._GetBotoKey(bucket_name, key.name, |
| generation=generation) |
| return_object = self._BotoKeyToObject(key_to_convert, |
| fields=get_fields) |
| |
| yield CloudApi.CsObjectOrPrefix(return_object, |
| CloudApi.CsObjectOrPrefixType.OBJECT) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def GetObjectMetadata(self, bucket_name, object_name, generation=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| try: |
| return self._BotoKeyToObject(self._GetBotoKey(bucket_name, object_name, |
| generation=generation), |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def _CurryDigester(self, digester_object): |
| """Curries a digester object into a form consumable by boto. |
| |
    boto's Key class instantiates its own digesters by calling
    hash_algs[alg]() with no arguments. To hand boto our caught-up digesters
    during a resumable download, we therefore wrap each digester object in a
    zero-argument lambda that simply returns it, making the lookup by
    algorithm name implicit.
| |
| Args: |
| digester_object: Input object to be returned by the created function. |
| |
| Returns: |
| A function which when called will return the input object. |
| """ |
| return lambda: digester_object |
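
  # A minimal usage sketch (hypothetical names; hashlib is not imported by
  # this module): boto calls hash_algs[alg]() with no arguments, so currying
  # returns our existing, caught-up digester instead of a fresh one:
  #
  #   digester = hashlib.md5()  # assume already fed the first start_byte bytes
  #   hash_algs = {'md5': self._CurryDigester(digester)}
  #   assert hash_algs['md5']() is digester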
| |
| def GetObjectMedia( |
| self, bucket_name, object_name, download_stream, provider=None, |
| generation=None, object_size=None, |
| download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, |
| start_byte=0, end_byte=None, progress_callback=None, |
| serialization_data=None, digesters=None): |
| """See CloudApi class for function doc strings.""" |
| # This implementation will get the object metadata first if we don't pass it |
| # in via serialization_data. |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| if 'accept-encoding' not in headers: |
| headers['accept-encoding'] = 'gzip' |
| if end_byte is not None: |
| headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte) |
| elif start_byte > 0: |
| headers['range'] = 'bytes=%s-' % (start_byte) |
| elif start_byte < 0: |
| headers['range'] = 'bytes=%s' % (start_byte) |
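
    # For illustration, the three possible range header forms are:
    #   start_byte=5, end_byte=10   -> 'range: bytes=5-10' (bytes 5 through 10)
    #   start_byte=5, end_byte=None -> 'range: bytes=5-'   (byte 5 to the end)
    #   start_byte=-5               -> 'range: bytes=-5'   (the last 5 bytes)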
| |
| # Since in most cases we already made a call to get the object metadata, |
| # here we avoid an extra HTTP call by unpickling the key. This is coupled |
| # with the implementation in _BotoKeyToObject. |
| if serialization_data: |
| serialization_dict = json.loads(serialization_data) |
| key = pickle.loads(binascii.a2b_base64(serialization_dict['url'])) |
| else: |
| key = self._GetBotoKey(bucket_name, object_name, generation=generation) |
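
    # Round-trip sketch: _BotoKeyToObject stores the pickled key as the
    # object's mediaLink, and callers pass it back under the 'url' key:
    #
    #   url = binascii.b2a_base64(pickle.dumps(key, pickle.HIGHEST_PROTOCOL))
    #   serialization_data = json.dumps({'url': url, 'total_size': size})
    #   key = pickle.loads(binascii.a2b_base64(
    #       json.loads(serialization_data)['url']))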
| |
| if digesters and self.provider == 'gs': |
| hash_algs = {} |
| for alg in digesters: |
| hash_algs[alg] = self._CurryDigester(digesters[alg]) |
| else: |
| hash_algs = {} |
| |
| total_size = object_size or 0 |
| if serialization_data: |
| total_size = json.loads(serialization_data)['total_size'] |
| |
| if total_size: |
| num_progress_callbacks = max(int(total_size) / TWO_MIB, |
| XML_PROGRESS_CALLBACKS) |
| else: |
| num_progress_callbacks = XML_PROGRESS_CALLBACKS |
| |
| try: |
| if download_strategy is CloudApi.DownloadStrategy.RESUMABLE: |
| self._PerformResumableDownload( |
| download_stream, start_byte, end_byte, key, |
| headers=headers, callback=progress_callback, |
| num_callbacks=num_progress_callbacks, hash_algs=hash_algs) |
| elif download_strategy is CloudApi.DownloadStrategy.ONE_SHOT: |
| self._PerformSimpleDownload( |
| download_stream, key, progress_callback=progress_callback, |
| num_progress_callbacks=num_progress_callbacks, headers=headers, |
| hash_algs=hash_algs) |
| else: |
| raise ArgumentException('Unsupported DownloadStrategy: %s' % |
| download_strategy) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| if self.provider == 's3': |
| if digesters: |
| |
| class HashToDigester(object): |
| """Wrapper class to expose hash digests. |
| |
| boto creates its own digesters in s3's get_file, returning on-the-fly |
| hashes only by way of key.local_hashes. To propagate the digest back |
| to the caller, this stub class implements the digest() function. |
| """ |
| |
| def __init__(self, hash_val): |
| self.hash_val = hash_val |
| |
| def digest(self): # pylint: disable=invalid-name |
| return self.hash_val |
| |
| for alg_name in digesters: |
| if ((download_strategy == CloudApi.DownloadStrategy.RESUMABLE and |
| start_byte != 0) or |
| not ((getattr(key, 'local_hashes', None) and |
| alg_name in key.local_hashes))): |
| # For resumable downloads, boto does not provide a mechanism to |
| # catch up the hash in the case of a partially complete download. |
| # In this case or in the case where no digest was successfully |
| # calculated, set the digester to None, which indicates that we'll |
| # need to manually calculate the hash from the local file once it |
| # is complete. |
| digesters[alg_name] = None |
| else: |
| # Use the on-the-fly hash. |
| digesters[alg_name] = HashToDigester(key.local_hashes[alg_name]) |
| |
| def _PerformSimpleDownload(self, download_stream, key, progress_callback=None, |
| num_progress_callbacks=XML_PROGRESS_CALLBACKS, |
| headers=None, hash_algs=None): |
| if not headers: |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| try: |
| key.get_contents_to_file(download_stream, cb=progress_callback, |
| num_cb=num_progress_callbacks, headers=headers, |
| hash_algs=hash_algs) |
| except TypeError: # s3 and mocks do not support hash_algs |
| key.get_contents_to_file(download_stream, cb=progress_callback, |
| num_cb=num_progress_callbacks, headers=headers) |
| |
| def _PerformResumableDownload(self, fp, start_byte, end_byte, key, |
| headers=None, callback=None, |
| num_callbacks=XML_PROGRESS_CALLBACKS, |
| hash_algs=None): |
| """Downloads bytes from key to fp, resuming as needed. |
| |
| Args: |
| fp: File pointer into which data should be downloaded. |
| start_byte: Start byte of the download. |
| end_byte: End byte of the download. |
      key: Key object from which data is to be downloaded.
      headers: Headers to send when retrieving the file.
| callback: (optional) a callback function that will be called to report |
| progress on the download. The callback should accept two integer |
| parameters. The first integer represents the number of |
| bytes that have been successfully transmitted from the service. The |
| second represents the total number of bytes that need to be |
| transmitted. |
| num_callbacks: (optional) If a callback is specified with the callback |
| parameter, this determines the granularity of the callback |
| by defining the maximum number of times the callback will be |
| called during the file transfer. |
| hash_algs: Dict of hash algorithms to apply to downloaded bytes. |
| |
| Raises: |
| ResumableDownloadException on error. |
| """ |
| if not headers: |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| |
| retryable_exceptions = (httplib.HTTPException, IOError, socket.error, |
| socket.gaierror) |
| |
| debug = key.bucket.connection.debug |
| |
| num_retries = GetNumRetries() |
| progress_less_iterations = 0 |
| last_progress_byte = start_byte |
| |
| while True: # Retry as long as we're making progress. |
| try: |
| cb_handler = DownloadProxyCallbackHandler(start_byte, callback) |
| headers = headers.copy() |
| headers['Range'] = 'bytes=%d-%d' % (start_byte, end_byte) |
| |
| # Disable AWSAuthConnection-level retry behavior, since that would |
| # cause downloads to restart from scratch. |
| try: |
| key.get_file(fp, headers, cb_handler.call, num_callbacks, |
| override_num_retries=0, hash_algs=hash_algs) |
| except TypeError: |
| key.get_file(fp, headers, cb_handler.call, num_callbacks, |
| override_num_retries=0) |
| fp.flush() |
| # Download succeeded. |
| return |
| except retryable_exceptions, e: |
| if debug >= 1: |
| self.logger.info('Caught exception (%s)', repr(e)) |
| if isinstance(e, IOError) and e.errno == errno.EPIPE: |
| # Broken pipe error causes httplib to immediately |
| # close the socket (http://bugs.python.org/issue5542), |
| # so we need to close and reopen the key before resuming |
| # the download. |
| if self.provider == 's3': |
| key.get_file(fp, headers, cb_handler.call, num_callbacks, |
| override_num_retries=0) |
| else: # self.provider == 'gs' |
| key.get_file(fp, headers, cb_handler.call, num_callbacks, |
| override_num_retries=0, hash_algs=hash_algs) |
| except BotoResumableDownloadException, e: |
| if (e.disposition == |
| ResumableTransferDisposition.ABORT_CUR_PROCESS): |
| raise ResumableDownloadException(e.message) |
| else: |
| if debug >= 1: |
| self.logger.info('Caught ResumableDownloadException (%s) - will ' |
| 'retry', e.message) |
| |
      # At this point we had a retryable failure; see if we made progress.
| start_byte = fp.tell() |
| if start_byte > last_progress_byte: |
| last_progress_byte = start_byte |
| progress_less_iterations = 0 |
| else: |
| progress_less_iterations += 1 |
| |
| if progress_less_iterations > num_retries: |
| # Don't retry any longer in the current process. |
| raise ResumableDownloadException( |
| 'Too many resumable download attempts failed without ' |
            'progress. You might try this download again later.')
| |
| # Close the key, in case a previous download died partway |
| # through and left data in the underlying key HTTP buffer. |
| # Do this within a try/except block in case the connection is |
| # closed (since key.close() attempts to do a final read, in which |
| # case this read attempt would get an IncompleteRead exception, |
| # which we can safely ignore). |
| try: |
| key.close() |
| except httplib.IncompleteRead: |
| pass |
| |
| sleep_time_secs = min(random.random() * (2 ** progress_less_iterations), |
| GetMaxRetryDelay()) |
| if debug >= 1: |
| self.logger.info( |
            'Got retryable failure (%d progress-less in a row).\nSleeping %s '
            'seconds before retrying', progress_less_iterations,
            sleep_time_secs)
| time.sleep(sleep_time_secs) |
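
  # Backoff sketch (illustrative): each progress-less retry sleeps a random
  # fraction of an exponentially growing window, capped by GetMaxRetryDelay():
  #
  #   iteration 1: min(random.random() * 2,    GetMaxRetryDelay()) seconds
  #   iteration 2: min(random.random() * 4,    GetMaxRetryDelay()) seconds
  #   iteration n: min(random.random() * 2**n, GetMaxRetryDelay()) seconds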
| |
| def PatchObjectMetadata(self, bucket_name, object_name, metadata, |
| canned_acl=None, generation=None, preconditions=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| object_uri = self._StorageUriForObject(bucket_name, object_name, |
| generation=generation) |
| |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| meta_headers = HeadersFromObjectMetadata(metadata, self.provider) |
| |
| metadata_plus = {} |
| metadata_minus = set() |
| metadata_changed = False |
| for k, v in meta_headers.iteritems(): |
| metadata_changed = True |
| if v is None: |
| metadata_minus.add(k) |
| else: |
| metadata_plus[k] = v |
| |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| |
| if metadata_changed: |
| try: |
| object_uri.set_metadata(metadata_plus, metadata_minus, False, |
| headers=headers) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| if metadata.acl: |
| boto_acl = AclTranslation.BotoAclFromMessage(metadata.acl) |
| try: |
| object_uri.set_xml_acl(boto_acl.to_xml(), key_name=object_name) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| if canned_acl: |
| canned_acls = object_uri.canned_acls() |
| if canned_acl not in canned_acls: |
| raise CommandException('Invalid canned ACL "%s".' % canned_acl) |
| object_uri.set_acl(canned_acl, object_uri.object_name) |
| |
| return self.GetObjectMetadata(bucket_name, object_name, |
| generation=generation, fields=fields) |
| |
| def _PerformSimpleUpload(self, dst_uri, upload_stream, md5=None, |
| canned_acl=None, progress_callback=None, |
| headers=None): |
| dst_uri.set_contents_from_file(upload_stream, md5=md5, policy=canned_acl, |
| cb=progress_callback, headers=headers) |
| |
| def _PerformStreamingUpload(self, dst_uri, upload_stream, canned_acl=None, |
| progress_callback=None, headers=None): |
| if dst_uri.get_provider().supports_chunked_transfer(): |
| dst_uri.set_contents_from_stream(upload_stream, policy=canned_acl, |
| cb=progress_callback, headers=headers) |
| else: |
| # Provider doesn't support chunked transfer, so copy to a temporary |
| # file. |
| (temp_fh, temp_path) = tempfile.mkstemp() |
| try: |
| with open(temp_path, 'wb') as out_fp: |
| stream_bytes = upload_stream.read(DEFAULT_FILE_BUFFER_SIZE) |
| while stream_bytes: |
| out_fp.write(stream_bytes) |
| stream_bytes = upload_stream.read(DEFAULT_FILE_BUFFER_SIZE) |
| with open(temp_path, 'rb') as in_fp: |
| dst_uri.set_contents_from_file(in_fp, policy=canned_acl, |
| headers=headers) |
| finally: |
| os.close(temp_fh) |
| os.unlink(temp_path) |
| |
| def _PerformResumableUpload(self, key, upload_stream, upload_size, |
| tracker_callback, canned_acl=None, |
| serialization_data=None, progress_callback=None, |
| headers=None): |
| resumable_upload = BotoResumableUpload( |
| tracker_callback, self.logger, resume_url=serialization_data) |
| resumable_upload.SendFile(key, upload_stream, upload_size, |
| canned_acl=canned_acl, cb=progress_callback, |
| headers=headers) |
| |
| def _UploadSetup(self, object_metadata, preconditions=None): |
| """Shared upload implementation. |
| |
| Args: |
| object_metadata: Object metadata describing destination object. |
| preconditions: Optional gsutil Cloud API preconditions. |
| |
| Returns: |
      Tuple of (headers dict, destination StorageUri) based on the inputs.
| """ |
| ValidateDstObjectMetadata(object_metadata) |
| |
| headers = HeadersFromObjectMetadata(object_metadata, self.provider) |
| self._AddApiVersionToHeaders(headers) |
| |
| if object_metadata.crc32c: |
| if 'x-goog-hash' in headers: |
| headers['x-goog-hash'] += ( |
| ',crc32c=%s' % object_metadata.crc32c.rstrip('\n')) |
| else: |
| headers['x-goog-hash'] = ( |
| 'crc32c=%s' % object_metadata.crc32c.rstrip('\n')) |
| if object_metadata.md5Hash: |
| if 'x-goog-hash' in headers: |
| headers['x-goog-hash'] += ( |
| ',md5=%s' % object_metadata.md5Hash.rstrip('\n')) |
| else: |
| headers['x-goog-hash'] = ( |
| 'md5=%s' % object_metadata.md5Hash.rstrip('\n')) |
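
    # For example, supplying both hashes yields one combined header (shown
    # here with the digests of the empty string):
    #   x-goog-hash: crc32c=AAAAAA==,md5=1B2M2Y8AsgTpgAmY7PhCfg==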
| |
| if 'content-type' in headers and not headers['content-type']: |
| headers['content-type'] = 'application/octet-stream' |
| |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| |
| dst_uri = self._StorageUriForObject(object_metadata.bucket, |
| object_metadata.name) |
| return headers, dst_uri |
| |
| def _HandleSuccessfulUpload(self, dst_uri, object_metadata, fields=None): |
| """Set ACLs on an uploaded object and return its metadata. |
| |
| Args: |
| dst_uri: Generation-specific StorageUri describing the object. |
| object_metadata: Metadata for the object, including an ACL if applicable. |
| fields: If present, return only these Object metadata fields. |
| |
| Returns: |
| gsutil Cloud API Object metadata. |
| |
| Raises: |
| CommandException if the object was overwritten / deleted concurrently. |
| """ |
| try: |
| # The XML API does not support if-generation-match for GET requests. |
| # Therefore, if the object gets overwritten before the ACL and get_key |
| # operations, the best we can do is warn that it happened. |
| self._SetObjectAcl(object_metadata, dst_uri) |
| return self._BotoKeyToObject(dst_uri.get_key(), fields=fields) |
| except boto.exception.InvalidUriError as e: |
| if e.message and NON_EXISTENT_OBJECT_REGEX.match(e.message.encode(UTF8)): |
| raise CommandException('\n'.join(textwrap.wrap( |
| 'Uploaded object (%s) was deleted or overwritten immediately ' |
| 'after it was uploaded. This can happen if you attempt to upload ' |
| 'to the same object multiple times concurrently.' % dst_uri.uri))) |
| else: |
| raise |
| |
| def _SetObjectAcl(self, object_metadata, dst_uri): |
| """Sets the ACL (if present in object_metadata) on an uploaded object.""" |
| if object_metadata.acl: |
| boto_acl = AclTranslation.BotoAclFromMessage(object_metadata.acl) |
| dst_uri.set_xml_acl(boto_acl.to_xml()) |
| elif self.provider == 's3': |
| s3_acl = S3MarkerAclFromObjectMetadata(object_metadata) |
| if s3_acl: |
| dst_uri.set_xml_acl(s3_acl) |
| |
| def UploadObjectResumable( |
| self, upload_stream, object_metadata, canned_acl=None, preconditions=None, |
| provider=None, fields=None, size=None, serialization_data=None, |
| tracker_callback=None, progress_callback=None): |
| """See CloudApi class for function doc strings.""" |
| if self.provider == 's3': |
| # Resumable uploads are not supported for s3. |
| return self.UploadObject( |
| upload_stream, object_metadata, canned_acl=canned_acl, |
| preconditions=preconditions, fields=fields, size=size) |
| headers, dst_uri = self._UploadSetup(object_metadata, |
| preconditions=preconditions) |
| if not tracker_callback: |
| raise ArgumentException('No tracker callback function set for ' |
| 'resumable upload of %s' % dst_uri) |
| try: |
| self._PerformResumableUpload(dst_uri.new_key(headers=headers), |
| upload_stream, size, tracker_callback, |
| canned_acl=canned_acl, |
| serialization_data=serialization_data, |
| progress_callback=progress_callback, |
| headers=headers) |
| return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, object_metadata.bucket) |
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
| object_name=object_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def UploadObjectStreaming(self, upload_stream, object_metadata, |
| canned_acl=None, progress_callback=None, |
| preconditions=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| headers, dst_uri = self._UploadSetup(object_metadata, |
| preconditions=preconditions) |
| |
| try: |
| self._PerformStreamingUpload( |
| dst_uri, upload_stream, canned_acl=canned_acl, |
| progress_callback=progress_callback, headers=headers) |
| return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, object_metadata.bucket) |
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
| object_name=object_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
| preconditions=None, size=None, progress_callback=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| headers, dst_uri = self._UploadSetup(object_metadata, |
| preconditions=preconditions) |
| |
| try: |
| md5 = None |
| if object_metadata.md5Hash: |
| md5 = [] |
| # boto expects hex at index 0, base64 at index 1 |
| md5.append(Base64ToHexHash(object_metadata.md5Hash)) |
| md5.append(object_metadata.md5Hash.strip('\n"\'')) |
| self._PerformSimpleUpload(dst_uri, upload_stream, md5=md5, |
| canned_acl=canned_acl, |
| progress_callback=progress_callback, |
| headers=headers) |
| return self._HandleSuccessfulUpload(dst_uri, object_metadata, |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, object_metadata.bucket) |
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
| object_name=object_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def DeleteObject(self, bucket_name, object_name, preconditions=None, |
| generation=None, provider=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| |
| uri = self._StorageUriForObject(bucket_name, object_name, |
| generation=generation) |
| try: |
| uri.delete_key(validate=False, headers=headers) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None, |
| canned_acl=None, preconditions=None, progress_callback=None, |
| max_bytes_per_call=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| |
| if max_bytes_per_call is not None: |
      raise NotImplementedError('XML API does not support max_bytes_per_call')
| dst_uri = self._StorageUriForObject(dst_obj_metadata.bucket, |
| dst_obj_metadata.name) |
| |
| # Usually it's okay to treat version_id and generation as |
| # the same, but in this case the underlying boto call determines the |
| # provider based on the presence of one or the other. |
| src_version_id = None |
| if self.provider == 's3': |
| src_version_id = src_generation |
| src_generation = None |
| |
| headers = HeadersFromObjectMetadata(dst_obj_metadata, self.provider) |
| self._AddApiVersionToHeaders(headers) |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| |
| if canned_acl: |
| headers[dst_uri.get_provider().acl_header] = canned_acl |
| |
    preserve_acl = bool(dst_obj_metadata.acl)
| if self.provider == 's3': |
| s3_acl = S3MarkerAclFromObjectMetadata(dst_obj_metadata) |
| if s3_acl: |
| preserve_acl = True |
| |
| try: |
| new_key = dst_uri.copy_key( |
| src_obj_metadata.bucket, src_obj_metadata.name, |
| preserve_acl=preserve_acl, headers=headers, |
| src_version_id=src_version_id, src_generation=src_generation) |
| |
| return self._BotoKeyToObject(new_key, fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, dst_obj_metadata.bucket, src_provider=self.provider, |
| src_bucket_name=src_obj_metadata.bucket, |
| src_object_name=src_obj_metadata.name, src_generation=src_generation) |
| self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket, |
| object_name=dst_obj_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def ComposeObject(self, src_objs_metadata, dst_obj_metadata, |
| preconditions=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| _ = provider |
| ValidateDstObjectMetadata(dst_obj_metadata) |
| |
| dst_obj_name = dst_obj_metadata.name |
| dst_obj_metadata.name = None |
| dst_bucket_name = dst_obj_metadata.bucket |
| dst_obj_metadata.bucket = None |
| headers = HeadersFromObjectMetadata(dst_obj_metadata, self.provider) |
| if not dst_obj_metadata.contentType: |
| dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE |
| headers['content-type'] = dst_obj_metadata.contentType |
| self._AddApiVersionToHeaders(headers) |
| self._AddPreconditionsToHeaders(preconditions, headers) |
| |
| dst_uri = self._StorageUriForObject(dst_bucket_name, dst_obj_name) |
| |
| src_components = [] |
| for src_obj in src_objs_metadata: |
| src_uri = self._StorageUriForObject(dst_bucket_name, src_obj.name, |
| generation=src_obj.generation) |
| src_components.append(src_uri) |
| |
| try: |
| dst_uri.compose(src_components, headers=headers) |
| |
| return self.GetObjectMetadata(dst_bucket_name, dst_obj_name, |
| fields=fields) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, dst_obj_metadata.bucket, |
| dst_obj_metadata.name) |
| |
| def _AddPreconditionsToHeaders(self, preconditions, headers): |
| """Adds preconditions (if any) to headers.""" |
| if preconditions and self.provider == 'gs': |
| if preconditions.gen_match is not None: |
| headers['x-goog-if-generation-match'] = str(preconditions.gen_match) |
| if preconditions.meta_gen_match is not None: |
| headers['x-goog-if-metageneration-match'] = str( |
| preconditions.meta_gen_match) |
| |
| def _AddApiVersionToHeaders(self, headers): |
| if self.provider == 'gs': |
| headers['x-goog-api-version'] = self.api_version |
| |
| def _GetMD5FromETag(self, src_etag): |
| """Returns an MD5 from the etag iff the etag is a valid MD5 hash. |
| |
| Args: |
| src_etag: Object etag for which to return the MD5. |
| |
| Returns: |
| MD5 in hex string format, or None. |
| """ |
| if src_etag and MD5_REGEX.search(src_etag): |
| return src_etag.strip('"\'').lower() |
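
  # For illustration (hypothetical etags):
  #   _GetMD5FromETag('"9E107D9D372BB6826BD81D3542A419D6"')
  #       -> '9e107d9d372bb6826bd81d3542a419d6'
  #   _GetMD5FromETag('"9e107d9d372bb6826bd81d3542a419d6-2"')  # S3 multipart
  #       -> None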
| |
| def _StorageUriForBucket(self, bucket): |
| """Returns a boto storage_uri for the given bucket name. |
| |
| Args: |
| bucket: Bucket name (string). |
| |
| Returns: |
| Boto storage_uri for the bucket. |
| """ |
| return boto.storage_uri( |
| '%s://%s' % (self.provider, bucket), |
| suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| |
| def _StorageUriForObject(self, bucket, object_name, generation=None): |
| """Returns a boto storage_uri for the given object. |
| |
| Args: |
| bucket: Bucket name (string). |
| object_name: Object name (string). |
| generation: Generation or version_id of object. If None, live version |
| of the object is used. |
| |
| Returns: |
| Boto storage_uri for the object. |
| """ |
| uri_string = '%s://%s/%s' % (self.provider, bucket, object_name) |
| if generation: |
| uri_string += '#%s' % generation |
| return boto.storage_uri( |
| uri_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
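
  # For example (hypothetical values), bucket='foo', object_name='bar', and
  # generation=1361471986138000 produce the versioned URI string
  # 'gs://foo/bar#1361471986138000'.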
| |
| def _GetBotoKey(self, bucket_name, object_name, generation=None): |
| """Gets the boto key for an object. |
| |
| Args: |
| bucket_name: Bucket containing the object. |
| object_name: Object name. |
| generation: Generation or version of the object to retrieve. |
| |
| Returns: |
| Boto key for the object. |
| """ |
| object_uri = self._StorageUriForObject(bucket_name, object_name, |
| generation=generation) |
| try: |
| key = object_uri.get_key() |
| if not key: |
| raise CreateObjectNotFoundException('404', self.provider, |
| bucket_name, object_name, |
| generation=generation) |
| return key |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def _ListToGetFields(self, list_fields=None): |
| """Removes 'items/' from the input fields and converts it to a set. |
| |
| This way field sets requested for ListBucket/ListObject can be used in |
| _BotoBucketToBucket and _BotoKeyToObject calls. |
| |
| Args: |
| list_fields: Iterable fields usable in ListBucket/ListObject calls. |
| |
| Returns: |
| Set of fields usable in GetBucket/GetObject or |
| _BotoBucketToBucket/_BotoKeyToObject calls. |
| """ |
| if list_fields: |
| get_fields = set() |
| for field in list_fields: |
| if field in ['kind', 'nextPageToken', 'prefixes']: |
| # These are not actually object / bucket metadata fields. |
| # They are fields specific to listing, so we don't consider them. |
| continue |
| get_fields.add(re.sub(r'items/', '', field)) |
| return get_fields |
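
  # For example (hypothetical fields), a ListObjects request for
  # ['items/name', 'items/size', 'prefixes'] converts to set(['name', 'size']);
  # 'prefixes' is listing-specific and is dropped.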
| |
| # pylint: disable=too-many-statements |
| def _BotoBucketToBucket(self, bucket, fields=None): |
| """Constructs an apitools Bucket from a boto bucket. |
| |
| Args: |
| bucket: Boto bucket. |
| fields: If present, construct the apitools Bucket with only this set of |
| metadata fields. |
| |
| Returns: |
| apitools Bucket. |
| """ |
| bucket_uri = self._StorageUriForBucket(bucket.name) |
| |
| cloud_api_bucket = apitools_messages.Bucket(name=bucket.name, |
| id=bucket.name) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| if self.provider == 'gs': |
| if not fields or 'storageClass' in fields: |
| if hasattr(bucket, 'get_storage_class'): |
| cloud_api_bucket.storageClass = bucket.get_storage_class() |
      if not fields or 'acl' in fields:
        try:
          for acl in AclTranslation.BotoBucketAclToMessage(
              bucket.get_acl(headers=headers)):
            cloud_api_bucket.acl.append(acl)
        except TRANSLATABLE_BOTO_EXCEPTIONS, e:
          translated_exception = self._TranslateBotoException(
              e, bucket_name=bucket.name)
          if (translated_exception and
              isinstance(translated_exception, AccessDeniedException)):
            # The JSON API doesn't differentiate between a blank ACL list and
            # access denied, so the ACL is intentionally left blank here.
            pass
          else:
            self._TranslateExceptionAndRaise(e, bucket_name=bucket.name)
| if not fields or 'cors' in fields: |
| try: |
| boto_cors = bucket_uri.get_cors() |
| cloud_api_bucket.cors = CorsTranslation.BotoCorsToMessage(boto_cors) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket.name) |
      if not fields or 'defaultObjectAcl' in fields:
        try:
          for acl in AclTranslation.BotoObjectAclToMessage(
              bucket.get_def_acl(headers=headers)):
            cloud_api_bucket.defaultObjectAcl.append(acl)
        except TRANSLATABLE_BOTO_EXCEPTIONS, e:
          translated_exception = self._TranslateBotoException(
              e, bucket_name=bucket.name)
          if (translated_exception and
              isinstance(translated_exception, AccessDeniedException)):
            # The JSON API doesn't differentiate between a blank ACL list and
            # access denied, so the ACL is intentionally left blank here.
            pass
          else:
            self._TranslateExceptionAndRaise(e, bucket_name=bucket.name)
| if not fields or 'lifecycle' in fields: |
| try: |
| boto_lifecycle = bucket_uri.get_lifecycle_config() |
| cloud_api_bucket.lifecycle = ( |
| LifecycleTranslation.BotoLifecycleToMessage(boto_lifecycle)) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket.name) |
| if not fields or 'logging' in fields: |
| try: |
| boto_logging = bucket_uri.get_logging_config() |
| if boto_logging and 'Logging' in boto_logging: |
| logging_config = boto_logging['Logging'] |
| log_object_prefix_present = 'LogObjectPrefix' in logging_config |
| log_bucket_present = 'LogBucket' in logging_config |
| if log_object_prefix_present or log_bucket_present: |
| cloud_api_bucket.logging = apitools_messages.Bucket.LoggingValue() |
| if log_object_prefix_present: |
| cloud_api_bucket.logging.logObjectPrefix = ( |
| logging_config['LogObjectPrefix']) |
| if log_bucket_present: |
| cloud_api_bucket.logging.logBucket = logging_config['LogBucket'] |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket.name) |
| if not fields or 'website' in fields: |
| try: |
| boto_website = bucket_uri.get_website_config() |
| if boto_website and 'WebsiteConfiguration' in boto_website: |
| website_config = boto_website['WebsiteConfiguration'] |
| main_page_suffix_present = 'MainPageSuffix' in website_config |
| not_found_page_present = 'NotFoundPage' in website_config |
| if main_page_suffix_present or not_found_page_present: |
| cloud_api_bucket.website = apitools_messages.Bucket.WebsiteValue() |
| if main_page_suffix_present: |
| cloud_api_bucket.website.mainPageSuffix = ( |
| website_config['MainPageSuffix']) |
| if not_found_page_present: |
| cloud_api_bucket.website.notFoundPage = ( |
| website_config['NotFoundPage']) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket.name) |
| if not fields or 'location' in fields: |
| cloud_api_bucket.location = bucket_uri.get_location() |
| if not fields or 'versioning' in fields: |
| versioning = bucket_uri.get_versioning_config(headers=headers) |
| if versioning: |
| if (self.provider == 's3' and 'Versioning' in versioning and |
| versioning['Versioning'] == 'Enabled'): |
| cloud_api_bucket.versioning = ( |
| apitools_messages.Bucket.VersioningValue(enabled=True)) |
| elif self.provider == 'gs': |
| cloud_api_bucket.versioning = ( |
| apitools_messages.Bucket.VersioningValue(enabled=True)) |
| |
    # For S3, long bucket listing does not translate CORS, lifecycle, website,
    # or logging configuration. The individual commands can be used to get
    # the XML equivalents for S3.
| return cloud_api_bucket |
| |
| def _BotoKeyToObject(self, key, fields=None): |
| """Constructs an apitools Object from a boto key. |
| |
| Args: |
| key: Boto key to construct Object from. |
| fields: If present, construct the apitools Object with only this set of |
| metadata fields. |
| |
| Returns: |
| apitools Object corresponding to key. |
| """ |
| custom_metadata = None |
| if not fields or 'metadata' in fields: |
| custom_metadata = self._TranslateBotoKeyCustomMetadata(key) |
| cache_control = None |
| if not fields or 'cacheControl' in fields: |
| cache_control = getattr(key, 'cache_control', None) |
| component_count = None |
| if not fields or 'componentCount' in fields: |
| component_count = getattr(key, 'component_count', None) |
| content_disposition = None |
| if not fields or 'contentDisposition' in fields: |
| content_disposition = getattr(key, 'content_disposition', None) |
| # Other fields like updated and ACL depend on the generation |
| # of the object, so populate that regardless of whether it was requested. |
| generation = self._TranslateBotoKeyGeneration(key) |
| metageneration = None |
| if not fields or 'metageneration' in fields: |
| metageneration = self._TranslateBotoKeyMetageneration(key) |
| updated = None |
| # Translation code to avoid a dependency on dateutil. |
| if not fields or 'updated' in fields: |
| updated = self._TranslateBotoKeyTimestamp(key) |
| etag = None |
| if not fields or 'etag' in fields: |
| etag = getattr(key, 'etag', None) |
| if etag: |
| etag = etag.strip('"\'') |
| crc32c = None |
| if not fields or 'crc32c' in fields: |
| if hasattr(key, 'cloud_hashes') and 'crc32c' in key.cloud_hashes: |
| crc32c = base64.encodestring(key.cloud_hashes['crc32c']).rstrip('\n') |
| md5_hash = None |
| if not fields or 'md5Hash' in fields: |
| if hasattr(key, 'cloud_hashes') and 'md5' in key.cloud_hashes: |
| md5_hash = base64.encodestring(key.cloud_hashes['md5']).rstrip('\n') |
| elif self._GetMD5FromETag(getattr(key, 'etag', None)): |
| md5_hash = Base64EncodeHash(self._GetMD5FromETag(key.etag)) |
| elif self.provider == 's3': |
| # S3 etags are MD5s for non-multi-part objects, but multi-part objects |
| # (which include all objects >= 5 GB) have a custom checksum |
| # implementation that is not currently supported by gsutil. |
| self.logger.warn( |
| 'Non-MD5 etag (%s) present for key %s, data integrity checks are ' |
| 'not possible.', key.etag, key) |
| |
| # Serialize the boto key in the media link if it is requested. This |
| # way we can later access the key without adding an HTTP call. |
| media_link = None |
| if not fields or 'mediaLink' in fields: |
| media_link = binascii.b2a_base64( |
| pickle.dumps(key, pickle.HIGHEST_PROTOCOL)) |
| size = None |
| if not fields or 'size' in fields: |
| size = key.size or 0 |
| storage_class = None |
| if not fields or 'storageClass' in fields: |
| storage_class = getattr(key, 'storage_class', None) |
| |
| cloud_api_object = apitools_messages.Object( |
| bucket=key.bucket.name, |
| name=key.name, |
| size=size, |
| contentEncoding=key.content_encoding, |
| contentLanguage=key.content_language, |
| contentType=key.content_type, |
| cacheControl=cache_control, |
| contentDisposition=content_disposition, |
| etag=etag, |
| crc32c=crc32c, |
| md5Hash=md5_hash, |
| generation=generation, |
| metageneration=metageneration, |
| componentCount=component_count, |
| updated=updated, |
| metadata=custom_metadata, |
| mediaLink=media_link, |
| storageClass=storage_class) |
| |
| # Remaining functions amend cloud_api_object. |
| self._TranslateDeleteMarker(key, cloud_api_object) |
| if not fields or 'acl' in fields: |
| generation_str = GenerationFromUrlAndString( |
| StorageUrlFromString(self.provider), generation) |
| self._TranslateBotoKeyAcl(key, cloud_api_object, |
| generation=generation_str) |
| |
| return cloud_api_object |
| |
| def _TranslateBotoKeyCustomMetadata(self, key): |
| """Populates an apitools message from custom metadata in the boto key.""" |
| custom_metadata = None |
| if getattr(key, 'metadata', None): |
| custom_metadata = apitools_messages.Object.MetadataValue( |
| additionalProperties=[]) |
| for k, v in key.metadata.iteritems(): |
| if k.lower() == 'content-language': |
| # Work around content-language being inserted into custom metadata. |
| continue |
| custom_metadata.additionalProperties.append( |
| apitools_messages.Object.MetadataValue.AdditionalProperty( |
| key=k, value=v)) |
| return custom_metadata |
| |
| def _TranslateBotoKeyGeneration(self, key): |
| """Returns the generation/version_id number from the boto key if present.""" |
| generation = None |
| if self.provider == 'gs': |
| if getattr(key, 'generation', None): |
| generation = long(key.generation) |
| elif self.provider == 's3': |
| if getattr(key, 'version_id', None): |
| generation = EncodeStringAsLong(key.version_id) |
| return generation |
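
  # For example (hypothetical values), a GS key with generation
  # '1360887697105000' yields the long 1360887697105000L, while an S3 key's
  # string version_id is packed into a long via EncodeStringAsLong so that
  # both providers share a single numeric generation field.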
| |
| def _TranslateBotoKeyMetageneration(self, key): |
| """Returns the metageneration number from the boto key if present.""" |
| metageneration = None |
| if self.provider == 'gs': |
| if getattr(key, 'metageneration', None): |
| metageneration = long(key.metageneration) |
| return metageneration |
| |
| def _TranslateBotoKeyTimestamp(self, key): |
| """Parses the timestamp from the boto key into an datetime object. |
| |
| This avoids a dependency on dateutil. |
| |
| Args: |
| key: Boto key to get timestamp from. |
| |
| Returns: |
| datetime object if string is parsed successfully, None otherwise. |
| """ |
| if key.last_modified: |
| if '.' in key.last_modified: |
| key_us_timestamp = key.last_modified.rstrip('Z') + '000Z' |
| else: |
| key_us_timestamp = key.last_modified.rstrip('Z') + '.000000Z' |
| fmt = '%Y-%m-%dT%H:%M:%S.%fZ' |
| try: |
| return datetime.datetime.strptime(key_us_timestamp, fmt) |
| except ValueError: |
| try: |
| # Try alternate format |
| fmt = '%a, %d %b %Y %H:%M:%S %Z' |
| return datetime.datetime.strptime(key.last_modified, fmt) |
| except ValueError: |
| # Could not parse the time; leave updated as None. |
| return None |
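
  # For illustration (hypothetical timestamps), both of these formats parse:
  #   '2013-02-14T22:01:37.123Z'        (ISO 8601; microseconds are padded)
  #   'Thu, 14 Feb 2013 22:01:37 GMT'   (RFC 1123 fallback)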
| |
| def _TranslateDeleteMarker(self, key, cloud_api_object): |
| """Marks deleted objects with a metadata value (for S3 compatibility).""" |
| if isinstance(key, DeleteMarker): |
| if not cloud_api_object.metadata: |
| cloud_api_object.metadata = apitools_messages.Object.MetadataValue() |
| cloud_api_object.metadata.additionalProperties = [] |
| cloud_api_object.metadata.additionalProperties.append( |
| apitools_messages.Object.MetadataValue.AdditionalProperty( |
| key=S3_DELETE_MARKER_GUID, value=True)) |
| |
| def _TranslateBotoKeyAcl(self, key, cloud_api_object, generation=None): |
| """Updates cloud_api_object with the ACL from the boto key.""" |
| storage_uri_for_key = self._StorageUriForObject(key.bucket.name, key.name, |
| generation=generation) |
| headers = {} |
| self._AddApiVersionToHeaders(headers) |
| try: |
| if self.provider == 'gs': |
| key_acl = storage_uri_for_key.get_acl(headers=headers) |
| # key.get_acl() does not support versioning so we need to use |
| # storage_uri to ensure we're getting the versioned ACL. |
| for acl in AclTranslation.BotoObjectAclToMessage(key_acl): |
| cloud_api_object.acl.append(acl) |
| if self.provider == 's3': |
| key_acl = key.get_xml_acl(headers=headers) |
| # ACLs for s3 are different and we use special markers to represent |
| # them in the gsutil Cloud API. |
| AddS3MarkerAclToObjectMetadata(cloud_api_object, key_acl) |
| except boto.exception.GSResponseError, e: |
| if e.status == 403: |
| # Consume access denied exceptions to mimic JSON behavior of simply |
| # returning None if sufficient permission is not present. The caller |
| # needs to handle the case where the ACL is not populated. |
| pass |
| else: |
| raise |
| |
| def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None, |
| generation=None, not_found_exception=None): |
| """Translates a Boto exception and raises the translated or original value. |
| |
| Args: |
| e: Any Exception. |
| bucket_name: Optional bucket name in request that caused the exception. |
| object_name: Optional object name in request that caused the exception. |
| generation: Optional generation in request that caused the exception. |
| not_found_exception: Optional exception to raise in the not-found case. |
| |
| Raises: |
| Translated CloudApi exception, or the original exception if it was not |
| translatable. |
| """ |
| translated_exception = self._TranslateBotoException( |
| e, bucket_name=bucket_name, object_name=object_name, |
| generation=generation, not_found_exception=not_found_exception) |
| if translated_exception: |
| raise translated_exception |
| else: |
| raise |
| |
| def _TranslateBotoException(self, e, bucket_name=None, object_name=None, |
| generation=None, not_found_exception=None): |
| """Translates boto exceptions into their gsutil Cloud API equivalents. |
| |
| Args: |
| e: Any exception in TRANSLATABLE_BOTO_EXCEPTIONS. |
| bucket_name: Optional bucket name in request that caused the exception. |
| object_name: Optional object name in request that caused the exception. |
| generation: Optional generation in request that caused the exception. |
| not_found_exception: Optional exception to raise in the not-found case. |
| |
| Returns: |
| The corresponding CloudApi exception for translatable exceptions, None |
| otherwise. |
| |
| Note: because the checks below use isinstance, subtypes must be tested |
| before their parent types. |
| """ |
| if isinstance(e, boto.exception.StorageResponseError): |
| if e.status == 400: |
| return BadRequestException(e.code, status=e.status, body=e.body) |
| elif e.status == 401 or e.status == 403: |
| return AccessDeniedException(e.code, status=e.status, body=e.body) |
| elif e.status == 404: |
| if not_found_exception: |
| # The exception is pre-constructed prior to translation; the HTTP |
| # status code isn't available at that time. |
| setattr(not_found_exception, 'status', e.status) |
| return not_found_exception |
| elif bucket_name: |
| if object_name: |
| return CreateObjectNotFoundException(e.status, self.provider, |
| bucket_name, object_name, |
| generation=generation) |
| return CreateBucketNotFoundException(e.status, self.provider, |
| bucket_name) |
| return NotFoundException(e.message, status=e.status, body=e.body) |
| elif e.status == 409 and e.code and 'BucketNotEmpty' in e.code: |
| return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| status=e.status, body=e.body) |
| elif e.status == 410: |
| # 410 errors should always cause us to start over: either the upload ID |
| # has expired or there was a server-side problem that requires starting |
| # the upload over from scratch. |
| return ResumableUploadStartOverException(e.message) |
| elif e.status == 412: |
| return PreconditionException(e.code, status=e.status, body=e.body) |
| if isinstance(e, boto.exception.StorageCreateError): |
| return ServiceException('Bucket already exists.', status=e.status, |
| body=e.body) |
| |
| if isinstance(e, boto.exception.BotoServerError): |
| return ServiceException(e.message, status=e.status, body=e.body) |
| |
| if isinstance(e, boto.exception.InvalidUriError): |
| # The error message may have been re-wrapped by textwrap, so search for |
| # it with a regex that tolerates inserted line breaks. |
| if e.message and NON_EXISTENT_OBJECT_REGEX.match(e.message.encode(UTF8)): |
| return NotFoundException(e.message, status=404) |
| return InvalidUrlError(e.message) |
| |
| if isinstance(e, boto.exception.ResumableUploadException): |
| if e.disposition == boto.exception.ResumableTransferDisposition.ABORT: |
| return ResumableUploadAbortException(e.message) |
| elif (e.disposition == |
| boto.exception.ResumableTransferDisposition.START_OVER): |
| return ResumableUploadStartOverException(e.message) |
| else: |
| return ResumableUploadException(e.message) |
| |
| if isinstance(e, boto.exception.ResumableDownloadException): |
| return ResumableDownloadException(e.message) |
| |
| return None |
| |
| def XmlPassThroughGetAcl(self, storage_url, def_obj_acl=False): |
| """See CloudApiDelegator class for function doc strings.""" |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| if def_obj_acl: |
| return uri.get_def_acl() |
| else: |
| return uri.get_acl() |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
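| # Illustrative usage of the pass-through above (hypothetical names; assumes |
| # an instance of this XML API class bound to the 'gs' provider): |
| #   url = StorageUrlFromString('gs://my-bucket') |
| #   acl = xml_api.XmlPassThroughGetAcl(url)  # Returns a boto ACL object. |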
| |
| def XmlPassThroughSetAcl(self, acl_text, storage_url, canned=True, |
| def_obj_acl=False): |
| """See CloudApiDelegator class for function doc strings.""" |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
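| # Both canned-ACL branches validate the ACL name the same way, so the |
| # check below is hoisted; canned names (e.g. 'private', 'public-read') |
| # come from the set boto advertises via uri.canned_acls(). |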
| if canned: |
| canned_acls = uri.canned_acls() |
| if acl_text not in canned_acls: |
| raise CommandException('Invalid canned ACL "%s".' % acl_text) |
| if def_obj_acl: |
| uri.set_def_acl(acl_text, uri.object_name) |
| else: |
| uri.set_acl(acl_text, uri.object_name) |
| else: |
| if def_obj_acl: |
| uri.set_def_xml_acl(acl_text, uri.object_name) |
| else: |
| uri.set_xml_acl(acl_text, uri.object_name) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| # pylint: disable=catching-non-exception |
| def XmlPassThroughSetCors(self, cors_text, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| # Parse XML document and convert into Cors object. |
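| # An illustrative GCS-format CORS document accepted here (example values): |
| #   <CorsConfig><Cors> |
| #     <Origins><Origin>http://example.com</Origin></Origins> |
| #     <Methods><Method>GET</Method></Methods> |
| #   </Cors></CorsConfig> |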
| if storage_url.scheme == 's3': |
| cors_obj = S3Cors() |
| else: |
| cors_obj = Cors() |
| h = handler.XmlHandler(cors_obj, None) |
| try: |
| xml.sax.parseString(cors_text, h) |
| except SaxExceptions.SAXParseException, e: |
| raise CommandException('Requested CORS is invalid: %s at line %s, ' |
| 'column %s' % (e.getMessage(), e.getLineNumber(), |
| e.getColumnNumber())) |
| |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| uri.set_cors(cors_obj, False) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| def XmlPassThroughGetCors(self, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| try: |
| cors = uri.get_cors(False) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| parsed_xml = XmlParseString(cors.to_xml().encode(UTF8)) |
| # Pretty-print the XML to make it more easily human-editable. |
| return parsed_xml.toprettyxml(indent=' ') |
| |
| def XmlPassThroughGetLifecycle(self, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| lifecycle = uri.get_lifecycle_config(False) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| parsed_xml = XmlParseString(lifecycle.to_xml().encode(UTF8)) |
| # Pretty-print the XML to make it more easily human-editable. |
| return parsed_xml.toprettyxml(indent=' ') |
| |
| def XmlPassThroughSetLifecycle(self, lifecycle_text, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| # Parse XML document and convert into lifecycle object. |
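| # An illustrative GCS-format lifecycle document accepted here (example |
| # values): |
| #   <LifecycleConfiguration><Rule> |
| #     <Action><Delete/></Action> |
| #     <Condition><Age>365</Age></Condition> |
| #   </Rule></LifecycleConfiguration> |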
| if storage_url.scheme == 's3': |
| lifecycle_obj = S3Lifecycle() |
| else: |
| lifecycle_obj = LifecycleConfig() |
| h = handler.XmlHandler(lifecycle_obj, None) |
| try: |
| xml.sax.parseString(lifecycle_text, h) |
| except SaxExceptions.SAXParseException, e: |
| raise CommandException( |
| 'Requested lifecycle config is invalid: %s at line %s, column %s' % |
| (e.getMessage(), e.getLineNumber(), e.getColumnNumber())) |
| |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| uri.configure_lifecycle(lifecycle_obj, False) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| def XmlPassThroughGetLogging(self, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| logging_config_xml = UnaryDictToXml(uri.get_logging_config()) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| return XmlParseString(logging_config_xml).toprettyxml() |
| |
| def XmlPassThroughGetWebsite(self, storage_url): |
| """See CloudApiDelegator class for function doc strings.""" |
| try: |
| uri = boto.storage_uri( |
| storage_url.url_string, suppress_consec_slashes=False, |
| bucket_storage_uri_class=self.bucket_storage_uri_class, |
| debug=self.debug) |
| web_config_xml = UnaryDictToXml(uri.get_website_config()) |
| except TRANSLATABLE_BOTO_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| return XmlParseString(web_config_xml).toprettyxml() |