| #!/usr/bin/env python |
| """Upload and download support for apitools.""" |
| from __future__ import print_function |
| |
| import email.generator as email_generator |
| import email.mime.multipart as mime_multipart |
| import email.mime.nonmultipart as mime_nonmultipart |
| import io |
| import json |
| import mimetypes |
| import os |
| import threading |
| |
| import six |
| from six.moves import http_client |
| |
| from apitools.base.py import buffered_stream |
| from apitools.base.py import exceptions |
| from apitools.base.py import http_wrapper |
| from apitools.base.py import stream_slice |
| from apitools.base.py import util |
| |
# Public names exported by a star-import of this module.
__all__ = [
    'Download',
    'Upload',
    'RESUMABLE_UPLOAD',
    'SIMPLE_UPLOAD',
    'DownloadProgressPrinter',
    'DownloadCompletePrinter',
    'UploadProgressPrinter',
    'UploadCompletePrinter',
]

# Identifiers for the two upload strategies an Upload can use.
SIMPLE_UPLOAD = 'simple'
RESUMABLE_UPLOAD = 'resumable'

# Uploads whose known size is larger than this many bytes (5 MiB) default
# to the resumable strategy.
_RESUMABLE_UPLOAD_THRESHOLD = 5 * 1024 * 1024
| |
| |
def DownloadProgressPrinter(response, unused_download):
    """Report download progress for one response on stdout.

    Prints the response's content-range header when it has one, and the
    response length in bytes otherwise.

    Args:
      response: Object exposing `info` (header mapping) and `length`.
      unused_download: The transfer the response belongs to; ignored.
    """
    headers = response.info
    if 'content-range' not in headers:
        print('Received %d bytes' % response.length)
        return
    print('Received %s' % headers['content-range'])
| |
| |
def DownloadCompletePrinter(unused_response, unused_download):
    """Announce on stdout that a download has finished.

    Args:
      unused_response: Final response of the download; ignored.
      unused_download: The finished transfer; ignored.
    """
    message = 'Download complete'
    print(message)
| |
| |
def UploadProgressPrinter(response, unused_upload):
    """Report upload progress for one response on stdout.

    Args:
      response: Object whose `info` mapping contains a 'range' entry.
      unused_upload: The transfer the response belongs to; ignored.
    """
    sent_range = response.info['range']
    print('Sent %s' % sent_range)
| |
| |
def UploadCompletePrinter(unused_response, unused_upload):
    """Announce on stdout that an upload has finished.

    Args:
      unused_response: Final response of the upload; ignored.
      unused_upload: The finished transfer; ignored.
    """
    message = 'Upload complete'
    print(message)
| |
| |
class _Transfer(object):

    """Generic bits common to Uploads and Downloads.

    Holds the stream being transferred, the http object(s) used for
    requests, the transfer URL and the retry configuration.

    Public attributes:
      auto_transfer: Value given at construction; stored as-is.
      chunksize: Default chunksize to use for transfers.
      retry_func: Retry handler handed to http_wrapper.MakeRequest.
    """

    def __init__(self, stream, close_stream=False, chunksize=None,
                 auto_transfer=True, http=None, num_retries=5):
        """Create an uninitialized transfer around stream.

        Args:
          stream: The stream to read from or write to.
          close_stream: If True, close the stream when this object is
              finalized.
          chunksize: Chunk size in bytes; a falsy value selects 1048576.
          auto_transfer: Stored on the instance as `auto_transfer`.
          http: Optional http object; if given it takes precedence over
              the one later passed to _Initialize.
          num_retries: Non-negative integer number of retries.
        """
        self.__bytes_http = None
        self.__close_stream = close_stream
        self.__http = http
        self.__stream = stream
        self.__url = None

        self.__num_retries = 5
        # Let the @property do validation
        self.num_retries = num_retries

        self.retry_func = (
            http_wrapper.HandleExceptionsAndRebuildHttpConnections)
        self.auto_transfer = auto_transfer
        self.chunksize = chunksize or 1048576

    def __repr__(self):
        return str(self)

    @property
    def close_stream(self):
        """Whether the stream is closed when this object is finalized."""
        return self.__close_stream

    @property
    def http(self):
        """The http object for this transfer, or None if not yet set."""
        return self.__http

    @property
    def bytes_http(self):
        """The http object used for byte transfers; defaults to self.http."""
        return self.__bytes_http or self.http

    @bytes_http.setter
    def bytes_http(self, value):
        self.__bytes_http = value

    @property
    def num_retries(self):
        """Number of retries used for requests made by this transfer."""
        return self.__num_retries

    @num_retries.setter
    def num_retries(self, value):
        """Validate and store the retry count.

        Raises:
          exceptions.InvalidDataError: if value is negative.
        """
        util.Typecheck(value, six.integer_types)
        if value < 0:
            raise exceptions.InvalidDataError(
                'Cannot have negative value for num_retries')
        self.__num_retries = value

    @property
    def stream(self):
        """The stream supplied at construction."""
        return self.__stream

    @property
    def url(self):
        """The transfer URL, or None until _Initialize has run."""
        return self.__url

    def _Initialize(self, http, url):
        """Initialize this transfer by setting self.http and self.url.

        We want the user to be able to override self.http by having set
        the value in the constructor; in that case, we ignore the provided
        http.

        Args:
          http: An httplib2.Http instance or None.
          url: The url for this transfer.

        Returns:
          None. Initializes self.

        Raises:
          exceptions.TransferInvalidError: if already initialized.
        """
        self.EnsureUninitialized()
        if self.http is None:
            self.__http = http or http_wrapper.GetHttp()
        self.__url = url

    @property
    def initialized(self):
        """True once both a url and an http object are set."""
        return self.url is not None and self.http is not None

    @property
    def _type_name(self):
        """Name of the concrete class, used in error messages."""
        return type(self).__name__

    def EnsureInitialized(self):
        """Raise exceptions.TransferInvalidError unless initialized."""
        if not self.initialized:
            # Interpolate the type name into the message; passing it as a
            # second exception argument leaves a literal '%s' in the text.
            raise exceptions.TransferInvalidError(
                'Cannot use uninitialized %s' % self._type_name)

    def EnsureUninitialized(self):
        """Raise exceptions.TransferInvalidError if already initialized."""
        if self.initialized:
            raise exceptions.TransferInvalidError(
                'Cannot re-initialize %s' % self._type_name)

    def __del__(self):
        # __init__ may have raised (or never run) before these attributes
        # were assigned; treat that as "nothing to close" instead of
        # raising AttributeError from a finalizer.
        if getattr(self, '_Transfer__close_stream', False):
            stream = getattr(self, '_Transfer__stream', None)
            if stream is not None:
                stream.close()

    def _ExecuteCallback(self, callback, response):
        """Invoke callback(response, self) on a new thread, if callback is set.

        The thread is started and not joined.
        """
        # TODO(craigcitro): Push these into a queue.
        if callback is not None:
            threading.Thread(target=callback, args=(response, self)).start()
| |
| |
class Download(_Transfer):

    """Data for a single download.

    Public attributes:
      chunksize: default chunksize to use for transfers.
      progress_callback: default callback run after each processed
          response in StreamMedia.
      finish_callback: default callback run when StreamMedia finishes.
    """
    # Status codes that __ProcessResponse and InitializeDownload accept
    # without raising.
    _ACCEPTABLE_STATUSES = set((
        http_client.OK,
        http_client.NO_CONTENT,
        http_client.PARTIAL_CONTENT,
        http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
    ))
    # Keys that must be present in the JSON handed to FromData.
    _REQUIRED_SERIALIZATION_KEYS = set((
        'auto_transfer', 'progress', 'total_size', 'url'))

    def __init__(self, stream, progress_callback=None, finish_callback=None,
                 **kwds):
        """Create an uninitialized download that writes into stream.

        Args:
          stream: Stream that downloaded bytes are written to.
          progress_callback: Default per-response callback.
          finish_callback: Default completion callback.
          **kwds: `total_size` is consumed here; everything else is
              forwarded to _Transfer.__init__.
        """
        total_size = kwds.pop('total_size', None)
        super(Download, self).__init__(stream, **kwds)
        self.__initial_response = None
        self.__progress = 0
        self.__total_size = total_size
        self.__encoding = None

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback

    @property
    def progress(self):
        """Number of bytes written to the stream so far."""
        return self.__progress

    @property
    def encoding(self):
        """Last content-encoding header seen on a response, or None."""
        return self.__encoding

    @classmethod
    def FromFile(cls, filename, overwrite=False, auto_transfer=True, **kwds):
        """Create a new download object from a filename.

        The file is opened for binary writing and closed when the download
        object is finalized.

        Raises:
          exceptions.InvalidUserInputError: if the file exists and
              overwrite is false.
        """
        path = os.path.expanduser(filename)
        if os.path.exists(path) and not overwrite:
            raise exceptions.InvalidUserInputError(
                'File %s exists and overwrite not specified' % path)
        return cls(open(path, 'wb'), close_stream=True,
                   auto_transfer=auto_transfer, **kwds)

    @classmethod
    def FromStream(cls, stream, auto_transfer=True, total_size=None, **kwds):
        """Create a new Download object from a stream."""
        return cls(stream, auto_transfer=auto_transfer, total_size=total_size,
                   **kwds)

    @classmethod
    def FromData(cls, stream, json_data, http=None, auto_transfer=None,
                 **kwds):
        """Create a new Download object from a stream and serialized data.

        Args:
          stream: Stream that downloaded bytes are written to.
          json_data: JSON string holding the keys listed in
              _REQUIRED_SERIALIZATION_KEYS.
          http: Optional http object passed to _Initialize.
          auto_transfer: If not None, overrides the serialized value.
          **kwds: Forwarded to FromStream.

        Raises:
          exceptions.InvalidDataError: if a required key is missing.
        """
        info = json.loads(json_data)
        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
        if missing_keys:
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(missing_keys)))
        download = cls.FromStream(stream, **kwds)
        if auto_transfer is not None:
            download.auto_transfer = auto_transfer
        else:
            download.auto_transfer = info['auto_transfer']
        # Restore the private (name-mangled) state directly; there are no
        # public setters for these.
        setattr(download, '_Download__progress', info['progress'])
        setattr(download, '_Download__total_size', info['total_size'])
        download._Initialize(  # pylint: disable=protected-access
            http, info['url'])
        return download

    @property
    def serialization_data(self):
        """Dict with the state FromData needs; requires initialization."""
        self.EnsureInitialized()
        return {
            'auto_transfer': self.auto_transfer,
            'progress': self.progress,
            'total_size': self.total_size,
            'url': self.url,
        }

    @property
    def total_size(self):
        """Total size in bytes, or None when not yet known."""
        return self.__total_size

    def __str__(self):
        if not self.initialized:
            return 'Download (uninitialized)'
        else:
            return 'Download with %d/%s bytes transferred from url %s' % (
                self.progress, self.total_size, self.url)

    def ConfigureRequest(self, http_request, url_builder):
        """Mark the request as a media download of the first chunk.

        Sets the `alt=media` query parameter on url_builder and a Range
        header covering the first `chunksize` bytes on http_request.
        """
        url_builder.query_params['alt'] = 'media'
        # TODO(craigcitro): We need to send range requests because by
        # default httplib2 stores entire responses in memory. Override
        # httplib2's download method (as gsutil does) so that this is not
        # necessary.
        http_request.headers['Range'] = 'bytes=0-%d' % (self.chunksize - 1,)

    def __SetTotal(self, info):
        """Set total_size from the content-range entry of info, if any."""
        if 'content-range' in info:
            _, _, total = info['content-range'].rpartition('/')
            if total != '*':
                self.__total_size = int(total)
        # Note "total_size is None" means we don't know it; if no size
        # info was returned on our initial range request, that means we
        # have a 0-byte file. (That last statement has been verified
        # empirically, but is not clearly documented anywhere.)
        if self.total_size is None:
            self.__total_size = 0

    def InitializeDownload(self, http_request, http=None, client=None):
        """Initialize this download by making a request.

        Args:
          http_request: The HttpRequest to use to initialize this download.
          http: The httplib2.Http instance for this request.
          client: If provided, let this client process the final URL before
              sending any additional requests. If client is provided and
              http is not, client.http will be used instead.

        Raises:
          exceptions.UserError: if neither http nor client is given.
          exceptions.HttpError: if the initial request returns a status
              outside _ACCEPTABLE_STATUSES.
          exceptions.TransferInvalidError: if already initialized.
        """
        self.EnsureUninitialized()
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        url = http_request.url
        if self.auto_transfer:
            end_byte = self.__ComputeEndByte(0)
            self.__SetRangeHeader(http_request, 0, end_byte)
            response = http_wrapper.MakeRequest(
                self.bytes_http or http, http_request)
            if response.status_code not in self._ACCEPTABLE_STATUSES:
                raise exceptions.HttpError.FromResponse(response)
            # Kept so StreamMedia can consume it instead of re-requesting
            # the first chunk.
            self.__initial_response = response
            self.__SetTotal(response.info)
            url = response.info.get('content-location', response.request_url)
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)
        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            self.StreamInChunks()

    def __NormalizeStartEnd(self, start, end=None):
        """Turn a (start, end) request into absolute inclusive indices.

        Requires self.total_size to be known.

        Raises:
          exceptions.TransferInvalidError: if the range is invalid for
              the known total size.
        """
        if end is not None:
            if start < 0:
                raise exceptions.TransferInvalidError(
                    'Cannot have end index with negative start index')
            elif start >= self.total_size:
                raise exceptions.TransferInvalidError(
                    'Cannot have start index greater than total size')
            end = min(end, self.total_size - 1)
            if end < start:
                raise exceptions.TransferInvalidError(
                    'Range requested with end[%s] < start[%s]' % (end, start))
            return start, end
        else:
            if start < 0:
                # Negative start counts back from the end of the file.
                start = max(0, start + self.total_size)
            return start, self.total_size - 1

    def __SetRangeHeader(self, request, start, end=None):
        """Set the 'range' header on request for the given start/end."""
        if start < 0:
            # Suffix form: 'bytes=-N' asks for the last N bytes.
            request.headers['range'] = 'bytes=%d' % start
        elif end is None:
            request.headers['range'] = 'bytes=%d-' % start
        else:
            request.headers['range'] = 'bytes=%d-%d' % (start, end)

    def __ComputeEndByte(self, start, end=None, use_chunks=True):
        """Compute the last byte to fetch for this request.

        This is all based on the HTTP spec for Range and
        Content-Range.

        Note that this is potentially confusing in several ways:
          * the value for the last byte is 0-based, eg "fetch 10 bytes
            from the beginning" would return 9 here.
          * if we have no information about size, and don't want to
            use the chunksize, we'll return None.
        See the tests for more examples.

        Args:
          start: byte to start at.
          end: (int or None, default: None) Suggested last byte.
          use_chunks: (bool, default: True) If False, ignore self.chunksize.

        Returns:
          Last byte to use in a Range header, or None.

        """
        end_byte = end

        if start < 0 and not self.total_size:
            return end_byte

        if use_chunks:
            alternate = start + self.chunksize - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        if self.total_size:
            alternate = self.total_size - 1
            if end_byte is not None:
                end_byte = min(end_byte, alternate)
            else:
                end_byte = alternate

        return end_byte

    def __GetChunk(self, start, end, additional_headers=None):
        """Retrieve a chunk, and return the full response."""
        self.EnsureInitialized()
        request = http_wrapper.Request(url=self.url)
        self.__SetRangeHeader(request, start, end=end)
        if additional_headers is not None:
            request.headers.update(additional_headers)
        return http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)

    def __ProcessResponse(self, response):
        """Process response (by updating self and writing to self.stream).

        Raises:
          exceptions.HttpError: on 403/404 responses.
          exceptions.TransferRetryError: on any other status outside
              _ACCEPTABLE_STATUSES.
        """
        if response.status_code not in self._ACCEPTABLE_STATUSES:
            # We distinguish errors that mean we made a mistake in setting
            # up the transfer versus something we should attempt again.
            if response.status_code in (http_client.FORBIDDEN,
                                        http_client.NOT_FOUND):
                raise exceptions.HttpError.FromResponse(response)
            else:
                raise exceptions.TransferRetryError(response.content)
        if response.status_code in (http_client.OK,
                                    http_client.PARTIAL_CONTENT):
            self.stream.write(response.content)
            self.__progress += response.length
            if response.info and 'content-encoding' in response.info:
                # TODO(craigcitro): Handle the case where this changes over a
                # download.
                self.__encoding = response.info['content-encoding']
        elif response.status_code == http_client.NO_CONTENT:
            # It's important to write something to the stream for the case
            # of a 0-byte download to a file, as otherwise python won't
            # create the file.
            self.stream.write('')
        return response

    def GetRange(self, start, end=None, additional_headers=None,
                 use_chunks=True):
        """Retrieve a given byte range from this download, inclusive.

        Range must be of one of these three forms:
        * 0 <= start, end = None: Fetch from start to the end of the file.
        * 0 <= start <= end: Fetch the bytes from start to end.
        * start < 0, end = None: Fetch the last -start bytes of the file.

        (These variations correspond to those described in the HTTP 1.1
        protocol for range headers in RFC 2616, sec. 14.35.1.)

        Args:
          start: (int) Where to start fetching bytes. (See above.)
          end: (int, optional) Where to stop fetching bytes. (See above.)
          additional_headers: (dict, optional) Any additional headers to
              pass with the request.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and fetch this range in a single request.

        Returns:
          None. Streams bytes into self.stream.

        Raises:
          exceptions.TransferRetryError: if a response carries zero bytes.
        """
        self.EnsureInitialized()
        progress_end_normalized = False
        if self.total_size is not None:
            progress, end_byte = self.__NormalizeStartEnd(start, end)
            progress_end_normalized = True
        else:
            progress = start
            end_byte = end
        while (not progress_end_normalized or end_byte is None or
               progress <= end_byte):
            # Keep the per-request end separate from end_byte: end_byte is
            # the last byte of the whole requested range and has to survive
            # across iterations, otherwise the loop stops after the first
            # chunk.
            chunk_end = self.__ComputeEndByte(progress, end=end_byte,
                                              use_chunks=use_chunks)
            response = self.__GetChunk(progress, chunk_end,
                                       additional_headers=additional_headers)
            if not progress_end_normalized:
                # The first response tells us the total size, which lets us
                # resolve the requested range to absolute indices.
                self.__SetTotal(response.info)
                progress, end_byte = self.__NormalizeStartEnd(start, end)
                progress_end_normalized = True
            response = self.__ProcessResponse(response)
            progress += response.length
            if response.length == 0:
                raise exceptions.TransferRetryError(
                    'Zero bytes unexpectedly returned in download response')

    def StreamInChunks(self, callback=None, finish_callback=None,
                       additional_headers=None):
        """Stream the entire download in chunks."""
        self.StreamMedia(callback=callback, finish_callback=finish_callback,
                         additional_headers=additional_headers,
                         use_chunks=True)

    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None, use_chunks=True):
        """Stream the entire download.

        Args:
          callback: (default: None) Callback to call as each chunk is
              completed.
          finish_callback: (default: None) Callback to call when the
              download is complete.
          additional_headers: (default: None) Additional headers to
              include in fetching bytes.
          use_chunks: (bool, default: True) If False, ignore self.chunksize
              and stream this download in a single request.

        Returns:
          None. Streams bytes into self.stream.
        """
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback

        self.EnsureInitialized()
        while True:
            if self.__initial_response is not None:
                # Consume the response already fetched by
                # InitializeDownload exactly once.
                response = self.__initial_response
                self.__initial_response = None
            else:
                end_byte = self.__ComputeEndByte(self.progress,
                                                 use_chunks=use_chunks)
                response = self.__GetChunk(
                    self.progress, end_byte,
                    additional_headers=additional_headers)
            if self.total_size is None:
                self.__SetTotal(response.info)
            response = self.__ProcessResponse(response)
            self._ExecuteCallback(callback, response)
            if (response.status_code == http_client.OK or
                    self.progress >= self.total_size):
                break
        self._ExecuteCallback(finish_callback, response)
| |
| |
class Upload(_Transfer):

    """Data for a single Upload.

    Fields:
      stream: The stream to upload.
      mime_type: MIME type of the upload.
      total_size: (optional) Total upload size for the stream.
      close_stream: (default: False) Whether or not we should close the
          stream when finished with the upload.
      auto_transfer: (default: True) If True, stream all bytes as soon as
          the upload is created.
    """
    # Keys that must be present in the JSON handed to FromData.
    _REQUIRED_SERIALIZATION_KEYS = set((
        'auto_transfer', 'mime_type', 'total_size', 'url'))

    def __init__(self, stream, mime_type, total_size=None, http=None,
                 close_stream=False, chunksize=None, auto_transfer=True,
                 progress_callback=None, finish_callback=None,
                 **kwds):
        """Create an uninitialized upload that reads from stream.

        Args:
          stream: The stream to upload.
          mime_type: MIME type of the upload.
          total_size: Total upload size in bytes, or None if unknown.
          http: Optional http object for this transfer.
          close_stream: Whether to close the stream on finalization.
          chunksize: Chunk size in bytes for chunked sends.
          auto_transfer: Whether to start sending bytes once initialized.
          progress_callback: Default per-chunk callback.
          finish_callback: Default completion callback.
          **kwds: Forwarded to _Transfer.__init__.
        """
        super(Upload, self).__init__(
            stream, close_stream=close_stream, chunksize=chunksize,
            auto_transfer=auto_transfer, http=http, **kwds)
        self.__complete = False
        self.__final_response = None
        self.__mime_type = mime_type
        self.__progress = 0
        self.__server_chunk_granularity = None
        self.__strategy = None
        self.__total_size = None

        self.progress_callback = progress_callback
        self.finish_callback = finish_callback
        self.total_size = total_size

    @property
    def progress(self):
        """Upload progress as last recorded from a server response."""
        return self.__progress

    @classmethod
    def FromFile(cls, filename, mime_type=None, auto_transfer=True, **kwds):
        """Create a new Upload object from a filename.

        The MIME type is guessed from the path when not supplied; the file
        is opened for binary reading and closed on finalization.

        Raises:
          exceptions.NotFoundError: if the file does not exist.
          exceptions.InvalidUserInputError: if no MIME type was given and
              none could be guessed.
        """
        path = os.path.expanduser(filename)
        if not os.path.exists(path):
            raise exceptions.NotFoundError('Could not find file %s' % path)
        if not mime_type:
            mime_type, _ = mimetypes.guess_type(path)
            if mime_type is None:
                raise exceptions.InvalidUserInputError(
                    'Could not guess mime type for %s' % path)
        size = os.stat(path).st_size
        return cls(open(path, 'rb'), mime_type, total_size=size,
                   close_stream=True, auto_transfer=auto_transfer, **kwds)

    @classmethod
    def FromStream(cls, stream, mime_type, total_size=None, auto_transfer=True,
                   **kwds):
        """Create a new Upload object from a stream.

        Raises:
          exceptions.InvalidUserInputError: if mime_type is None.
        """
        if mime_type is None:
            raise exceptions.InvalidUserInputError(
                'No mime_type specified for stream')
        return cls(stream, mime_type, total_size=total_size,
                   close_stream=False, auto_transfer=auto_transfer, **kwds)

    @classmethod
    def FromData(cls, stream, json_data, http, auto_transfer=None, **kwds):
        """Create a new Upload of stream from serialized json_data and http.

        The upload is restored as a resumable upload, its state is
        refreshed from the server, and, if auto_transfer ends up true, the
        remaining bytes are sent immediately.

        Raises:
          exceptions.InvalidDataError: if a required key is missing.
          exceptions.InvalidUserInputError: if total_size is passed in
              kwds, or stream is a non-seekable io.IOBase.
        """
        info = json.loads(json_data)
        missing_keys = cls._REQUIRED_SERIALIZATION_KEYS - set(info.keys())
        if missing_keys:
            raise exceptions.InvalidDataError(
                'Invalid serialization data, missing keys: %s' % (
                    ', '.join(missing_keys)))
        if 'total_size' in kwds:
            raise exceptions.InvalidUserInputError(
                'Cannot override total_size on serialized Upload')
        upload = cls.FromStream(stream, info['mime_type'],
                                total_size=info.get('total_size'), **kwds)
        if isinstance(stream, io.IOBase) and not stream.seekable():
            raise exceptions.InvalidUserInputError(
                'Cannot restart resumable upload on non-seekable stream')
        if auto_transfer is not None:
            upload.auto_transfer = auto_transfer
        else:
            upload.auto_transfer = info['auto_transfer']
        upload.strategy = RESUMABLE_UPLOAD
        upload._Initialize(  # pylint: disable=protected-access
            http, info['url'])
        upload.RefreshResumableUploadState()
        upload.EnsureInitialized()
        if upload.auto_transfer:
            upload.StreamInChunks()
        return upload

    @property
    def serialization_data(self):
        """Dict with the state FromData needs.

        Raises:
          exceptions.InvalidDataError: if the strategy is not resumable.
        """
        self.EnsureInitialized()
        if self.strategy != RESUMABLE_UPLOAD:
            raise exceptions.InvalidDataError(
                'Serialization only supported for resumable uploads')
        return {
            'auto_transfer': self.auto_transfer,
            'mime_type': self.mime_type,
            'total_size': self.total_size,
            'url': self.url,
        }

    @property
    def complete(self):
        """True once the server has reported the upload finished."""
        return self.__complete

    @property
    def mime_type(self):
        """MIME type of the upload."""
        return self.__mime_type

    def __str__(self):
        if not self.initialized:
            return 'Upload (uninitialized)'
        else:
            return 'Upload with %d/%s bytes transferred for url %s' % (
                self.progress, self.total_size or '???', self.url)

    @property
    def strategy(self):
        """SIMPLE_UPLOAD, RESUMABLE_UPLOAD, or None if not yet chosen."""
        return self.__strategy

    @strategy.setter
    def strategy(self, value):
        """Set the upload strategy.

        Raises:
          exceptions.UserError: if value is not a known strategy.
        """
        if value not in (SIMPLE_UPLOAD, RESUMABLE_UPLOAD):
            raise exceptions.UserError((
                'Invalid value "%s" for upload strategy, must be one of '
                '"simple" or "resumable".') % value)
        self.__strategy = value

    @property
    def total_size(self):
        """Total upload size in bytes, or None if unknown."""
        return self.__total_size

    @total_size.setter
    def total_size(self, value):
        """Set the total size; only allowed before initialization."""
        self.EnsureUninitialized()
        self.__total_size = value

    def __SetDefaultUploadStrategy(self, upload_config, http_request):
        """Determine and set the default upload strategy for this upload.

        We generally prefer simple or multipart, unless we're forced to
        use resumable. This happens when any of (1) the upload is too
        large, (2) the simple endpoint doesn't support multipart requests
        and we have metadata, or (3) there is no simple upload endpoint.

        Args:
          upload_config: Configuration for the upload endpoint.
          http_request: The associated http request.

        Returns:
          None.
        """
        if upload_config.resumable_path is None:
            self.strategy = SIMPLE_UPLOAD
        # An explicitly chosen strategy always wins over the defaults.
        if self.strategy is not None:
            return
        strategy = SIMPLE_UPLOAD
        if (self.total_size is not None and
                self.total_size > _RESUMABLE_UPLOAD_THRESHOLD):
            strategy = RESUMABLE_UPLOAD
        if http_request.body and not upload_config.simple_multipart:
            strategy = RESUMABLE_UPLOAD
        if not upload_config.simple_path:
            strategy = RESUMABLE_UPLOAD
        self.strategy = strategy

    def ConfigureRequest(self, upload_config, http_request, url_builder):
        """Configure the request and url for this upload.

        Raises:
          exceptions.InvalidUserInputError: if the upload exceeds
              upload_config.max_size or its MIME type is not accepted.
        """
        # Validate total_size vs. max_size
        if (self.total_size and upload_config.max_size and
                self.total_size > upload_config.max_size):
            raise exceptions.InvalidUserInputError(
                'Upload too big: %s larger than max size %s' % (
                    self.total_size, upload_config.max_size))
        # Validate mime type
        if not util.AcceptableMimeType(upload_config.accept, self.mime_type):
            raise exceptions.InvalidUserInputError(
                'MIME type %s does not match any accepted MIME ranges %s' % (
                    self.mime_type, upload_config.accept))

        self.__SetDefaultUploadStrategy(upload_config, http_request)
        if self.strategy == SIMPLE_UPLOAD:
            url_builder.relative_path = upload_config.simple_path
            if http_request.body:
                url_builder.query_params['uploadType'] = 'multipart'
                self.__ConfigureMultipartRequest(http_request)
            else:
                url_builder.query_params['uploadType'] = 'media'
                self.__ConfigureMediaRequest(http_request)
        else:
            url_builder.relative_path = upload_config.resumable_path
            url_builder.query_params['uploadType'] = 'resumable'
            self.__ConfigureResumableRequest(http_request)

    def __ConfigureMediaRequest(self, http_request):
        """Configure http_request as a simple request for this upload."""
        http_request.headers['content-type'] = self.mime_type
        http_request.body = self.stream.read()
        http_request.loggable_body = '<media body>'

    def __ConfigureMultipartRequest(self, http_request):
        """Configure http_request as a multipart request for this upload."""
        # This is a multipart/related upload.
        msg_root = mime_multipart.MIMEMultipart('related')
        # msg_root should not write out its own headers
        setattr(msg_root, '_write_headers', lambda self: None)

        # attach the body as one part
        msg = mime_nonmultipart.MIMENonMultipart(
            *http_request.headers['content-type'].split('/'))
        msg.set_payload(http_request.body)
        msg_root.attach(msg)

        # attach the media as the second part
        msg = mime_nonmultipart.MIMENonMultipart(*self.mime_type.split('/'))
        msg['Content-Transfer-Encoding'] = 'binary'
        msg.set_payload(self.stream.read())
        msg_root.attach(msg)

        # NOTE: We encode the body, but can't use
        #       `email.message.Message.as_string` because it prepends
        #       `> ` to `From ` lines.
        # NOTE: We must use six.StringIO() instead of io.StringIO() since the
        #       `email` library uses cStringIO in Py2 and io.StringIO in Py3.
        fp = six.StringIO()
        g = email_generator.Generator(fp, mangle_from_=False)
        g.flatten(msg_root, unixfrom=False)
        http_request.body = fp.getvalue()

        multipart_boundary = msg_root.get_boundary()
        http_request.headers['content-type'] = (
            'multipart/related; boundary=%r' % multipart_boundary)

        # Build a loggable copy of the body in which the media part's
        # payload is replaced by a placeholder.
        body_components = http_request.body.split(multipart_boundary)
        headers, _, _ = body_components[-2].partition('\n\n')
        body_components[-2] = '\n\n'.join([headers, '<media body>\n\n--'])
        http_request.loggable_body = multipart_boundary.join(body_components)

    def __ConfigureResumableRequest(self, http_request):
        """Add the X-Upload-Content-* headers for a resumable upload."""
        http_request.headers['X-Upload-Content-Type'] = self.mime_type
        if self.total_size is not None:
            http_request.headers[
                'X-Upload-Content-Length'] = str(self.total_size)

    def RefreshResumableUploadState(self):
        """Talk to the server and refresh the state of this resumable upload.

        Updates progress and the stream position from the server's reply.
        Does nothing for non-resumable uploads.

        Returns:
          None. If the server reports the upload complete, its response is
          cached so the streaming methods can return it.

        Raises:
          exceptions.HttpError: if the server returns an unexpected status.
        """
        if self.strategy != RESUMABLE_UPLOAD:
            return
        self.EnsureInitialized()
        refresh_request = http_wrapper.Request(
            url=self.url, http_method='PUT',
            headers={'Content-Range': 'bytes */*'})
        refresh_response = http_wrapper.MakeRequest(
            self.http, refresh_request, redirections=0,
            retries=self.num_retries)
        range_header = self._GetRangeHeaderFromResponse(refresh_response)
        if refresh_response.status_code in (http_client.OK,
                                            http_client.CREATED):
            self.__complete = True
            self.__progress = self.total_size
            self.stream.seek(self.progress)
            # If we're finished, the refresh response will contain the metadata
            # originally requested. Cache it so it can be returned in
            # StreamInChunks.
            self.__final_response = refresh_response
        elif refresh_response.status_code == http_wrapper.RESUME_INCOMPLETE:
            if range_header is None:
                self.__progress = 0
            else:
                self.__progress = self.__GetLastByte(range_header) + 1
            self.stream.seek(self.progress)
        else:
            raise exceptions.HttpError.FromResponse(refresh_response)

    def _GetRangeHeaderFromResponse(self, response):
        """Return the response's 'Range'/'range' header value, or None."""
        return response.info.get('Range', response.info.get('range'))

    def InitializeUpload(self, http_request, http=None, client=None):
        """Initialize this upload from the given http_request.

        Only resumable uploads need initialization; for other strategies
        this returns None without making a request.

        Args:
          http_request: The request that starts the resumable session.
          http: Optional http object; defaults to client.http.
          client: Optional client used to finalize transfer URLs.

        Returns:
          The result of StreamInChunks when auto_transfer is true,
          otherwise None.

        Raises:
          exceptions.UserError: if no strategy is set, or neither http nor
              client is provided.
          exceptions.HttpError: if the initial request does not return OK.
        """
        if self.strategy is None:
            raise exceptions.UserError(
                'No upload strategy set; did you call ConfigureRequest?')
        if http is None and client is None:
            raise exceptions.UserError('Must provide client or http.')
        if self.strategy != RESUMABLE_UPLOAD:
            return
        http = http or client.http
        if client is not None:
            http_request.url = client.FinalizeTransferUrl(http_request.url)
        self.EnsureUninitialized()
        http_response = http_wrapper.MakeRequest(http, http_request,
                                                 retries=self.num_retries)
        if http_response.status_code != http_client.OK:
            raise exceptions.HttpError.FromResponse(http_response)

        # The granularity is read from a response header; convert it to an
        # int so it can be used as a modulus when validating the chunksize.
        granularity = http_response.info.get(
            'X-Goog-Upload-Chunk-Granularity')
        if granularity is not None:
            granularity = int(granularity)
        self.__server_chunk_granularity = granularity
        url = http_response.info['location']
        if client is not None:
            url = client.FinalizeTransferUrl(url)
        self._Initialize(http, url)

        # Unless the user has requested otherwise, we want to just
        # go ahead and pump the bytes now.
        if self.auto_transfer:
            return self.StreamInChunks()

    def __GetLastByte(self, range_header):
        """Return the integer after the '-' in a range header value."""
        _, _, end = range_header.partition('-')
        # TODO(craigcitro): Validate start == 0?
        return int(end)

    def __ValidateChunksize(self, chunksize=None):
        """Check chunksize against the server's chunk granularity, if known.

        Raises:
          exceptions.ConfigurationValueError: if the chunksize is not a
              multiple of the server granularity.
        """
        if self.__server_chunk_granularity is None:
            return
        chunksize = chunksize or self.chunksize
        if chunksize % self.__server_chunk_granularity:
            # Interpolate the granularity into the message rather than
            # passing it as a separate exception argument.
            raise exceptions.ConfigurationValueError(
                'Server requires chunksize to be a multiple of %d' %
                self.__server_chunk_granularity)

    def __StreamMedia(self, callback=None, finish_callback=None,
                      additional_headers=None, use_chunks=True):
        """Helper function for StreamMedia / StreamInChunks.

        Raises:
          exceptions.InvalidUserInputError: if the upload is not resumable.
          exceptions.CommunicationError: if the server's reported range
              does not match the stream position after a send.
          exceptions.TransferInvalidError: if the upload completed but the
              stream still has unread bytes.
        """
        if self.strategy != RESUMABLE_UPLOAD:
            raise exceptions.InvalidUserInputError(
                'Cannot stream non-resumable upload')
        callback = callback or self.progress_callback
        finish_callback = finish_callback or self.finish_callback
        # final_response is set if we resumed an already-completed upload.
        response = self.__final_response
        send_func = self.__SendChunk if use_chunks else self.__SendMediaBody
        if use_chunks:
            self.__ValidateChunksize(self.chunksize)
        self.EnsureInitialized()
        while not self.complete:
            response = send_func(self.stream.tell(),
                                 additional_headers=additional_headers)
            if response.status_code in (http_client.OK, http_client.CREATED):
                self.__complete = True
                break
            # Use the shared helper so both 'Range' and 'range' spellings
            # of the header are honored, as elsewhere in this class.
            self.__progress = self.__GetLastByte(
                self._GetRangeHeaderFromResponse(response))
            if self.progress + 1 != self.stream.tell():
                # TODO(craigcitro): Add a better way to recover here.
                raise exceptions.CommunicationError(
                    'Failed to transfer all bytes in chunk, upload paused at '
                    'byte %d' % self.progress)
            self._ExecuteCallback(callback, response)
        if self.__complete and hasattr(self.stream, 'seek'):
            # Verify nothing is left unread in the stream, restoring the
            # stream position afterwards.
            current_pos = self.stream.tell()
            self.stream.seek(0, os.SEEK_END)
            end_pos = self.stream.tell()
            self.stream.seek(current_pos)
            if current_pos != end_pos:
                raise exceptions.TransferInvalidError(
                    'Upload complete with %s additional bytes left in stream' %
                    (int(end_pos) - int(current_pos)))
        self._ExecuteCallback(finish_callback, response)
        return response

    def StreamMedia(self, callback=None, finish_callback=None,
                    additional_headers=None):
        """Send this resumable upload in a single request.

        Args:
          callback: Progress callback function with inputs
              (http_wrapper.Response, transfer.Upload)
          finish_callback: Final callback function with inputs
              (http_wrapper.Response, transfer.Upload)
          additional_headers: Dict of headers to include with the upload
              http_wrapper.Request.

        Returns:
          http_wrapper.Response of final response.
        """
        return self.__StreamMedia(
            callback=callback, finish_callback=finish_callback,
            additional_headers=additional_headers, use_chunks=False)

    def StreamInChunks(self, callback=None, finish_callback=None,
                       additional_headers=None):
        """Send this (resumable) upload in chunks."""
        return self.__StreamMedia(
            callback=callback, finish_callback=finish_callback,
            additional_headers=additional_headers)

    def __SendMediaRequest(self, request, end):
        """Request helper function for SendMediaBody & SendChunk.

        Args:
          request: The prepared PUT request.
          end: Exclusive end offset of the bytes carried by request.

        Returns:
          The server response.

        Raises:
          exceptions.HttpError: if the server returns an unexpected status.
        """
        response = http_wrapper.MakeRequest(
            self.bytes_http, request, retry_func=self.retry_func,
            retries=self.num_retries)
        if response.status_code not in (http_client.OK, http_client.CREATED,
                                        http_wrapper.RESUME_INCOMPLETE):
            # We want to reset our state to wherever the server left us
            # before this failed request, and then raise.
            self.RefreshResumableUploadState()
            raise exceptions.HttpError.FromResponse(response)
        if response.status_code == http_wrapper.RESUME_INCOMPLETE:
            last_byte = self.__GetLastByte(
                self._GetRangeHeaderFromResponse(response))
            if last_byte + 1 != end:
                # The server kept fewer bytes than we sent. Resume from the
                # first byte it does not have, i.e. the one after last_byte.
                self.stream.seek(last_byte + 1)
        return response

    def __SendMediaBody(self, start, additional_headers=None):
        """Send the entire media stream in a single request.

        Raises:
          exceptions.TransferInvalidError: if total_size is unknown.
        """
        self.EnsureInitialized()
        if self.total_size is None:
            raise exceptions.TransferInvalidError(
                'Total size must be known for SendMediaBody')
        body_stream = stream_slice.StreamSlice(
            self.stream, self.total_size - start)

        request = http_wrapper.Request(url=self.url, http_method='PUT',
                                       body=body_stream)
        request.headers['Content-Type'] = self.mime_type
        if start == self.total_size:
            # End of an upload with 0 bytes left to send; just finalize.
            range_string = 'bytes */%s' % self.total_size
        else:
            range_string = 'bytes %s-%s/%s' % (start, self.total_size - 1,
                                               self.total_size)

        request.headers['Content-Range'] = range_string
        if additional_headers:
            request.headers.update(additional_headers)

        return self.__SendMediaRequest(request, self.total_size)

    def __SendChunk(self, start, additional_headers=None):
        """Send the specified chunk."""
        self.EnsureInitialized()
        no_log_body = self.total_size is None
        if self.total_size is None:
            # For the streaming resumable case, we need to detect when
            # we're at the end of the stream.
            body_stream = buffered_stream.BufferedStream(
                self.stream, start, self.chunksize)
            end = body_stream.stream_end_position
            if body_stream.stream_exhausted:
                self.__total_size = end
            # TODO: Here, change body_stream from a stream to a string object,
            # which means reading a chunk into memory.  This works around
            # https://code.google.com/p/httplib2/issues/detail?id=176 which can
            # cause httplib2 to skip bytes on 401's for file objects.
            # Rework this solution to be more general.
            body_stream = body_stream.read(self.chunksize)
        else:
            end = min(start + self.chunksize, self.total_size)
            body_stream = stream_slice.StreamSlice(self.stream, end - start)
        # TODO(craigcitro): Think about clearer errors on "no data in
        # stream".
        request = http_wrapper.Request(url=self.url, http_method='PUT',
                                       body=body_stream)
        request.headers['Content-Type'] = self.mime_type
        if no_log_body:
            # Disable logging of streaming body.
            # TODO: Remove no_log_body and rework as part of a larger logs
            # refactor.
            request.loggable_body = '<media body>'
        if self.total_size is None:
            # Streaming resumable upload case, unknown total size.
            range_string = 'bytes %s-%s/*' % (start, end - 1)
        elif end == start:
            # End of an upload with 0 bytes left to send; just finalize.
            range_string = 'bytes */%s' % self.total_size
        else:
            # Normal resumable upload case with known sizes.
            range_string = 'bytes %s-%s/%s' % (start, end - 1, self.total_size)

        request.headers['Content-Range'] = range_string
        if additional_headers:
            request.headers.update(additional_headers)

        return self.__SendMediaRequest(request, end)