blob: ca111be39be9d7f04548bac81a14152410f32975 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Compression support for apitools."""
from collections import deque
from apitools.base.py import gzip
__all__ = [
'CompressStream',
]
# pylint: disable=invalid-name
# Note: Apitools only uses the default chunksize when compressing.
def CompressStream(in_stream, length=None, compresslevel=2,
chunksize=16777216):
"""Compresses an input stream into a file-like buffer.
This reads from the input stream until either we've stored at least length
compressed bytes, or the input stream has been exhausted.
This supports streams of unknown size.
Args:
in_stream: The input stream to read from.
length: The target number of compressed bytes to buffer in the output
stream. If length is none, the input stream will be compressed
until it's exhausted.
The actual length of the output buffer can vary from the target.
If the input stream is exhaused, the output buffer may be smaller
than expected. If the data is incompressible, the maximum length
can be exceeded by can be calculated to be:
chunksize + 5 * (floor((chunksize - 1) / 16383) + 1) + 17
This accounts for additional header data gzip adds. For the default
16MiB chunksize, this results in the max size of the output buffer
being:
length + 16Mib + 5142 bytes
compresslevel: Optional, defaults to 2. The desired compression level.
chunksize: Optional, defaults to 16MiB. The chunk size used when
reading data from the input stream to write into the output
buffer.
Returns:
A file-like output buffer of compressed bytes, the number of bytes read
from the input stream, and a flag denoting if the input stream was
exhausted.
"""
in_read = 0
in_exhausted = False
out_stream = StreamingBuffer()
with gzip.GzipFile(mode='wb',
fileobj=out_stream,
compresslevel=compresslevel) as compress_stream:
# Read until we've written at least length bytes to the output stream.
while not length or out_stream.length < length:
data = in_stream.read(chunksize)
data_length = len(data)
compress_stream.write(data)
in_read += data_length
# If we read less than requested, the stream is exhausted.
if data_length < chunksize:
in_exhausted = True
break
return out_stream, in_read, in_exhausted
class StreamingBuffer(object):
"""Provides a file-like object that writes to a temporary buffer.
When data is read from the buffer, it is permanently removed. This is
useful when there are memory constraints preventing the entire buffer from
being stored in memory.
"""
def __init__(self):
# The buffer of byte arrays.
self.__buf = deque()
# The number of bytes in __buf.
self.__size = 0
def __len__(self):
return self.__size
def __nonzero__(self):
# For 32-bit python2.x, len() cannot exceed a 32-bit number; avoid
# accidental len() calls from httplib in the form of "if this_object:".
return bool(self.__size)
@property
def length(self):
# For 32-bit python2.x, len() cannot exceed a 32-bit number.
return self.__size
def write(self, data):
# Gzip can write many 0 byte chunks for highly compressible data.
# Prevent them from being added internally.
if data is not None and data:
self.__buf.append(data)
self.__size += len(data)
def read(self, size=None):
"""Read at most size bytes from this buffer.
Bytes read from this buffer are consumed and are permanently removed.
Args:
size: If provided, read no more than size bytes from the buffer.
Otherwise, this reads the entire buffer.
Returns:
The bytes read from this buffer.
"""
if size is None:
size = self.__size
ret_list = []
while size > 0 and self.__buf:
data = self.__buf.popleft()
size -= len(data)
ret_list.append(data)
if size < 0:
ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
self.__buf.appendleft(remainder)
ret = b''.join(ret_list)
self.__size -= len(ret)
return ret