| from __future__ import absolute_import, division, unicode_literals |
| |
| from . import support # flake8: noqa |
| import unittest |
| import codecs |
| from io import BytesIO |
| |
| from six.moves import http_client |
| |
| from html5lib.inputstream import (BufferedStream, HTMLInputStream, |
| HTMLUnicodeInputStream, HTMLBinaryInputStream) |
| |
| class BufferedStreamTest(unittest.TestCase): |
| def test_basic(self): |
| s = b"abc" |
| fp = BufferedStream(BytesIO(s)) |
| read = fp.read(10) |
| assert read == s |
| |
| def test_read_length(self): |
| fp = BufferedStream(BytesIO(b"abcdef")) |
| read1 = fp.read(1) |
| assert read1 == b"a" |
| read2 = fp.read(2) |
| assert read2 == b"bc" |
| read3 = fp.read(3) |
| assert read3 == b"def" |
| read4 = fp.read(4) |
| assert read4 == b"" |
| |
| def test_tell(self): |
| fp = BufferedStream(BytesIO(b"abcdef")) |
| read1 = fp.read(1) |
| assert fp.tell() == 1 |
| read2 = fp.read(2) |
| assert fp.tell() == 3 |
| read3 = fp.read(3) |
| assert fp.tell() == 6 |
| read4 = fp.read(4) |
| assert fp.tell() == 6 |
| |
| def test_seek(self): |
| fp = BufferedStream(BytesIO(b"abcdef")) |
| read1 = fp.read(1) |
| assert read1 == b"a" |
| fp.seek(0) |
| read2 = fp.read(1) |
| assert read2 == b"a" |
| read3 = fp.read(2) |
| assert read3 == b"bc" |
| fp.seek(2) |
| read4 = fp.read(2) |
| assert read4 == b"cd" |
| fp.seek(4) |
| read5 = fp.read(2) |
| assert read5 == b"ef" |
| |
| def test_seek_tell(self): |
| fp = BufferedStream(BytesIO(b"abcdef")) |
| read1 = fp.read(1) |
| assert fp.tell() == 1 |
| fp.seek(0) |
| read2 = fp.read(1) |
| assert fp.tell() == 1 |
| read3 = fp.read(2) |
| assert fp.tell() == 3 |
| fp.seek(2) |
| read4 = fp.read(2) |
| assert fp.tell() == 4 |
| fp.seek(4) |
| read5 = fp.read(2) |
| assert fp.tell() == 6 |
| |
| |
| class HTMLUnicodeInputStreamShortChunk(HTMLUnicodeInputStream): |
| _defaultChunkSize = 2 |
| |
| |
| class HTMLBinaryInputStreamShortChunk(HTMLBinaryInputStream): |
| _defaultChunkSize = 2 |
| |
| |
| class HTMLInputStreamTest(unittest.TestCase): |
| |
| def test_char_ascii(self): |
| stream = HTMLInputStream(b"'", encoding='ascii') |
| self.assertEqual(stream.charEncoding[0], 'ascii') |
| self.assertEqual(stream.char(), "'") |
| |
| def test_char_utf8(self): |
| stream = HTMLInputStream('\u2018'.encode('utf-8'), encoding='utf-8') |
| self.assertEqual(stream.charEncoding[0], 'utf-8') |
| self.assertEqual(stream.char(), '\u2018') |
| |
| def test_char_win1252(self): |
| stream = HTMLInputStream("\xa9\xf1\u2019".encode('windows-1252')) |
| self.assertEqual(stream.charEncoding[0], 'windows-1252') |
| self.assertEqual(stream.char(), "\xa9") |
| self.assertEqual(stream.char(), "\xf1") |
| self.assertEqual(stream.char(), "\u2019") |
| |
| def test_bom(self): |
| stream = HTMLInputStream(codecs.BOM_UTF8 + b"'") |
| self.assertEqual(stream.charEncoding[0], 'utf-8') |
| self.assertEqual(stream.char(), "'") |
| |
| def test_utf_16(self): |
| stream = HTMLInputStream((' ' * 1025).encode('utf-16')) |
| self.assertTrue(stream.charEncoding[0] in ['utf-16-le', 'utf-16-be'], stream.charEncoding) |
| self.assertEqual(len(stream.charsUntil(' ', True)), 1025) |
| |
| def test_newlines(self): |
| stream = HTMLBinaryInputStreamShortChunk(codecs.BOM_UTF8 + b"a\nbb\r\nccc\rddddxe") |
| self.assertEqual(stream.position(), (1, 0)) |
| self.assertEqual(stream.charsUntil('c'), "a\nbb\n") |
| self.assertEqual(stream.position(), (3, 0)) |
| self.assertEqual(stream.charsUntil('x'), "ccc\ndddd") |
| self.assertEqual(stream.position(), (4, 4)) |
| self.assertEqual(stream.charsUntil('e'), "x") |
| self.assertEqual(stream.position(), (4, 5)) |
| |
| def test_newlines2(self): |
| size = HTMLUnicodeInputStream._defaultChunkSize |
| stream = HTMLInputStream("\r" * size + "\n") |
| self.assertEqual(stream.charsUntil('x'), "\n" * size) |
| |
| def test_position(self): |
| stream = HTMLBinaryInputStreamShortChunk(codecs.BOM_UTF8 + b"a\nbb\nccc\nddde\nf\ngh") |
| self.assertEqual(stream.position(), (1, 0)) |
| self.assertEqual(stream.charsUntil('c'), "a\nbb\n") |
| self.assertEqual(stream.position(), (3, 0)) |
| stream.unget("\n") |
| self.assertEqual(stream.position(), (2, 2)) |
| self.assertEqual(stream.charsUntil('c'), "\n") |
| self.assertEqual(stream.position(), (3, 0)) |
| stream.unget("\n") |
| self.assertEqual(stream.position(), (2, 2)) |
| self.assertEqual(stream.char(), "\n") |
| self.assertEqual(stream.position(), (3, 0)) |
| self.assertEqual(stream.charsUntil('e'), "ccc\nddd") |
| self.assertEqual(stream.position(), (4, 3)) |
| self.assertEqual(stream.charsUntil('h'), "e\nf\ng") |
| self.assertEqual(stream.position(), (6, 1)) |
| |
| def test_position2(self): |
| stream = HTMLUnicodeInputStreamShortChunk("abc\nd") |
| self.assertEqual(stream.position(), (1, 0)) |
| self.assertEqual(stream.char(), "a") |
| self.assertEqual(stream.position(), (1, 1)) |
| self.assertEqual(stream.char(), "b") |
| self.assertEqual(stream.position(), (1, 2)) |
| self.assertEqual(stream.char(), "c") |
| self.assertEqual(stream.position(), (1, 3)) |
| self.assertEqual(stream.char(), "\n") |
| self.assertEqual(stream.position(), (2, 0)) |
| self.assertEqual(stream.char(), "d") |
| self.assertEqual(stream.position(), (2, 1)) |
| |
| def test_python_issue_20007(self): |
| """ |
| Make sure we have a work-around for Python bug #20007 |
| http://bugs.python.org/issue20007 |
| """ |
| class FakeSocket(object): |
| def makefile(self, _mode, _bufsize=None): |
| return BytesIO(b"HTTP/1.1 200 Ok\r\n\r\nText") |
| |
| source = http_client.HTTPResponse(FakeSocket()) |
| source.begin() |
| stream = HTMLInputStream(source) |
| self.assertEqual(stream.charsUntil(" "), "Text") |
| |
| |
| def buildTestSuite(): |
| return unittest.defaultTestLoader.loadTestsFromName(__name__) |
| |
| |
| def main(): |
| buildTestSuite() |
| unittest.main() |
| |
| if __name__ == '__main__': |
| main() |