blob: c8ecdac170d4d265590e887fddb037663a527a30 [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for compression."""
from apitools.base.py import compression
from apitools.base.py import gzip
import six
import unittest2
class CompressionTest(unittest2.TestCase):
def setUp(self):
# Sample highly compressible data (~50MB).
self.sample_data = b'abc' * 16777216
# Stream of the sample data.
self.stream = six.BytesIO()
self.stream.write(self.sample_data)
self.length = self.stream.tell()
self.stream.seek(0)
def testCompressionExhausted(self):
"""Test full compression.
Test that highly compressible data is actually compressed in entirety.
"""
output, read, exhausted = compression.CompressStream(
self.stream,
self.length,
9)
# Ensure the compressed buffer is smaller than the input buffer.
self.assertLess(output.length, self.length)
# Ensure we read the entire input stream.
self.assertEqual(read, self.length)
# Ensure the input stream was exhausted.
self.assertTrue(exhausted)
def testCompressionUnbounded(self):
"""Test unbounded compression.
Test that the input stream is exhausted when length is none.
"""
output, read, exhausted = compression.CompressStream(
self.stream,
None,
9)
# Ensure the compressed buffer is smaller than the input buffer.
self.assertLess(output.length, self.length)
# Ensure we read the entire input stream.
self.assertEqual(read, self.length)
# Ensure the input stream was exhausted.
self.assertTrue(exhausted)
def testCompressionPartial(self):
"""Test partial compression.
Test that the length parameter works correctly. The amount of data
that's compressed can be greater than or equal to the requested length.
"""
output_length = 40
output, _, exhausted = compression.CompressStream(
self.stream,
output_length,
9)
# Ensure the requested read size is <= the compressed buffer size.
self.assertLessEqual(output_length, output.length)
# Ensure the input stream was not exhausted.
self.assertFalse(exhausted)
def testCompressionIntegrity(self):
"""Test that compressed data can be decompressed."""
output, read, exhausted = compression.CompressStream(
self.stream,
self.length,
9)
# Ensure uncompressed data matches the sample data.
with gzip.GzipFile(fileobj=output) as f:
original = f.read()
self.assertEqual(original, self.sample_data)
# Ensure we read the entire input stream.
self.assertEqual(read, self.length)
# Ensure the input stream was exhausted.
self.assertTrue(exhausted)
class StreamingBufferTest(unittest2.TestCase):
def setUp(self):
self.stream = compression.StreamingBuffer()
def testSimpleStream(self):
"""Test simple stream operations.
Test that the stream can be written to and read from. Also test that
reading from the stream consumes the bytes.
"""
# Ensure the stream is empty.
self.assertEqual(self.stream.length, 0)
# Ensure data is correctly written.
self.stream.write(b'Sample data')
self.assertEqual(self.stream.length, 11)
# Ensure data can be read and the read data is purged from the stream.
data = self.stream.read(11)
self.assertEqual(data, b'Sample data')
self.assertEqual(self.stream.length, 0)
def testPartialReads(self):
"""Test partial stream reads.
Test that the stream can be read in chunks while perserving the
consumption mechanics.
"""
self.stream.write(b'Sample data')
# Ensure data can be read and the read data is purged from the stream.
data = self.stream.read(6)
self.assertEqual(data, b'Sample')
self.assertEqual(self.stream.length, 5)
# Ensure the remaining data can be read.
data = self.stream.read(5)
self.assertEqual(data, b' data')
self.assertEqual(self.stream.length, 0)
def testTooShort(self):
"""Test excessive stream reads.
Test that more data can be requested from the stream than available
without raising an exception.
"""
self.stream.write(b'Sample')
# Ensure requesting more data than available does not raise an
# exception.
data = self.stream.read(100)
self.assertEqual(data, b'Sample')
self.assertEqual(self.stream.length, 0)