| # -*- coding: utf-8 -*- |
| # Copyright 2014 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """JSON gsutil Cloud API implementation for Google Cloud Storage.""" |
| |
| from __future__ import absolute_import |
| |
| import httplib |
| import json |
| import logging |
| import os |
| import socket |
| import ssl |
| import time |
| import traceback |
| |
| from apitools.base.py import credentials_lib |
| from apitools.base.py import encoding |
| from apitools.base.py import exceptions as apitools_exceptions |
| from apitools.base.py import http_wrapper as apitools_http_wrapper |
| from apitools.base.py import transfer as apitools_transfer |
| from apitools.base.py.util import CalculateWaitForRetry |
| |
| import boto |
| from boto import config |
| from gcs_oauth2_boto_plugin import oauth2_helper |
| import httplib2 |
| import oauth2client |
| from oauth2client import devshell |
| from oauth2client import multistore_file |
| |
| from gslib.cloud_api import AccessDeniedException |
| from gslib.cloud_api import ArgumentException |
| from gslib.cloud_api import BadRequestException |
| from gslib.cloud_api import CloudApi |
| from gslib.cloud_api import NotEmptyException |
| from gslib.cloud_api import NotFoundException |
| from gslib.cloud_api import PreconditionException |
| from gslib.cloud_api import Preconditions |
| from gslib.cloud_api import ResumableDownloadException |
| from gslib.cloud_api import ResumableUploadAbortException |
| from gslib.cloud_api import ResumableUploadException |
| from gslib.cloud_api import ResumableUploadStartOverException |
| from gslib.cloud_api import ServiceException |
| from gslib.cloud_api_helper import ValidateDstObjectMetadata |
| from gslib.cred_types import CredTypes |
| from gslib.exception import CommandException |
| from gslib.gcs_json_media import BytesTransferredContainer |
| from gslib.gcs_json_media import DownloadCallbackConnectionClassFactory |
| from gslib.gcs_json_media import HttpWithDownloadStream |
| from gslib.gcs_json_media import HttpWithNoRetries |
| from gslib.gcs_json_media import UploadCallbackConnectionClassFactory |
| from gslib.gcs_json_media import WrapDownloadHttpRequest |
| from gslib.gcs_json_media import WrapUploadHttpRequest |
| from gslib.no_op_credentials import NoOpCredentials |
| from gslib.progress_callback import ProgressCallbackWithBackoff |
| from gslib.project_id import PopulateProjectId |
| from gslib.third_party.storage_apitools import storage_v1_client as apitools_client |
| from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages |
| from gslib.tracker_file import DeleteTrackerFile |
| from gslib.tracker_file import GetRewriteTrackerFilePath |
| from gslib.tracker_file import HashRewriteParameters |
| from gslib.tracker_file import ReadRewriteTrackerFile |
| from gslib.tracker_file import WriteRewriteTrackerFile |
| from gslib.translation_helper import CreateBucketNotFoundException |
| from gslib.translation_helper import CreateNotFoundExceptionForObjectWrite |
| from gslib.translation_helper import CreateObjectNotFoundException |
| from gslib.translation_helper import DEFAULT_CONTENT_TYPE |
| from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL |
| from gslib.translation_helper import REMOVE_CORS_CONFIG |
| from gslib.util import GetBotoConfigFileList |
| from gslib.util import GetCertsFile |
| from gslib.util import GetCredentialStoreFilename |
| from gslib.util import GetGceCredentialCacheFilename |
| from gslib.util import GetJsonResumableChunkSize |
| from gslib.util import GetMaxRetryDelay |
| from gslib.util import GetNewHttp |
| from gslib.util import GetNumRetries |
| from gslib.util import UTF8 |
| |
| |
| # Implementation supports only 'gs' URLs, so provider is unused. |
| # pylint: disable=unused-argument |
| |
| DEFAULT_GCS_JSON_VERSION = 'v1' |
| |
| NUM_BUCKETS_PER_LIST_PAGE = 1000 |
| NUM_OBJECTS_PER_LIST_PAGE = 1000 |
| |
| TRANSLATABLE_APITOOLS_EXCEPTIONS = (apitools_exceptions.HttpError, |
| apitools_exceptions.StreamExhausted, |
| apitools_exceptions.TransferError, |
| apitools_exceptions.TransferInvalidError) |
| |
| # TODO: Distribute these exceptions better through apitools and here. |
| # Right now, apitools is configured not to handle any exceptions on |
| # uploads/downloads. |
# oauth2_client tries to JSON-decode the response, which can result
# in a ValueError if the response is invalid. Until that is fixed in
# oauth2_client, we need to handle it here.
| HTTP_TRANSFER_EXCEPTIONS = (apitools_exceptions.TransferRetryError, |
| apitools_exceptions.BadStatusCodeError, |
| # TODO: Honor retry-after headers. |
| apitools_exceptions.RetryAfterError, |
| apitools_exceptions.RequestError, |
| httplib.BadStatusLine, |
| httplib.IncompleteRead, |
| httplib.ResponseNotReady, |
| httplib2.ServerNotFoundError, |
| socket.error, |
| socket.gaierror, |
| socket.timeout, |
| ssl.SSLError, |
| ValueError) |
| |
| _VALIDATE_CERTIFICATES_503_MESSAGE = ( |
| """Service Unavailable. If you have recently changed |
| https_validate_certificates from True to False in your boto configuration |
| file, please delete any cached access tokens in your filesystem (at %s) |
| and try again.""" % GetCredentialStoreFilename()) |
| |
| |
| class GcsJsonApi(CloudApi): |
| """Google Cloud Storage JSON implementation of gsutil Cloud API.""" |
| |
| def __init__(self, bucket_storage_uri_class, logger, provider=None, |
| credentials=None, debug=0, trace_token=None): |
| """Performs necessary setup for interacting with Google Cloud Storage. |
| |
| Args: |
| bucket_storage_uri_class: Unused. |
| logger: logging.logger for outputting log messages. |
| provider: Unused. This implementation supports only Google Cloud Storage. |
| credentials: Credentials to be used for interacting with Google Cloud |
| Storage. |
| debug: Debug level for the API implementation (0..3). |
| trace_token: Trace token to pass to the API implementation. |
| """ |
| # TODO: Plumb host_header for perfdiag / test_perfdiag. |
| # TODO: Add jitter to apitools' http_wrapper retry mechanism. |
| super(GcsJsonApi, self).__init__(bucket_storage_uri_class, logger, |
| provider='gs', debug=debug) |
| no_op_credentials = False |
| if not credentials: |
| loaded_credentials = self._CheckAndGetCredentials(logger) |
| |
| if not loaded_credentials: |
| loaded_credentials = NoOpCredentials() |
| no_op_credentials = True |
| else: |
| if isinstance(credentials, NoOpCredentials): |
| no_op_credentials = True |
| |
| self.credentials = credentials or loaded_credentials |
| |
| self.certs_file = GetCertsFile() |
| |
| self.http = GetNewHttp() |
| |
    # Reuse download and upload connections. This class is only used
    # sequentially, so sharing warmed-up TCP connections across calls is
    # safe and avoids repeated connection setup.
| self.download_http = self._GetNewDownloadHttp() |
| self.upload_http = self._GetNewUploadHttp() |
| if self.credentials: |
| self.authorized_download_http = self.credentials.authorize( |
| self.download_http) |
| self.authorized_upload_http = self.credentials.authorize(self.upload_http) |
| else: |
| self.authorized_download_http = self.download_http |
| self.authorized_upload_http = self.upload_http |
| |
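    # Apply the media request wrappers from gcs_json_media so that download
    # and upload connections get gsutil-specific request handling.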
| WrapDownloadHttpRequest(self.authorized_download_http) |
| WrapUploadHttpRequest(self.authorized_upload_http) |
| |
| self.http_base = 'https://' |
| gs_json_host = config.get('Credentials', 'gs_json_host', None) |
| self.host_base = gs_json_host or 'www.googleapis.com' |
| |
| if not gs_json_host: |
| gs_host = config.get('Credentials', 'gs_host', None) |
| if gs_host: |
| raise ArgumentException( |
| 'JSON API is selected but gs_json_host is not configured, ' |
| 'while gs_host is configured to %s. Please also configure ' |
| 'gs_json_host and gs_json_port to match your desired endpoint.' |
| % gs_host) |
| |
| gs_json_port = config.get('Credentials', 'gs_json_port', None) |
| |
| if not gs_json_port: |
| gs_port = config.get('Credentials', 'gs_port', None) |
| if gs_port: |
| raise ArgumentException( |
| 'JSON API is selected but gs_json_port is not configured, ' |
| 'while gs_port is configured to %s. Please also configure ' |
| 'gs_json_host and gs_json_port to match your desired endpoint.' |
| % gs_port) |
| self.host_port = '' |
| else: |
| self.host_port = ':' + config.get('Credentials', 'gs_json_port') |
| |
| self.api_version = config.get('GSUtil', 'json_api_version', |
| DEFAULT_GCS_JSON_VERSION) |
| self.url_base = (self.http_base + self.host_base + self.host_port + '/' + |
| 'storage/' + self.api_version + '/') |
| |
| credential_store_key_dict = self._GetCredentialStoreKeyDict( |
| self.credentials) |
| |
| self.credentials.set_store( |
| multistore_file.get_credential_storage_custom_key( |
| GetCredentialStoreFilename(), credential_store_key_dict)) |
| |
| self.num_retries = GetNumRetries() |
| self.max_retry_wait = GetMaxRetryDelay() |
| |
| log_request = (debug >= 3) |
| log_response = (debug >= 3) |
| |
| self.global_params = apitools_messages.StandardQueryParameters( |
| trace='token:%s' % trace_token) if trace_token else None |
| |
| self.api_client = apitools_client.StorageV1( |
| url=self.url_base, http=self.http, log_request=log_request, |
| log_response=log_response, credentials=self.credentials, |
| version=self.api_version, default_global_params=self.global_params) |
| self.api_client.max_retry_wait = self.max_retry_wait |
| self.api_client.num_retries = self.num_retries |
| |
| if no_op_credentials: |
| # This API key is not secret and is used to identify gsutil during |
| # anonymous requests. |
| self.api_client.AddGlobalParam('key', |
| u'AIzaSyDnacJHrKma0048b13sh8cgxNUwulubmJM') |
| |
| def _CheckAndGetCredentials(self, logger): |
| configured_cred_types = [] |
| try: |
| if self._HasOauth2UserAccountCreds(): |
| configured_cred_types.append(CredTypes.OAUTH2_USER_ACCOUNT) |
| if self._HasOauth2ServiceAccountCreds(): |
| configured_cred_types.append(CredTypes.OAUTH2_SERVICE_ACCOUNT) |
| if len(configured_cred_types) > 1: |
| # We only allow one set of configured credentials. Otherwise, we're |
| # choosing one arbitrarily, which can be very confusing to the user |
| # (e.g., if only one is authorized to perform some action) and can |
| # also mask errors. |
| # Because boto merges config files, GCE credentials show up by default |
| # for GCE VMs. We don't want to fail when a user creates a boto file |
| # with their own credentials, so in this case we'll use the OAuth2 |
| # user credentials. |
| failed_cred_type = None |
      raise CommandException(
          ('You have multiple types of configured credentials (%s), which is '
           'not supported. One common way this happens is if you run gsutil '
           'config to create credentials and later run gcloud auth, which '
           'creates a second set of credentials. Your boto config path is: '
           '%s. For more help, see "gsutil help creds".')
          % (configured_cred_types, GetBotoConfigFileList()))
| |
| failed_cred_type = CredTypes.OAUTH2_USER_ACCOUNT |
| user_creds = self._GetOauth2UserAccountCreds() |
| failed_cred_type = CredTypes.OAUTH2_SERVICE_ACCOUNT |
| service_account_creds = self._GetOauth2ServiceAccountCreds() |
| failed_cred_type = CredTypes.GCE |
| gce_creds = self._GetGceCreds() |
| failed_cred_type = CredTypes.DEVSHELL |
| devshell_creds = self._GetDevshellCreds() |
| return user_creds or service_account_creds or gce_creds or devshell_creds |
| except: # pylint: disable=bare-except |
| |
| # If we didn't actually try to authenticate because there were multiple |
| # types of configured credentials, don't emit this warning. |
| if failed_cred_type: |
| if os.environ.get('CLOUDSDK_WRAPPER') == '1': |
| logger.warn( |
| 'Your "%s" credentials are invalid. Please run\n' |
| ' $ gcloud auth login', failed_cred_type) |
| else: |
| logger.warn( |
| 'Your "%s" credentials are invalid. For more help, see ' |
| '"gsutil help creds", or re-run the gsutil config command (see ' |
| '"gsutil help config").', failed_cred_type) |
| |
| # If there's any set of configured credentials, we'll fail if they're |
| # invalid, rather than silently falling back to anonymous config (as |
| # boto does). That approach leads to much confusion if users don't |
| # realize their credentials are invalid. |
| raise |
| |
| def _HasOauth2ServiceAccountCreds(self): |
| return config.has_option('Credentials', 'gs_service_key_file') |
| |
| def _HasOauth2UserAccountCreds(self): |
| return config.has_option('Credentials', 'gs_oauth2_refresh_token') |
| |
| def _HasGceCreds(self): |
| return config.has_option('GoogleCompute', 'service_account') |
| |
| def _GetOauth2ServiceAccountCreds(self): |
| if self._HasOauth2ServiceAccountCreds(): |
| return oauth2_helper.OAuth2ClientFromBotoConfig( |
| boto.config, |
| cred_type=CredTypes.OAUTH2_SERVICE_ACCOUNT).GetCredentials() |
| |
| def _GetOauth2UserAccountCreds(self): |
| if self._HasOauth2UserAccountCreds(): |
| return oauth2_helper.OAuth2ClientFromBotoConfig( |
| boto.config).GetCredentials() |
| |
| def _GetGceCreds(self): |
| if self._HasGceCreds(): |
| try: |
| return credentials_lib.GceAssertionCredentials( |
| cache_filename=GetGceCredentialCacheFilename()) |
| except apitools_exceptions.ResourceUnavailableError, e: |
| if 'service account' in str(e) and 'does not exist' in str(e): |
| return None |
| raise |
| |
| def _GetDevshellCreds(self): |
| try: |
| return devshell.DevshellCredentials() |
| except devshell.NoDevshellServer: |
| return None |
| |
| def _GetCredentialStoreKeyDict(self, credentials): |
| """Disambiguates a credential for caching in a credential store. |
| |
| Different credential types have different fields that identify them. |
| This function assembles relevant information in a dict and returns it. |
| |
| Args: |
| credentials: An OAuth2Credentials object. |
| |
| Returns: |
| Dict of relevant identifiers for credentials. |
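
      For example (illustrative), OAuth2 user credentials produce a dict
      like {'api_version': 'v1', 'client_id': ..., 'refresh_token': ...}.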
| """ |
| # TODO: If scopes ever become available in the credentials themselves, |
| # include them in the key dict. |
| key_dict = {'api_version': self.api_version} |
| # pylint: disable=protected-access |
| if isinstance(credentials, devshell.DevshellCredentials): |
| key_dict['user_email'] = credentials.user_email |
| elif isinstance(credentials, |
| oauth2client.service_account._ServiceAccountCredentials): |
| key_dict['_service_account_email'] = credentials._service_account_email |
| elif isinstance(credentials, |
| oauth2client.client.SignedJwtAssertionCredentials): |
| key_dict['service_account_name'] = credentials.service_account_name |
| elif isinstance(credentials, oauth2client.client.OAuth2Credentials): |
| if credentials.client_id and credentials.client_id != 'null': |
| key_dict['client_id'] = credentials.client_id |
| key_dict['refresh_token'] = credentials.refresh_token |
| # pylint: enable=protected-access |
| |
| return key_dict |
| |
| def _GetNewDownloadHttp(self): |
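    """Returns an Http object for downloads that writes to a stream."""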
| return GetNewHttp(http_class=HttpWithDownloadStream) |
| |
| def _GetNewUploadHttp(self): |
| """Returns an upload-safe Http object (by disabling httplib2 retries).""" |
| return GetNewHttp(http_class=HttpWithNoRetries) |
| |
| def GetBucket(self, bucket_name, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageBucketsGetRequest |
| .ProjectionValueValuesEnum.full) |
| apitools_request = apitools_messages.StorageBucketsGetRequest( |
| bucket=bucket_name, projection=projection) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
    # Here and in ListBuckets, we have no way of knowing whether we requested
    # a field and didn't get it because it doesn't exist or because we lack
    # permission to access it.
| try: |
| return self.api_client.buckets.Get(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def PatchBucket(self, bucket_name, metadata, canned_acl=None, |
| canned_def_acl=None, preconditions=None, provider=None, |
| fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageBucketsPatchRequest |
| .ProjectionValueValuesEnum.full) |
| bucket_metadata = metadata |
| |
| if not preconditions: |
| preconditions = Preconditions() |
| |
    # For blank metadata objects, we need to explicitly tell apitools
    # about them so that the patch request sends/erases them.
| apitools_include_fields = [] |
| for metadata_field in ('metadata', 'lifecycle', 'logging', 'versioning', |
| 'website'): |
| attr = getattr(bucket_metadata, metadata_field, None) |
| if attr and not encoding.MessageToDict(attr): |
| setattr(bucket_metadata, metadata_field, None) |
| apitools_include_fields.append(metadata_field) |
| |
| if bucket_metadata.cors and bucket_metadata.cors == REMOVE_CORS_CONFIG: |
| bucket_metadata.cors = [] |
| apitools_include_fields.append('cors') |
| |
| if (bucket_metadata.defaultObjectAcl and |
| bucket_metadata.defaultObjectAcl[0] == PRIVATE_DEFAULT_OBJ_ACL): |
| bucket_metadata.defaultObjectAcl = [] |
| apitools_include_fields.append('defaultObjectAcl') |
| |
| predefined_acl = None |
| if canned_acl: |
| # Must null out existing ACLs to apply a canned ACL. |
| apitools_include_fields.append('acl') |
| predefined_acl = ( |
| apitools_messages.StorageBucketsPatchRequest. |
| PredefinedAclValueValuesEnum( |
| self._BucketCannedAclToPredefinedAcl(canned_acl))) |
| |
| predefined_def_acl = None |
| if canned_def_acl: |
| # Must null out existing default object ACLs to apply a canned ACL. |
| apitools_include_fields.append('defaultObjectAcl') |
| predefined_def_acl = ( |
| apitools_messages.StorageBucketsPatchRequest. |
| PredefinedDefaultObjectAclValueValuesEnum( |
| self._ObjectCannedAclToPredefinedAcl(canned_def_acl))) |
| |
| apitools_request = apitools_messages.StorageBucketsPatchRequest( |
| bucket=bucket_name, bucketResource=bucket_metadata, |
| projection=projection, |
| ifMetagenerationMatch=preconditions.meta_gen_match, |
| predefinedAcl=predefined_acl, |
| predefinedDefaultObjectAcl=predefined_def_acl) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| with self.api_client.IncludeFields(apitools_include_fields): |
| try: |
| return self.api_client.buckets.Patch(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| def CreateBucket(self, bucket_name, project_id=None, metadata=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageBucketsInsertRequest |
| .ProjectionValueValuesEnum.full) |
| if not metadata: |
| metadata = apitools_messages.Bucket() |
| metadata.name = bucket_name |
| |
| if metadata.location: |
| metadata.location = metadata.location.upper() |
| if metadata.storageClass: |
| metadata.storageClass = metadata.storageClass.upper() |
| |
| project_id = PopulateProjectId(project_id) |
| |
| apitools_request = apitools_messages.StorageBucketsInsertRequest( |
| bucket=metadata, project=project_id, projection=projection) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| try: |
| return self.api_client.buckets.Insert(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def DeleteBucket(self, bucket_name, preconditions=None, provider=None): |
| """See CloudApi class for function doc strings.""" |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| apitools_request = apitools_messages.StorageBucketsDeleteRequest( |
| bucket=bucket_name, ifMetagenerationMatch=preconditions.meta_gen_match) |
| |
| try: |
| self.api_client.buckets.Delete(apitools_request) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| if isinstance( |
| self._TranslateApitoolsException(e, bucket_name=bucket_name), |
| NotEmptyException): |
| # If bucket is not empty, check to see if versioning is enabled and |
| # signal that in the exception if it is. |
| bucket_metadata = self.GetBucket(bucket_name, |
| fields=['versioning']) |
| if bucket_metadata.versioning and bucket_metadata.versioning.enabled: |
| raise NotEmptyException('VersionedBucketNotEmpty', |
| status=e.status_code) |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def ListBuckets(self, project_id=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageBucketsListRequest |
| .ProjectionValueValuesEnum.full) |
| project_id = PopulateProjectId(project_id) |
| |
| apitools_request = apitools_messages.StorageBucketsListRequest( |
| project=project_id, maxResults=NUM_BUCKETS_PER_LIST_PAGE, |
| projection=projection) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| if 'nextPageToken' not in fields: |
| fields.add('nextPageToken') |
| global_params.fields = ','.join(set(fields)) |
| try: |
| bucket_list = self.api_client.buckets.List(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| for bucket in self._YieldBuckets(bucket_list): |
| yield bucket |
| |
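    # Page through any remaining results using the returned page token.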
| while bucket_list.nextPageToken: |
| apitools_request = apitools_messages.StorageBucketsListRequest( |
| project=project_id, pageToken=bucket_list.nextPageToken, |
| maxResults=NUM_BUCKETS_PER_LIST_PAGE, projection=projection) |
| try: |
| bucket_list = self.api_client.buckets.List(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| for bucket in self._YieldBuckets(bucket_list): |
| yield bucket |
| |
| def _YieldBuckets(self, bucket_list): |
| """Yields buckets from a list returned by apitools.""" |
| if bucket_list.items: |
| for bucket in bucket_list.items: |
| yield bucket |
| |
| def ListObjects(self, bucket_name, prefix=None, delimiter=None, |
| all_versions=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageObjectsListRequest |
| .ProjectionValueValuesEnum.full) |
| apitools_request = apitools_messages.StorageObjectsListRequest( |
| bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
| versions=all_versions, projection=projection, |
| maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
| global_params = apitools_messages.StandardQueryParameters() |
| |
| if fields: |
| fields = set(fields) |
| if 'nextPageToken' not in fields: |
| fields.add('nextPageToken') |
| global_params.fields = ','.join(fields) |
| |
| try: |
| object_list = self.api_client.objects.List(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
| yield object_or_prefix |
| |
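    # Continue fetching pages until the server stops returning a page token.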
| while object_list.nextPageToken: |
| apitools_request = apitools_messages.StorageObjectsListRequest( |
| bucket=bucket_name, prefix=prefix, delimiter=delimiter, |
| versions=all_versions, projection=projection, |
| pageToken=object_list.nextPageToken, |
| maxResults=NUM_OBJECTS_PER_LIST_PAGE) |
| try: |
| object_list = self.api_client.objects.List(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| for object_or_prefix in self._YieldObjectsAndPrefixes(object_list): |
| yield object_or_prefix |
| |
| def _YieldObjectsAndPrefixes(self, object_list): |
| # Yield prefixes first so that checking for the presence of a subdirectory |
| # is fast. |
| if object_list.prefixes: |
| for prefix in object_list.prefixes: |
| yield CloudApi.CsObjectOrPrefix(prefix, |
| CloudApi.CsObjectOrPrefixType.PREFIX) |
| if object_list.items: |
| for cloud_obj in object_list.items: |
| yield CloudApi.CsObjectOrPrefix(cloud_obj, |
| CloudApi.CsObjectOrPrefixType.OBJECT) |
| |
| def GetObjectMetadata(self, bucket_name, object_name, generation=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageObjectsGetRequest |
| .ProjectionValueValuesEnum.full) |
| |
| if generation: |
| generation = long(generation) |
| |
| apitools_request = apitools_messages.StorageObjectsGetRequest( |
| bucket=bucket_name, object=object_name, projection=projection, |
| generation=generation) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
| try: |
| return self.api_client.objects.Get(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def GetObjectMedia( |
| self, bucket_name, object_name, download_stream, |
| provider=None, generation=None, object_size=None, |
| download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0, |
| end_byte=None, progress_callback=None, serialization_data=None, |
| digesters=None): |
| """See CloudApi class for function doc strings.""" |
    # This implementation fetches the object metadata first, unless it is
    # passed in via serialization_data.
| if generation: |
| generation = long(generation) |
| |
| # 'outer_total_size' is only used for formatting user output, and is |
| # expected to be one higher than the last byte that should be downloaded. |
| # TODO: Change DownloadCallbackConnectionClassFactory and progress callbacks |
| # to more elegantly handle total size for components of files. |
| outer_total_size = object_size |
| if end_byte: |
| outer_total_size = end_byte + 1 |
| elif serialization_data: |
| outer_total_size = json.loads(serialization_data)['total_size'] |
| |
| if progress_callback: |
| if outer_total_size is None: |
| raise ArgumentException('Download size is required when callbacks are ' |
| 'requested for a download, but no size was ' |
| 'provided.') |
| progress_callback(start_byte, outer_total_size) |
| |
| bytes_downloaded_container = BytesTransferredContainer() |
| bytes_downloaded_container.bytes_transferred = start_byte |
| |
| callback_class_factory = DownloadCallbackConnectionClassFactory( |
| bytes_downloaded_container, total_size=outer_total_size, |
| progress_callback=progress_callback, digesters=digesters) |
| download_http_class = callback_class_factory.GetConnectionClass() |
| |
| # Point our download HTTP at our download stream. |
| self.download_http.stream = download_stream |
| self.download_http.connections = {'https': download_http_class} |
| |
| if serialization_data: |
| apitools_download = apitools_transfer.Download.FromData( |
| download_stream, serialization_data, self.api_client.http, |
| num_retries=self.num_retries) |
| else: |
| apitools_download = apitools_transfer.Download.FromStream( |
| download_stream, auto_transfer=False, total_size=object_size, |
| num_retries=self.num_retries) |
| |
| apitools_download.bytes_http = self.authorized_download_http |
| apitools_request = apitools_messages.StorageObjectsGetRequest( |
| bucket=bucket_name, object=object_name, generation=generation) |
| |
| try: |
| if download_strategy == CloudApi.DownloadStrategy.RESUMABLE: |
| # Disable retries in apitools. We will handle them explicitly here. |
| apitools_download.retry_func = ( |
| apitools_http_wrapper.RethrowExceptionHandler) |
| return self._PerformResumableDownload( |
| bucket_name, object_name, download_stream, apitools_request, |
| apitools_download, bytes_downloaded_container, |
| generation=generation, start_byte=start_byte, end_byte=end_byte, |
| serialization_data=serialization_data) |
| else: |
| return self._PerformDownload( |
| bucket_name, object_name, download_stream, apitools_request, |
| apitools_download, generation=generation, start_byte=start_byte, |
| end_byte=end_byte, serialization_data=serialization_data) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def _PerformResumableDownload( |
| self, bucket_name, object_name, download_stream, apitools_request, |
| apitools_download, bytes_downloaded_container, generation=None, |
| start_byte=0, end_byte=None, serialization_data=None): |
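    # Retries are handled here rather than in apitools (the caller disables
    # apitools retries); resume from the last byte written and reset the
    # retry count whenever progress is made.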
| retries = 0 |
| last_progress_byte = start_byte |
| while retries <= self.num_retries: |
| try: |
| return self._PerformDownload( |
| bucket_name, object_name, download_stream, apitools_request, |
| apitools_download, generation=generation, start_byte=start_byte, |
| end_byte=end_byte, serialization_data=serialization_data) |
| except HTTP_TRANSFER_EXCEPTIONS, e: |
| start_byte = download_stream.tell() |
| bytes_downloaded_container.bytes_transferred = start_byte |
| if start_byte > last_progress_byte: |
| # We've made progress, so allow a fresh set of retries. |
| last_progress_byte = start_byte |
| retries = 0 |
| retries += 1 |
| if retries > self.num_retries: |
| raise ResumableDownloadException( |
| 'Transfer failed after %d retries. Final exception: %s' % |
| (self.num_retries, unicode(e).encode(UTF8))) |
| time.sleep(CalculateWaitForRetry(retries, max_wait=self.max_retry_wait)) |
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug( |
| 'Retrying download from byte %s after exception: %s. Trace: %s', |
| start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
| apitools_http_wrapper.RebuildHttpConnections( |
| apitools_download.bytes_http) |
| |
| def _PerformDownload( |
| self, bucket_name, object_name, download_stream, apitools_request, |
| apitools_download, generation=None, start_byte=0, end_byte=None, |
| serialization_data=None): |
| if not serialization_data: |
| try: |
| self.api_client.objects.Get(apitools_request, |
| download=apitools_download) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| # Disable apitools' default print callbacks. |
| def _NoOpCallback(unused_response, unused_download_object): |
| pass |
| |
    # TODO: If we have a resumable download with accept-encoding:gzip
    # on an object that is compressible but not in gzip form in the cloud,
    # on-the-fly compression will gzip the object. In this case, if our
    # download breaks, future requests will ignore the range header and just
    # return the object (gzipped) in its entirety. Ideally, we would unzip
    # the bytes that we have locally and send a range request without
    # accept-encoding:gzip so that we can download only the (uncompressed)
    # bytes that we don't yet have.
| |
| # Since bytes_http is created in this function, we don't get the |
| # user-agent header from api_client's http automatically. |
| additional_headers = { |
| 'accept-encoding': 'gzip', |
| 'user-agent': self.api_client.user_agent |
| } |
| if start_byte or end_byte is not None: |
| apitools_download.GetRange(additional_headers=additional_headers, |
| start=start_byte, end=end_byte, |
| use_chunks=False) |
| else: |
| apitools_download.StreamMedia( |
| callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| additional_headers=additional_headers, use_chunks=False) |
| return apitools_download.encoding |
| |
| def PatchObjectMetadata(self, bucket_name, object_name, metadata, |
| canned_acl=None, generation=None, preconditions=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageObjectsPatchRequest |
| .ProjectionValueValuesEnum.full) |
| |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| if generation: |
| generation = long(generation) |
| |
| predefined_acl = None |
| apitools_include_fields = [] |
| if canned_acl: |
| # Must null out existing ACLs to apply a canned ACL. |
| apitools_include_fields.append('acl') |
| predefined_acl = ( |
| apitools_messages.StorageObjectsPatchRequest. |
| PredefinedAclValueValuesEnum( |
| self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| |
| apitools_request = apitools_messages.StorageObjectsPatchRequest( |
| bucket=bucket_name, object=object_name, objectResource=metadata, |
| generation=generation, projection=projection, |
| ifGenerationMatch=preconditions.gen_match, |
| ifMetagenerationMatch=preconditions.meta_gen_match, |
| predefinedAcl=predefined_acl) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
| try: |
| with self.api_client.IncludeFields(apitools_include_fields): |
| return self.api_client.objects.Patch(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def _UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
| size=None, preconditions=None, provider=None, fields=None, |
| serialization_data=None, tracker_callback=None, |
| progress_callback=None, |
| apitools_strategy=apitools_transfer.SIMPLE_UPLOAD, |
| total_size=0): |
| # pylint: disable=g-doc-args |
| """Upload implementation. Cloud API arguments, plus two more. |
| |
| Additional args: |
| apitools_strategy: SIMPLE_UPLOAD or RESUMABLE_UPLOAD. |
| total_size: Total size of the upload; None if it is unknown (streaming). |
| |
| Returns: |
| Uploaded object metadata. |
| """ |
| # pylint: enable=g-doc-args |
| ValidateDstObjectMetadata(object_metadata) |
| predefined_acl = None |
| if canned_acl: |
| predefined_acl = ( |
| apitools_messages.StorageObjectsInsertRequest. |
| PredefinedAclValueValuesEnum( |
| self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| |
| bytes_uploaded_container = BytesTransferredContainer() |
| |
| if progress_callback and size: |
| total_size = size |
| progress_callback(0, size) |
| |
| callback_class_factory = UploadCallbackConnectionClassFactory( |
| bytes_uploaded_container, total_size=total_size, |
| progress_callback=progress_callback) |
| |
| upload_http_class = callback_class_factory.GetConnectionClass() |
| self.upload_http.connections = {'http': upload_http_class, |
| 'https': upload_http_class} |
| |
| # Since bytes_http is created in this function, we don't get the |
| # user-agent header from api_client's http automatically. |
| additional_headers = { |
| 'user-agent': self.api_client.user_agent |
| } |
| |
| try: |
| content_type = None |
| apitools_request = None |
| global_params = None |
| if not serialization_data: |
| # This is a new upload, set up initial upload state. |
| content_type = object_metadata.contentType |
| if not content_type: |
| content_type = DEFAULT_CONTENT_TYPE |
| |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| apitools_request = apitools_messages.StorageObjectsInsertRequest( |
| bucket=object_metadata.bucket, object=object_metadata, |
| ifGenerationMatch=preconditions.gen_match, |
| ifMetagenerationMatch=preconditions.meta_gen_match, |
| predefinedAcl=predefined_acl) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
| if apitools_strategy == apitools_transfer.SIMPLE_UPLOAD: |
| # One-shot upload. |
| apitools_upload = apitools_transfer.Upload( |
| upload_stream, content_type, total_size=size, auto_transfer=True, |
| num_retries=self.num_retries) |
| apitools_upload.strategy = apitools_strategy |
| apitools_upload.bytes_http = self.authorized_upload_http |
| |
| return self.api_client.objects.Insert( |
| apitools_request, |
| upload=apitools_upload, |
| global_params=global_params) |
| else: # Resumable upload. |
| return self._PerformResumableUpload( |
| upload_stream, self.authorized_upload_http, content_type, size, |
| serialization_data, apitools_strategy, apitools_request, |
| global_params, bytes_uploaded_container, tracker_callback, |
| additional_headers, progress_callback) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, object_metadata.bucket) |
| self._TranslateExceptionAndRaise(e, bucket_name=object_metadata.bucket, |
| object_name=object_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def _PerformResumableUpload( |
| self, upload_stream, authorized_upload_http, content_type, size, |
| serialization_data, apitools_strategy, apitools_request, global_params, |
| bytes_uploaded_container, tracker_callback, addl_headers, |
| progress_callback): |
| try: |
| if serialization_data: |
| # Resuming an existing upload. |
| apitools_upload = apitools_transfer.Upload.FromData( |
| upload_stream, serialization_data, self.api_client.http, |
| num_retries=self.num_retries) |
| apitools_upload.chunksize = GetJsonResumableChunkSize() |
| apitools_upload.bytes_http = authorized_upload_http |
| else: |
| # New resumable upload. |
| apitools_upload = apitools_transfer.Upload( |
| upload_stream, content_type, total_size=size, |
| chunksize=GetJsonResumableChunkSize(), auto_transfer=False, |
| num_retries=self.num_retries) |
| apitools_upload.strategy = apitools_strategy |
| apitools_upload.bytes_http = authorized_upload_http |
| self.api_client.objects.Insert( |
| apitools_request, |
| upload=apitools_upload, |
| global_params=global_params) |
| # Disable retries in apitools. We will handle them explicitly here. |
| apitools_upload.retry_func = ( |
| apitools_http_wrapper.RethrowExceptionHandler) |
| |
| # Disable apitools' default print callbacks. |
| def _NoOpCallback(unused_response, unused_upload_object): |
| pass |
| |
      # If we're resuming an upload, apitools has already learned from the
      # server how many bytes the server has received. Update our callback
      # class with this information.
| bytes_uploaded_container.bytes_transferred = apitools_upload.progress |
| if tracker_callback: |
| tracker_callback(json.dumps(apitools_upload.serialization_data)) |
| |
| retries = 0 |
| last_progress_byte = apitools_upload.progress |
| while retries <= self.num_retries: |
| try: |
| # TODO: On retry, this will seek to the bytes that the server has, |
| # causing the hash to be recalculated. Make HashingFileUploadWrapper |
| # save a digest according to json_resumable_chunk_size. |
| if size: |
| # If size is known, we can send it all in one request and avoid |
| # making a round-trip per chunk. |
| http_response = apitools_upload.StreamMedia( |
| callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| additional_headers=addl_headers) |
| else: |
| # Otherwise it's a streaming request and we need to ensure that we |
| # send the bytes in chunks so that we can guarantee that we never |
| # need to seek backwards more than our buffer (and also that the |
| # chunks are aligned to 256KB). |
| http_response = apitools_upload.StreamInChunks( |
| callback=_NoOpCallback, finish_callback=_NoOpCallback, |
| additional_headers=addl_headers) |
| processed_response = self.api_client.objects.ProcessHttpResponse( |
| self.api_client.objects.GetMethodConfig('Insert'), http_response) |
| if size is None and progress_callback: |
            # Make the final progress callback; the total size should now
            # be known. This works around the fact that the send function
            # counts header bytes. However, this will make the progress
            # appear to go slightly backwards at the end.
| progress_callback(apitools_upload.total_size, |
| apitools_upload.total_size) |
| return processed_response |
| except HTTP_TRANSFER_EXCEPTIONS, e: |
| apitools_http_wrapper.RebuildHttpConnections( |
| apitools_upload.bytes_http) |
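          # Query the server for the number of bytes it has received so far;
          # this request can itself fail transiently, so retry it here.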
| while retries <= self.num_retries: |
| try: |
| # TODO: Simulate the refresh case in tests. Right now, our |
| # mocks are not complex enough to simulate a failure. |
| apitools_upload.RefreshResumableUploadState() |
| start_byte = apitools_upload.progress |
| bytes_uploaded_container.bytes_transferred = start_byte |
| break |
| except HTTP_TRANSFER_EXCEPTIONS, e2: |
| apitools_http_wrapper.RebuildHttpConnections( |
| apitools_upload.bytes_http) |
| retries += 1 |
| if retries > self.num_retries: |
| raise ResumableUploadException( |
| 'Transfer failed after %d retries. Final exception: %s' % |
| (self.num_retries, e2)) |
| time.sleep( |
| CalculateWaitForRetry(retries, max_wait=self.max_retry_wait)) |
| if start_byte > last_progress_byte: |
| # We've made progress, so allow a fresh set of retries. |
| last_progress_byte = start_byte |
| retries = 0 |
| else: |
| retries += 1 |
| if retries > self.num_retries: |
| raise ResumableUploadException( |
| 'Transfer failed after %d retries. Final exception: %s' % |
| (self.num_retries, unicode(e).encode(UTF8))) |
| time.sleep( |
| CalculateWaitForRetry(retries, max_wait=self.max_retry_wait)) |
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug( |
| 'Retrying upload from byte %s after exception: %s. Trace: %s', |
| start_byte, unicode(e).encode(UTF8), traceback.format_exc()) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| resumable_ex = self._TranslateApitoolsResumableUploadException(e) |
| if resumable_ex: |
| raise resumable_ex |
| else: |
| raise |
| |
| def UploadObject(self, upload_stream, object_metadata, canned_acl=None, |
| size=None, preconditions=None, progress_callback=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| return self._UploadObject( |
| upload_stream, object_metadata, canned_acl=canned_acl, |
| size=size, preconditions=preconditions, |
| progress_callback=progress_callback, fields=fields, |
| apitools_strategy=apitools_transfer.SIMPLE_UPLOAD) |
| |
| def UploadObjectStreaming(self, upload_stream, object_metadata, |
| canned_acl=None, preconditions=None, |
| progress_callback=None, provider=None, |
| fields=None): |
| """See CloudApi class for function doc strings.""" |
    # Streaming is indicated by not passing a size.
    # Resumable capabilities are available up to the resumable chunk size,
    # using a buffered stream.
| return self._UploadObject( |
| upload_stream, object_metadata, canned_acl=canned_acl, |
| preconditions=preconditions, progress_callback=progress_callback, |
| fields=fields, apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD, |
| total_size=None) |
| |
| def UploadObjectResumable( |
| self, upload_stream, object_metadata, canned_acl=None, preconditions=None, |
| provider=None, fields=None, size=None, serialization_data=None, |
| tracker_callback=None, progress_callback=None): |
| """See CloudApi class for function doc strings.""" |
| return self._UploadObject( |
| upload_stream, object_metadata, canned_acl=canned_acl, |
| preconditions=preconditions, fields=fields, size=size, |
| serialization_data=serialization_data, |
| tracker_callback=tracker_callback, progress_callback=progress_callback, |
| apitools_strategy=apitools_transfer.RESUMABLE_UPLOAD) |
| |
| def CopyObject(self, src_obj_metadata, dst_obj_metadata, src_generation=None, |
| canned_acl=None, preconditions=None, progress_callback=None, |
| max_bytes_per_call=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| ValidateDstObjectMetadata(dst_obj_metadata) |
| predefined_acl = None |
| if canned_acl: |
| predefined_acl = ( |
| apitools_messages.StorageObjectsRewriteRequest. |
| DestinationPredefinedAclValueValuesEnum( |
| self._ObjectCannedAclToPredefinedAcl(canned_acl))) |
| |
| if src_generation: |
| src_generation = long(src_generation) |
| |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| projection = (apitools_messages.StorageObjectsRewriteRequest. |
| ProjectionValueValuesEnum.full) |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| # Rewrite returns the resultant object under the 'resource' field. |
| new_fields = set(['done', 'objectSize', 'rewriteToken', |
| 'totalBytesRewritten']) |
| for field in fields: |
| new_fields.add('resource/' + field) |
| global_params.fields = ','.join(set(new_fields)) |
| |
| # Check to see if we are resuming a rewrite. |
| tracker_file_name = GetRewriteTrackerFilePath( |
| src_obj_metadata.bucket, src_obj_metadata.name, dst_obj_metadata.bucket, |
| dst_obj_metadata.name, 'JSON') |
| rewrite_params_hash = HashRewriteParameters( |
| src_obj_metadata, dst_obj_metadata, projection, |
| src_generation=src_generation, gen_match=preconditions.gen_match, |
| meta_gen_match=preconditions.meta_gen_match, |
| canned_acl=predefined_acl, fields=global_params.fields, |
| max_bytes_per_call=max_bytes_per_call) |
| resume_rewrite_token = ReadRewriteTrackerFile(tracker_file_name, |
| rewrite_params_hash) |
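    # A saved token means a prior rewrite with identical parameters was
    # interrupted; passing it lets the service resume instead of starting
    # over.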
| |
| progress_cb_with_backoff = None |
| try: |
| last_bytes_written = 0L |
| while True: |
| apitools_request = apitools_messages.StorageObjectsRewriteRequest( |
| sourceBucket=src_obj_metadata.bucket, |
| sourceObject=src_obj_metadata.name, |
| destinationBucket=dst_obj_metadata.bucket, |
| destinationObject=dst_obj_metadata.name, |
| projection=projection, object=dst_obj_metadata, |
| sourceGeneration=src_generation, |
| ifGenerationMatch=preconditions.gen_match, |
| ifMetagenerationMatch=preconditions.meta_gen_match, |
| destinationPredefinedAcl=predefined_acl, |
| rewriteToken=resume_rewrite_token, |
| maxBytesRewrittenPerCall=max_bytes_per_call) |
| rewrite_response = self.api_client.objects.Rewrite( |
| apitools_request, global_params=global_params) |
| bytes_written = long(rewrite_response.totalBytesRewritten) |
| if progress_callback and not progress_cb_with_backoff: |
| progress_cb_with_backoff = ProgressCallbackWithBackoff( |
| long(rewrite_response.objectSize), progress_callback) |
| if progress_cb_with_backoff: |
| progress_cb_with_backoff.Progress( |
| bytes_written - last_bytes_written) |
| |
| if rewrite_response.done: |
| break |
| elif not resume_rewrite_token: |
| # Save the token and make a tracker file if they don't already exist. |
| resume_rewrite_token = rewrite_response.rewriteToken |
| WriteRewriteTrackerFile(tracker_file_name, rewrite_params_hash, |
| rewrite_response.rewriteToken) |
| last_bytes_written = bytes_written |
| |
| DeleteTrackerFile(tracker_file_name) |
| return rewrite_response.resource |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| not_found_exception = CreateNotFoundExceptionForObjectWrite( |
| self.provider, dst_obj_metadata.bucket, src_provider=self.provider, |
| src_bucket_name=src_obj_metadata.bucket, |
| src_object_name=src_obj_metadata.name, src_generation=src_generation) |
| self._TranslateExceptionAndRaise(e, bucket_name=dst_obj_metadata.bucket, |
| object_name=dst_obj_metadata.name, |
| not_found_exception=not_found_exception) |
| |
| def DeleteObject(self, bucket_name, object_name, preconditions=None, |
| generation=None, provider=None): |
| """See CloudApi class for function doc strings.""" |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| if generation: |
| generation = long(generation) |
| |
| apitools_request = apitools_messages.StorageObjectsDeleteRequest( |
| bucket=bucket_name, object=object_name, generation=generation, |
| ifGenerationMatch=preconditions.gen_match, |
| ifMetagenerationMatch=preconditions.meta_gen_match) |
| try: |
| return self.api_client.objects.Delete(apitools_request) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name, |
| object_name=object_name, |
| generation=generation) |
| |
| def ComposeObject(self, src_objs_metadata, dst_obj_metadata, |
| preconditions=None, provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| ValidateDstObjectMetadata(dst_obj_metadata) |
| |
| dst_obj_name = dst_obj_metadata.name |
| dst_obj_metadata.name = None |
| dst_bucket_name = dst_obj_metadata.bucket |
| dst_obj_metadata.bucket = None |
| if not dst_obj_metadata.contentType: |
| dst_obj_metadata.contentType = DEFAULT_CONTENT_TYPE |
| |
| if not preconditions: |
| preconditions = Preconditions() |
| |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
| src_objs_compose_request = apitools_messages.ComposeRequest( |
| sourceObjects=src_objs_metadata, destination=dst_obj_metadata) |
| |
| apitools_request = apitools_messages.StorageObjectsComposeRequest( |
| composeRequest=src_objs_compose_request, |
| destinationBucket=dst_bucket_name, |
| destinationObject=dst_obj_name, |
| ifGenerationMatch=preconditions.gen_match, |
| ifMetagenerationMatch=preconditions.meta_gen_match) |
| try: |
| return self.api_client.objects.Compose(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| # We can't be sure which object was missing in the 404 case. |
| if isinstance(e, apitools_exceptions.HttpError) and e.status_code == 404: |
| raise NotFoundException('One of the source objects does not exist.') |
| else: |
| self._TranslateExceptionAndRaise(e) |
| |
| def WatchBucket(self, bucket_name, address, channel_id, token=None, |
| provider=None, fields=None): |
| """See CloudApi class for function doc strings.""" |
| projection = (apitools_messages.StorageObjectsWatchAllRequest |
| .ProjectionValueValuesEnum.full) |
| |
| channel = apitools_messages.Channel(address=address, id=channel_id, |
| token=token, type='WEB_HOOK') |
| |
| apitools_request = apitools_messages.StorageObjectsWatchAllRequest( |
| bucket=bucket_name, channel=channel, projection=projection) |
| |
| global_params = apitools_messages.StandardQueryParameters() |
| if fields: |
| global_params.fields = ','.join(set(fields)) |
| |
| try: |
| return self.api_client.objects.WatchAll(apitools_request, |
| global_params=global_params) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e, bucket_name=bucket_name) |
| |
| def StopChannel(self, channel_id, resource_id, provider=None): |
| """See CloudApi class for function doc strings.""" |
| channel = apitools_messages.Channel(id=channel_id, resourceId=resource_id) |
| try: |
| self.api_client.channels.Stop(channel) |
| except TRANSLATABLE_APITOOLS_EXCEPTIONS, e: |
| self._TranslateExceptionAndRaise(e) |
| |
| def _BucketCannedAclToPredefinedAcl(self, canned_acl_string): |
| """Translates the input string to a bucket PredefinedAcl string. |
| |
| Args: |
| canned_acl_string: Canned ACL string. |
| |
| Returns: |
| String that can be used as a query parameter with the JSON API. This |
| corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
| used as input to apitools requests that affect bucket access controls. |
| """ |
| # XML : JSON |
| translation_dict = { |
| None: None, |
| 'authenticated-read': 'authenticatedRead', |
| 'private': 'private', |
| 'project-private': 'projectPrivate', |
| 'public-read': 'publicRead', |
| 'public-read-write': 'publicReadWrite' |
| } |
| if canned_acl_string in translation_dict: |
| return translation_dict[canned_acl_string] |
| raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
| |
| def _ObjectCannedAclToPredefinedAcl(self, canned_acl_string): |
| """Translates the input string to an object PredefinedAcl string. |
| |
| Args: |
| canned_acl_string: Canned ACL string. |
| |
| Returns: |
| String that can be used as a query parameter with the JSON API. This |
| corresponds to a flavor of *PredefinedAclValueValuesEnum and can be |
| used as input to apitools requests that affect object access controls. |
| """ |
| # XML : JSON |
| translation_dict = { |
| None: None, |
| 'authenticated-read': 'authenticatedRead', |
| 'bucket-owner-read': 'bucketOwnerRead', |
| 'bucket-owner-full-control': 'bucketOwnerFullControl', |
| 'private': 'private', |
| 'project-private': 'projectPrivate', |
| 'public-read': 'publicRead' |
| } |
| if canned_acl_string in translation_dict: |
| return translation_dict[canned_acl_string] |
| raise ArgumentException('Invalid canned ACL %s' % canned_acl_string) |
| |
| def _TranslateExceptionAndRaise(self, e, bucket_name=None, object_name=None, |
| generation=None, not_found_exception=None): |
| """Translates an HTTP exception and raises the translated or original value. |
| |
| Args: |
| e: Any Exception. |
| bucket_name: Optional bucket name in request that caused the exception. |
| object_name: Optional object name in request that caused the exception. |
| generation: Optional generation in request that caused the exception. |
| not_found_exception: Optional exception to raise in the not-found case. |
| |
| Raises: |
| Translated CloudApi exception, or the original exception if it was not |
| translatable. |
| """ |
| translated_exception = self._TranslateApitoolsException( |
| e, bucket_name=bucket_name, object_name=object_name, |
| generation=generation, not_found_exception=not_found_exception) |
| if translated_exception: |
| raise translated_exception |
| else: |
| raise |
| |
| def _GetMessageFromHttpError(self, http_error): |
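    """Returns the error message from an HttpError's JSON body, if any."""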
| if isinstance(http_error, apitools_exceptions.HttpError): |
| if getattr(http_error, 'content', None): |
| try: |
| json_obj = json.loads(http_error.content) |
| if 'error' in json_obj and 'message' in json_obj['error']: |
| return json_obj['error']['message'] |
| except Exception: # pylint: disable=broad-except |
| # If we couldn't decode anything, just leave the message as None. |
| pass |
| |
| def _TranslateApitoolsResumableUploadException(self, e): |
| if isinstance(e, apitools_exceptions.HttpError): |
| message = self._GetMessageFromHttpError(e) |
| if (e.status_code == 503 and |
| self.http.disable_ssl_certificate_validation): |
| return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
| status=e.status_code) |
| elif e.status_code >= 500: |
| return ResumableUploadException( |
| message or 'Server Error', status=e.status_code) |
| elif e.status_code == 429: |
| return ResumableUploadException( |
| message or 'Too Many Requests', status=e.status_code) |
      elif e.status_code in (404, 410):
        return ResumableUploadStartOverException(
            message or 'Bad Request', status=e.status_code)
| elif e.status_code >= 400: |
| return ResumableUploadAbortException( |
| message or 'Bad Request', status=e.status_code) |
| if isinstance(e, apitools_exceptions.StreamExhausted): |
| return ResumableUploadAbortException(e.message) |
| if (isinstance(e, apitools_exceptions.TransferError) and |
| ('Aborting transfer' in e.message or |
| 'Not enough bytes in stream' in e.message or |
| 'additional bytes left in stream' in e.message)): |
| return ResumableUploadAbortException(e.message) |
| |
| def _TranslateApitoolsException(self, e, bucket_name=None, object_name=None, |
| generation=None, not_found_exception=None): |
| """Translates apitools exceptions into their gsutil Cloud Api equivalents. |
| |
| Args: |
| e: Any exception in TRANSLATABLE_APITOOLS_EXCEPTIONS. |
| bucket_name: Optional bucket name in request that caused the exception. |
| object_name: Optional object name in request that caused the exception. |
| generation: Optional generation in request that caused the exception. |
| not_found_exception: Optional exception to raise in the not-found case. |
| |
| Returns: |
| CloudStorageApiServiceException for translatable exceptions, None |
| otherwise. |
| """ |
| if isinstance(e, apitools_exceptions.HttpError): |
| message = self._GetMessageFromHttpError(e) |
| if e.status_code == 400: |
| # It is possible that the Project ID is incorrect. Unfortunately the |
| # JSON API does not give us much information about what part of the |
| # request was bad. |
| return BadRequestException(message or 'Bad Request', |
| status=e.status_code) |
| elif e.status_code == 401: |
| if 'Login Required' in str(e): |
| return AccessDeniedException( |
| message or 'Access denied: login required.', |
| status=e.status_code) |
| elif e.status_code == 403: |
| if 'The account for the specified project has been disabled' in str(e): |
| return AccessDeniedException(message or 'Account disabled.', |
| status=e.status_code) |
| elif 'Daily Limit for Unauthenticated Use Exceeded' in str(e): |
| return AccessDeniedException( |
| message or 'Access denied: quota exceeded. ' |
| 'Is your project ID valid?', |
| status=e.status_code) |
| elif 'The bucket you tried to delete was not empty.' in str(e): |
| return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| status=e.status_code) |
| elif ('The bucket you tried to create requires domain ownership ' |
| 'verification.' in str(e)): |
| return AccessDeniedException( |
| 'The bucket you tried to create requires domain ownership ' |
| 'verification. Please see ' |
| 'https://developers.google.com/storage/docs/bucketnaming' |
| '?hl=en#verification for more details.', status=e.status_code) |
| elif 'User Rate Limit Exceeded' in str(e): |
| return AccessDeniedException('Rate limit exceeded. Please retry this ' |
| 'request later.', status=e.status_code) |
| elif 'Access Not Configured' in str(e): |
| return AccessDeniedException( |
| 'Access Not Configured. Please go to the Google Developers ' |
| 'Console (https://cloud.google.com/console#/project) for your ' |
| 'project, select APIs and Auth and enable the ' |
| 'Google Cloud Storage JSON API.', |
| status=e.status_code) |
| else: |
| return AccessDeniedException(message or e.message, |
| status=e.status_code) |
| elif e.status_code == 404: |
| if not_found_exception: |
| # The exception is pre-constructed prior to translation; the HTTP |
| # status code isn't available at that time. |
| setattr(not_found_exception, 'status', e.status_code) |
| return not_found_exception |
| elif bucket_name: |
| if object_name: |
| return CreateObjectNotFoundException(e.status_code, self.provider, |
| bucket_name, object_name, |
| generation=generation) |
| return CreateBucketNotFoundException(e.status_code, self.provider, |
| bucket_name) |
| return NotFoundException(e.message, status=e.status_code) |
| |
| elif e.status_code == 409 and bucket_name: |
| if 'The bucket you tried to delete was not empty.' in str(e): |
| return NotEmptyException('BucketNotEmpty (%s)' % bucket_name, |
| status=e.status_code) |
| return ServiceException( |
| 'Bucket %s already exists.' % bucket_name, status=e.status_code) |
| elif e.status_code == 412: |
| return PreconditionException(message, status=e.status_code) |
| elif (e.status_code == 503 and |
| not self.http.disable_ssl_certificate_validation): |
| return ServiceException(_VALIDATE_CERTIFICATES_503_MESSAGE, |
| status=e.status_code) |
| return ServiceException(message, status=e.status_code) |
| elif isinstance(e, apitools_exceptions.TransferInvalidError): |
| return ServiceException('Transfer invalid (possible encoding error: %s)' |
| % str(e)) |