| # Copyright 2012 Google Inc. All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, |
| # either express or implied. See the License for the specific |
| # language governing permissions and limitations under the License. |
| |
| """Python wrappers for the Google Storage RESTful API.""" |
| |
| |
| |
| |
| |
| __all__ = ['ReadBuffer', |
| 'StreamingBuffer', |
| ] |
| |
| import collections |
| import os |
| import urlparse |
| |
| from . import api_utils |
| from . import common |
| from . import errors |
| from . import rest_api |
| |
from google.appengine.api import urlfetch
from google.appengine.ext import ndb


| def _get_storage_api(retry_params, account_id=None): |
| """Returns storage_api instance for API methods. |
| |
| Args: |
    retry_params: An instance of api_utils.RetryParams. If None, the
      thread's default will be used.
| account_id: Internal-use only. |
| |
| Returns: |
    A storage_api instance to handle urlfetch work to GCS.
    On the dev appserver, this instance will by default talk to a local
    stub unless common.ACCESS_TOKEN is set. That token will be used to
    talk to the real GCS.
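
  Example (an illustrative sketch; head_object is the synchronous variant
  added by rest_api.add_sync_methods):
    api = _get_storage_api(None)
    status, headers, content = api.head_object('/mybucket/myfile')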
| """ |
| |
| |
| api = _StorageApi(_StorageApi.full_control_scope, |
| service_account_id=account_id, |
| retry_params=retry_params) |
| if common.local_run() and not common.get_access_token(): |
| api.api_url = common.local_api_url() |
| if common.get_access_token(): |
| api.token = common.get_access_token() |
| return api |
| |
| |
| class _StorageApi(rest_api._RestApi): |
| """A simple wrapper for the Google Storage RESTful API. |
| |
  WARNING: Do NOT use this api directly. It is an implementation detail
  and is subject to change in any release.
| |
| All async methods have similar args and returns. |
| |
| Args: |
| path: The path to the Google Storage object or bucket, e.g. |
| '/mybucket/myfile' or '/mybucket'. |
| **kwd: Options for urlfetch. e.g. |
| headers={'content-type': 'text/plain'}, payload='blah'. |
| |
| Returns: |
    An ndb Future. When fulfilled, future.get_result() should return
    a tuple of (status, headers, content) that represents an HTTP response
    from the Google Cloud Storage XML API.
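
  Example (an illustrative sketch; this class is internal and async calls
  must run inside an ndb tasklet):
    api = _StorageApi(_StorageApi.full_control_scope)
    status, headers, content = yield api.get_object_async('/bucket/obj')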
| """ |
| |
| api_url = 'https://storage.googleapis.com' |
| read_only_scope = 'https://www.googleapis.com/auth/devstorage.read_only' |
| read_write_scope = 'https://www.googleapis.com/auth/devstorage.read_write' |
| full_control_scope = 'https://www.googleapis.com/auth/devstorage.full_control' |
| |
| def __getstate__(self): |
| """Store state as part of serialization/pickling. |
| |
| Returns: |
| A tuple (of dictionaries) with the state of this object |
| """ |
| return (super(_StorageApi, self).__getstate__(), {'api_url': self.api_url}) |
| |
| def __setstate__(self, state): |
| """Restore state as part of deserialization/unpickling. |
| |
| Args: |
| state: the tuple from a __getstate__ call |
| """ |
| superstate, localstate = state |
| super(_StorageApi, self).__setstate__(superstate) |
| self.api_url = localstate['api_url'] |
| |
| @api_utils._eager_tasklet |
| @ndb.tasklet |
| def do_request_async(self, url, method='GET', headers=None, payload=None, |
| deadline=None, callback=None): |
| """Inherit docs. |
| |
| This method translates urlfetch exceptions to more service specific ones. |
| """ |
| if headers is None: |
| headers = {} |
| if 'x-goog-api-version' not in headers: |
| headers['x-goog-api-version'] = '2' |
| headers['accept-encoding'] = 'gzip, *' |
| try: |
| resp_tuple = yield super(_StorageApi, self).do_request_async( |
| url, method=method, headers=headers, payload=payload, |
| deadline=deadline, callback=callback) |
    except urlfetch.DownloadError as e:
| raise errors.TimeoutError( |
| 'Request to Google Cloud Storage timed out.', e) |
| |
    raise ndb.Return(resp_tuple)

| def post_object_async(self, path, **kwds): |
| """POST to an object.""" |
| return self.do_request_async(self.api_url + path, 'POST', **kwds) |
| |
| def put_object_async(self, path, **kwds): |
| """PUT an object.""" |
| return self.do_request_async(self.api_url + path, 'PUT', **kwds) |
| |
| def get_object_async(self, path, **kwds): |
| """GET an object. |
| |
| Note: No payload argument is supported. |
| """ |
| return self.do_request_async(self.api_url + path, 'GET', **kwds) |
| |
| def delete_object_async(self, path, **kwds): |
| """DELETE an object. |
| |
| Note: No payload argument is supported. |
| """ |
| return self.do_request_async(self.api_url + path, 'DELETE', **kwds) |
| |
| def head_object_async(self, path, **kwds): |
| """HEAD an object. |
| |
| Depending on request headers, HEAD returns various object properties, |
| e.g. Content-Length, Last-Modified, and ETag. |
| |
| Note: No payload argument is supported. |
| """ |
| return self.do_request_async(self.api_url + path, 'HEAD', **kwds) |
| |
| def get_bucket_async(self, path, **kwds): |
| """GET a bucket.""" |
| return self.do_request_async(self.api_url + path, 'GET', **kwds) |
| |
| |
| _StorageApi = rest_api.add_sync_methods(_StorageApi) |
| |
| |
| class ReadBuffer(object): |
| """A class for reading Google storage files.""" |
| |
  DEFAULT_BUFFER_SIZE = 1024 * 1024  # 1 MB.
  MAX_REQUEST_SIZE = 30 * DEFAULT_BUFFER_SIZE  # 30 MB, max per urlfetch.
| |
| def __init__(self, |
| api, |
| path, |
| buffer_size=DEFAULT_BUFFER_SIZE, |
| max_request_size=MAX_REQUEST_SIZE): |
| """Constructor. |
| |
| Args: |
| api: A StorageApi instance. |
| path: Quoted/escaped path to the object, e.g. /mybucket/myfile |
      buffer_size: buffer size. The ReadBuffer keeps one buffer internally,
        but a pending future may hold a second one. This size must not
        exceed max_request_size.
| max_request_size: Max bytes to request in one urlfetch. |
| """ |
| self._api = api |
| self._path = path |
| self.name = api_utils._unquote_filename(path) |
| self.closed = False |
| |
| assert buffer_size <= max_request_size |
| self._buffer_size = buffer_size |
| self._max_request_size = max_request_size |
| self._offset = 0 |
| self._buffer = _Buffer() |
| self._etag = None |
| |
| get_future = self._get_segment(0, self._buffer_size, check_response=False) |
| |
| status, headers, content = self._api.head_object(path) |
| errors.check_status(status, [200], path, resp_headers=headers, body=content) |
| self._file_size = long(common.get_stored_content_length(headers)) |
| self._check_etag(headers.get('etag')) |
| |
| self._buffer_future = None |
| |
| if self._file_size != 0: |
| content, check_response_closure = get_future.get_result() |
| check_response_closure() |
| self._buffer.reset(content) |
| self._request_next_buffer() |
| |
| def __getstate__(self): |
| """Store state as part of serialization/pickling. |
| |
    The contents of the read buffer are not stored, only the current offset
    for data read by the client. A new read buffer is established at
    unpickling. The head information for the object (file size and etag) is
    stored to reduce startup latency and to ensure the file has not changed.
| |
| Returns: |
| A dictionary with the state of this object |
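
    Example (illustrative): pickle.loads(pickle.dumps(f)) yields a new
    ReadBuffer positioned at f.tell(), with a fresh buffer prefetch started
    at unpickling.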
| """ |
| return {'api': self._api, |
| 'path': self._path, |
| 'buffer_size': self._buffer_size, |
| 'request_size': self._max_request_size, |
| 'etag': self._etag, |
| 'size': self._file_size, |
| 'offset': self._offset, |
| 'closed': self.closed} |
| |
  def __setstate__(self, state):
    """Restore state as part of deserialization/unpickling.

    Along with restoring the state, this pre-fetches the next read buffer.

    Args:
      state: the dictionary from a __getstate__ call
    """
| self._api = state['api'] |
| self._path = state['path'] |
| self.name = api_utils._unquote_filename(self._path) |
| self._buffer_size = state['buffer_size'] |
| self._max_request_size = state['request_size'] |
| self._etag = state['etag'] |
| self._file_size = state['size'] |
| self._offset = state['offset'] |
| self._buffer = _Buffer() |
| self.closed = state['closed'] |
| self._buffer_future = None |
| if self._remaining() and not self.closed: |
| self._request_next_buffer() |
| |
| def __iter__(self): |
| """Iterator interface. |
| |
    Note the ReadBuffer container itself is the iterator. It is
    (quoting PEP 234)
    'destructive: [it] consumes all the values and a second iterator
    cannot easily be created that iterates independently over the same
    values. You could open the file for the second time, or seek() to the
    beginning.'
| |
| Returns: |
| Self. |
| """ |
| return self |
| |
| def next(self): |
| line = self.readline() |
| if not line: |
| raise StopIteration() |
| return line |
| |
| def readline(self, size=-1): |
| """Read one line delimited by '\n' from the file. |
| |
| A trailing newline character is kept in the string. It may be absent when a |
| file ends with an incomplete line. If the size argument is non-negative, |
| it specifies the maximum string size (counting the newline) to return. |
| A negative size is the same as unspecified. Empty string is returned |
| only when EOF is encountered immediately. |
| |
| Args: |
| size: Maximum number of bytes to read. If not specified, readline stops |
| only on '\n' or EOF. |
| |
| Returns: |
| The data read as a string. |
| |
| Raises: |
| IOError: When this buffer is closed. |
| """ |
| self._check_open() |
| if size == 0 or not self._remaining(): |
| return '' |
| |
| data_list = [] |
| newline_offset = self._buffer.find_newline(size) |
| while newline_offset < 0: |
| data = self._buffer.read(size) |
| size -= len(data) |
| self._offset += len(data) |
| data_list.append(data) |
| if size == 0 or not self._remaining(): |
| return ''.join(data_list) |
| self._buffer.reset(self._buffer_future.get_result()) |
| self._request_next_buffer() |
| newline_offset = self._buffer.find_newline(size) |
| |
| data = self._buffer.read_to_offset(newline_offset + 1) |
| self._offset += len(data) |
| data_list.append(data) |
| |
| return ''.join(data_list) |
| |
| def read(self, size=-1): |
| """Read data from RAW file. |
| |
| Args: |
| size: Number of bytes to read as integer. Actual number of bytes |
| read is always equal to size unless EOF is reached. If size is |
| negative or unspecified, read the entire file. |
| |
| Returns: |
| data read as str. |
| |
| Raises: |
| IOError: When this buffer is closed. |
| """ |
| self._check_open() |
| if not self._remaining(): |
| return '' |
| |
| data_list = [] |
| while True: |
| remaining = self._buffer.remaining() |
      if 0 <= size < remaining:
| data_list.append(self._buffer.read(size)) |
| self._offset += size |
| break |
| else: |
| size -= remaining |
| self._offset += remaining |
| data_list.append(self._buffer.read()) |
| |
| if self._buffer_future is None: |
| if size < 0 or size >= self._remaining(): |
| needs = self._remaining() |
| else: |
| needs = size |
| data_list.extend(self._get_segments(self._offset, needs)) |
| self._offset += needs |
| break |
| |
| if self._buffer_future: |
| self._buffer.reset(self._buffer_future.get_result()) |
| self._buffer_future = None |
| |
| if self._buffer_future is None: |
| self._request_next_buffer() |
| return ''.join(data_list) |
| |
| def _remaining(self): |
| return self._file_size - self._offset |
| |
| def _request_next_buffer(self): |
| """Request next buffer. |
| |
| Requires self._offset and self._buffer are in consistent state. |
| """ |
| self._buffer_future = None |
| next_offset = self._offset + self._buffer.remaining() |
| if next_offset != self._file_size: |
| self._buffer_future = self._get_segment(next_offset, |
| self._buffer_size) |
| |
| def _get_segments(self, start, request_size): |
| """Get segments of the file from Google Storage as a list. |
| |
| A large request is broken into segments to avoid hitting urlfetch |
| response size limit. Each segment is returned from a separate urlfetch. |
| |
| Args: |
      start: start offset to request. Inclusive. Must be within the
        range of the file.
| request_size: number of bytes to request. |
| |
| Returns: |
| A list of file segments in order |
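
    Example (illustrative): with max_request_size of 30 MB, a request for
    70 MB starting at offset 0 is issued as three urlfetches covering
    [0, 30 MB), [30 MB, 60 MB) and [60 MB, 70 MB).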
| """ |
| if not request_size: |
| return [] |
| |
| end = start + request_size |
| futures = [] |
| |
| while request_size > self._max_request_size: |
| futures.append(self._get_segment(start, self._max_request_size)) |
| request_size -= self._max_request_size |
| start += self._max_request_size |
| if start < end: |
| futures.append(self._get_segment(start, end-start)) |
| return [fut.get_result() for fut in futures] |
| |
| @ndb.tasklet |
| def _get_segment(self, start, request_size, check_response=True): |
| """Get a segment of the file from Google Storage. |
| |
| Args: |
      start: start offset of the segment. Inclusive. Must be within the
        range of the file.
      request_size: number of bytes to request. Must be small enough
        for a single urlfetch request. May go over the logical range of
        the file.
| check_response: True to check the validity of GCS response automatically |
| before the future returns. False otherwise. See Yields section. |
| |
| Yields: |
| If check_response is True, the segment [start, start + request_size) |
| of the file. |
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks the response. The caller
      should invoke the closure before consuming the file segment.
| |
| Raises: |
| ValueError: if the file has changed while reading. |
| """ |
| end = start + request_size - 1 |
| content_range = '%d-%d' % (start, end) |
| headers = {'Range': 'bytes=' + content_range} |
| status, resp_headers, content = yield self._api.get_object_async( |
| self._path, headers=headers) |
| def _checker(): |
| errors.check_status(status, [200, 206], self._path, headers, |
| resp_headers, body=content) |
| self._check_etag(resp_headers.get('etag')) |
| if check_response: |
| _checker() |
| raise ndb.Return(content) |
| raise ndb.Return(content, _checker) |
| |
| def _check_etag(self, etag): |
| """Check if etag is the same across requests to GCS. |
| |
| If self._etag is None, set it. If etag is set, check that the new |
| etag equals the old one. |
| |
    In the __init__ method, we fire one HEAD and one GET request using
    ndb tasklets. Whichever returns first sets the initial value.
| |
| Args: |
| etag: etag from a GCS HTTP response. None if etag is not part of the |
| response header. It could be None for example in the case of GCS |
| composite file. |
| |
| Raises: |
| ValueError: if two etags are not equal. |
| """ |
| if etag is None: |
| return |
| elif self._etag is None: |
| self._etag = etag |
| elif self._etag != etag: |
| raise ValueError('File on GCS has changed while reading.') |
| |
| def close(self): |
| self.closed = True |
| self._buffer = None |
| self._buffer_future = None |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, atype, value, traceback): |
| self.close() |
| return False |
| |
| def seek(self, offset, whence=os.SEEK_SET): |
| """Set the file's current offset. |
| |
    Note if the new offset is out of bounds, it is adjusted to either 0 or
    EOF.
| |
| Args: |
| offset: seek offset as number. |
| whence: seek mode. Supported modes are os.SEEK_SET (absolute seek), |
| os.SEEK_CUR (seek relative to the current position), and os.SEEK_END |
| (seek relative to the end, offset should be negative). |
| |
| Raises: |
| IOError: When this buffer is closed. |
| ValueError: When whence is invalid. |
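
    Example (illustrative):
      f.seek(-10, os.SEEK_END)  # position 10 bytes before EOF
      tail = f.read()           # read the final 10 bytes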
| """ |
| self._check_open() |
| |
| self._buffer.reset() |
| self._buffer_future = None |
| |
| if whence == os.SEEK_SET: |
| self._offset = offset |
| elif whence == os.SEEK_CUR: |
| self._offset += offset |
| elif whence == os.SEEK_END: |
| self._offset = self._file_size + offset |
| else: |
| raise ValueError('Whence mode %s is invalid.' % str(whence)) |
| |
| self._offset = min(self._offset, self._file_size) |
| self._offset = max(self._offset, 0) |
| if self._remaining(): |
| self._request_next_buffer() |
| |
| def tell(self): |
| """Tell the file's current offset. |
| |
| Returns: |
| current offset in reading this file. |
| |
| Raises: |
| IOError: When this buffer is closed. |
| """ |
| self._check_open() |
| return self._offset |
| |
| def _check_open(self): |
| if self.closed: |
| raise IOError('Buffer is closed.') |
| |
| def seekable(self): |
| return True |
| |
| def readable(self): |
| return True |
| |
| def writable(self): |
| return False |
| |
| |
| class _Buffer(object): |
| """In memory buffer.""" |
| |
| def __init__(self): |
| self.reset() |
| |
| def reset(self, content='', offset=0): |
| self._buffer = content |
| self._offset = offset |
| |
| def read(self, size=-1): |
| """Returns bytes from self._buffer and update related offsets. |
| |
| Args: |
| size: number of bytes to read starting from current offset. |
| Read the entire buffer if negative. |
| |
| Returns: |
| Requested bytes from buffer. |
| """ |
| if size < 0: |
| offset = len(self._buffer) |
| else: |
| offset = self._offset + size |
| return self.read_to_offset(offset) |
| |
| def read_to_offset(self, offset): |
| """Returns bytes from self._buffer and update related offsets. |
| |
| Args: |
| offset: read from current offset to this offset, exclusive. |
| |
| Returns: |
| Requested bytes from buffer. |
| """ |
| assert offset >= self._offset |
| result = self._buffer[self._offset: offset] |
| self._offset += len(result) |
| return result |
| |
| def remaining(self): |
| return len(self._buffer) - self._offset |
| |
| def find_newline(self, size=-1): |
| """Search for newline char in buffer starting from current offset. |
| |
| Args: |
| size: number of bytes to search. -1 means all. |
| |
| Returns: |
      offset of the newline char in the buffer. -1 if it doesn't exist.
| """ |
| if size < 0: |
| return self._buffer.find('\n', self._offset) |
| return self._buffer.find('\n', self._offset, self._offset + size) |
| |
| |
| class StreamingBuffer(object): |
| """A class for creating large objects using the 'resumable' API. |
| |
| The API is a subset of the Python writable stream API sufficient to |
| support writing zip files using the zipfile module. |
| |
| The exact sequence of calls and use of headers is documented at |
| https://developers.google.com/storage/docs/developer-guide#unknownresumables |
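
  Example (an illustrative sketch; StreamingBuffer instances are normally
  created by the package-level open() in write mode):
    api = _get_storage_api(None)
    with StreamingBuffer(api, '/mybucket/myfile',
                         content_type='text/plain') as f:
      f.write('hello, world')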
| """ |
| |
  _blocksize = 256 * 1024  # GCS requires upload chunks to align on 256 KB.

  _flushsize = 8 * _blocksize  # Auto-flush threshold for write().

  _maxrequestsize = 9 * 4 * _blocksize  # 9 MB. Max payload per urlfetch.
| |
| def __init__(self, |
| api, |
| path, |
| content_type=None, |
| gcs_headers=None): |
| """Constructor. |
| |
| Args: |
| api: A StorageApi instance. |
| path: Quoted/escaped path to the object, e.g. /mybucket/myfile |
      content_type: Optional content-type. If None, the default is
        delegated to Google Cloud Storage.
      gcs_headers: additional gcs headers as a str->str dict, e.g.
        {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}.
| Raises: |
      IOError: When this location cannot be found.
| """ |
| assert self._maxrequestsize > self._blocksize |
| assert self._maxrequestsize % self._blocksize == 0 |
| assert self._maxrequestsize >= self._flushsize |
| |
| self._api = api |
| self._path = path |
| |
| self.name = api_utils._unquote_filename(path) |
| self.closed = False |
| |
| self._buffer = collections.deque() |
| self._buffered = 0 |
| self._written = 0 |
| self._offset = 0 |
| |
| headers = {'x-goog-resumable': 'start'} |
| if content_type: |
| headers['content-type'] = content_type |
| if gcs_headers: |
| headers.update(gcs_headers) |
| status, resp_headers, content = self._api.post_object(path, headers=headers) |
| errors.check_status(status, [201], path, headers, resp_headers, |
| body=content) |
| loc = resp_headers.get('location') |
| if not loc: |
| raise IOError('No location header found in 201 response') |
| parsed = urlparse.urlparse(loc) |
| self._path_with_token = '%s?%s' % (self._path, parsed.query) |
| |
| def __getstate__(self): |
| """Store state as part of serialization/pickling. |
| |
| The contents of the write buffer are stored. Writes to the underlying |
| storage are required to be on block boundaries (_blocksize) except for the |
| last write. In the worst case the pickled version of this object may be |
| slightly larger than the blocksize. |
| |
    Returns:
      A dictionary with the state of this object
    """
| return {'api': self._api, |
| 'path': self._path, |
| 'path_token': self._path_with_token, |
| 'buffer': self._buffer, |
| 'buffered': self._buffered, |
| 'written': self._written, |
| 'offset': self._offset, |
| 'closed': self.closed} |
| |
| def __setstate__(self, state): |
| """Restore state as part of deserialization/unpickling. |
| |
| Args: |
| state: the dictionary from a __getstate__ call |
| """ |
| self._api = state['api'] |
| self._path_with_token = state['path_token'] |
| self._buffer = state['buffer'] |
| self._buffered = state['buffered'] |
| self._written = state['written'] |
| self._offset = state['offset'] |
| self.closed = state['closed'] |
| self._path = state['path'] |
| self.name = api_utils._unquote_filename(self._path) |
| |
| def write(self, data): |
| """Write some bytes. |
| |
| Args: |
| data: data to write. str. |
| |
| Raises: |
| TypeError: if data is not of type str. |
| """ |
| self._check_open() |
| if not isinstance(data, str): |
| raise TypeError('Expected str but got %s.' % type(data)) |
| if not data: |
| return |
| self._buffer.append(data) |
| self._buffered += len(data) |
| self._offset += len(data) |
| if self._buffered >= self._flushsize: |
| self._flush() |
| |
| def flush(self): |
| """Flush as much as possible to GCS. |
| |
    GCS *requires* that all writes except the final one align on 256 KB
    boundaries, so the internal buffer may still hold fewer than 256 KB
    after a flush.
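
    Example (illustrative): after f.write('x' * (300 * 1024)), a flush()
    uploads one 256 KB block and keeps the trailing 44 KB buffered.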
| """ |
| self._check_open() |
| self._flush(finish=False) |
| |
| def tell(self): |
| """Return the total number of bytes passed to write() so far. |
| |
| (There is no seek() method.) |
| """ |
| return self._offset |
| |
| def close(self): |
| """Flush the buffer and finalize the file. |
| |
| When this returns the new file is available for reading. |
| """ |
| if not self.closed: |
| self.closed = True |
| self._flush(finish=True) |
| self._buffer = None |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, atype, value, traceback): |
| self.close() |
| return False |
| |
| def _flush(self, finish=False): |
| """Internal API to flush. |
| |
| Buffer is flushed to GCS only when the total amount of buffered data is at |
| least self._blocksize, or to flush the final (incomplete) block of |
| the file with finish=True. |
| """ |
| while ((finish and self._buffered >= 0) or |
| (not finish and self._buffered >= self._blocksize)): |
| tmp_buffer = [] |
| tmp_buffer_len = 0 |
| |
| excess = 0 |
| while self._buffer: |
| buf = self._buffer.popleft() |
| size = len(buf) |
| self._buffered -= size |
| tmp_buffer.append(buf) |
| tmp_buffer_len += size |
| if tmp_buffer_len >= self._maxrequestsize: |
| excess = tmp_buffer_len - self._maxrequestsize |
| break |
| if not finish and ( |
| tmp_buffer_len % self._blocksize + self._buffered < |
| self._blocksize): |
| excess = tmp_buffer_len % self._blocksize |
| break |
| |
| if excess: |
| over = tmp_buffer.pop() |
| size = len(over) |
| assert size >= excess |
| tmp_buffer_len -= size |
| head, tail = over[:-excess], over[-excess:] |
| self._buffer.appendleft(tail) |
| self._buffered += len(tail) |
| if head: |
| tmp_buffer.append(head) |
| tmp_buffer_len += len(head) |
| |
| data = ''.join(tmp_buffer) |
| file_len = '*' |
| if finish and not self._buffered: |
| file_len = self._written + len(data) |
| self._send_data(data, self._written, file_len) |
| self._written += len(data) |
| if file_len != '*': |
| break |
| |
| def _send_data(self, data, start_offset, file_len): |
| """Send the block to the storage service. |
| |
| This is a utility method that does not modify self. |
| |
| Args: |
| data: data to send in str. |
| start_offset: start offset of the data in relation to the file. |
| file_len: an int if this is the last data to append to the file. |
| Otherwise '*'. |
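
    Example content-range headers (illustrative):
      'bytes 0-262143/*'            for an intermediate 256 KB block.
      'bytes 262144-524287/524288'  for the final block of a 512 KB file.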
| """ |
| headers = {} |
| end_offset = start_offset + len(data) - 1 |
| |
| if data: |
| headers['content-range'] = ('bytes %d-%d/%s' % |
| (start_offset, end_offset, file_len)) |
| else: |
| headers['content-range'] = ('bytes */%s' % file_len) |
| |
| status, response_headers, content = self._api.put_object( |
| self._path_with_token, payload=data, headers=headers) |
| if file_len == '*': |
| expected = 308 |
| else: |
| expected = 200 |
| errors.check_status(status, [expected], self._path, headers, |
| response_headers, content, |
| {'upload_path': self._path_with_token}) |
| |
| def _get_offset_from_gcs(self): |
| """Get the last offset that has been written to GCS. |
| |
| This is a utility method that does not modify self. |
| |
| Returns: |
| an int of the last offset written to GCS by this upload, inclusive. |
| -1 means nothing has been written. |
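
    Example (illustrative): a 308 response carrying 'range: bytes=0-524287'
    yields 524287, so the upload can resume at offset 524288.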
| """ |
| headers = {'content-range': 'bytes */*'} |
| status, response_headers, content = self._api.put_object( |
| self._path_with_token, headers=headers) |
| errors.check_status(status, [308], self._path, headers, |
| response_headers, content, |
| {'upload_path': self._path_with_token}) |
| val = response_headers.get('range') |
| if val is None: |
| return -1 |
| _, offset = val.rsplit('-', 1) |
| return int(offset) |
| |
| def _force_close(self, file_length=None): |
| """Close this buffer on file_length. |
| |
| Finalize this upload immediately on file_length. |
| Contents that are still in memory will not be uploaded. |
| |
| This is a utility method that does not modify self. |
| |
| Args: |
| file_length: file length. Must match what has been uploaded. If None, |
| it will be queried from GCS. |
| """ |
| if file_length is None: |
| file_length = self._get_offset_from_gcs() + 1 |
| self._send_data('', 0, file_length) |
| |
| def _check_open(self): |
| if self.closed: |
| raise IOError('Buffer is closed.') |
| |
| def seekable(self): |
| return False |
| |
| def readable(self): |
| return False |
| |
| def writable(self): |
| return True |