"""Unit tests for the io module.""" | |
# Tests of io are scattered over the test suite: | |
# * test_bufio - tests file buffering | |
# * test_memoryio - tests BytesIO and StringIO | |
# * test_fileio - tests FileIO | |
# * test_file - tests the file interface | |
# * test_io - tests everything else in the io module | |
# * test_univnewlines - tests universal newline support | |
# * test_largefile - tests operations on a file greater than 2**32 bytes | |
# (only enabled with -ulargefile) | |
################################################################################ | |
# ATTENTION TEST WRITERS!!! | |
################################################################################ | |
# When writing tests for io, it's important to test both the C and Python | |
# implementations. This is usually done by writing a base test that refers to | |
# the type it is testing as a attribute. Then it provides custom subclasses to | |
# test both implementations. This file has lots of examples. | |
################################################################################ | |
from __future__ import print_function | |
from __future__ import unicode_literals | |
import os | |
import sys | |
import time | |
import array | |
import random | |
import unittest | |
import weakref | |
import abc | |
import signal | |
import errno | |
from itertools import cycle, count | |
from collections import deque | |
from test import test_support as support | |
import codecs | |
import io # C implementation of io | |
import _pyio as pyio # Python implementation of io | |
try: | |
import threading | |
except ImportError: | |
threading = None | |
__metaclass__ = type | |
bytes = support.py3k_bytes | |
def _default_chunk_size(): | |
"""Get the default TextIOWrapper chunk size""" | |
with io.open(__file__, "r", encoding="latin1") as f: | |
return f._CHUNK_SIZE | |
class MockRawIOWithoutRead: | |
"""A RawIO implementation without read(), so as to exercise the default | |
RawIO.read() which calls readinto().""" | |
def __init__(self, read_stack=()): | |
self._read_stack = list(read_stack) | |
self._write_stack = [] | |
self._reads = 0 | |
self._extraneous_reads = 0 | |
def write(self, b): | |
self._write_stack.append(bytes(b)) | |
return len(b) | |
def writable(self): | |
return True | |
def fileno(self): | |
return 42 | |
def readable(self): | |
return True | |
def seekable(self): | |
return True | |
def seek(self, pos, whence): | |
return 0 # wrong but we gotta return something | |
def tell(self): | |
return 0 # same comment as above | |
def readinto(self, buf): | |
self._reads += 1 | |
max_len = len(buf) | |
try: | |
data = self._read_stack[0] | |
except IndexError: | |
self._extraneous_reads += 1 | |
return 0 | |
if data is None: | |
del self._read_stack[0] | |
return None | |
n = len(data) | |
if len(data) <= max_len: | |
del self._read_stack[0] | |
buf[:n] = data | |
return n | |
else: | |
buf[:] = data[:max_len] | |
self._read_stack[0] = data[max_len:] | |
return max_len | |
def truncate(self, pos=None): | |
return pos | |
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): | |
pass | |
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): | |
pass | |
class MockRawIO(MockRawIOWithoutRead): | |
def read(self, n=None): | |
self._reads += 1 | |
try: | |
return self._read_stack.pop(0) | |
except: | |
self._extraneous_reads += 1 | |
return b"" | |
class CMockRawIO(MockRawIO, io.RawIOBase): | |
pass | |
class PyMockRawIO(MockRawIO, pyio.RawIOBase): | |
pass | |
class MisbehavedRawIO(MockRawIO): | |
def write(self, b): | |
return MockRawIO.write(self, b) * 2 | |
def read(self, n=None): | |
return MockRawIO.read(self, n) * 2 | |
def seek(self, pos, whence): | |
return -123 | |
def tell(self): | |
return -456 | |
def readinto(self, buf): | |
MockRawIO.readinto(self, buf) | |
return len(buf) * 5 | |
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): | |
pass | |
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): | |
pass | |
class CloseFailureIO(MockRawIO): | |
closed = 0 | |
def close(self): | |
if not self.closed: | |
self.closed = 1 | |
raise IOError | |
class CCloseFailureIO(CloseFailureIO, io.RawIOBase): | |
pass | |
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): | |
pass | |
class MockFileIO: | |
def __init__(self, data): | |
self.read_history = [] | |
super(MockFileIO, self).__init__(data) | |
def read(self, n=None): | |
res = super(MockFileIO, self).read(n) | |
self.read_history.append(None if res is None else len(res)) | |
return res | |
def readinto(self, b): | |
res = super(MockFileIO, self).readinto(b) | |
self.read_history.append(res) | |
return res | |
class CMockFileIO(MockFileIO, io.BytesIO): | |
pass | |
class PyMockFileIO(MockFileIO, pyio.BytesIO): | |
pass | |
class MockNonBlockWriterIO: | |
def __init__(self): | |
self._write_stack = [] | |
self._blocker_char = None | |
def pop_written(self): | |
s = b"".join(self._write_stack) | |
self._write_stack[:] = [] | |
return s | |
def block_on(self, char): | |
"""Block when a given char is encountered.""" | |
self._blocker_char = char | |
def readable(self): | |
return True | |
def seekable(self): | |
return True | |
def writable(self): | |
return True | |
def write(self, b): | |
b = bytes(b) | |
n = -1 | |
if self._blocker_char: | |
try: | |
n = b.index(self._blocker_char) | |
except ValueError: | |
pass | |
else: | |
self._blocker_char = None | |
self._write_stack.append(b[:n]) | |
raise self.BlockingIOError(0, "test blocking", n) | |
self._write_stack.append(b) | |
return len(b) | |
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): | |
BlockingIOError = io.BlockingIOError | |
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): | |
BlockingIOError = pyio.BlockingIOError | |
class IOTest(unittest.TestCase): | |
def setUp(self): | |
support.unlink(support.TESTFN) | |
def tearDown(self): | |
support.unlink(support.TESTFN) | |
def write_ops(self, f): | |
self.assertEqual(f.write(b"blah."), 5) | |
f.truncate(0) | |
self.assertEqual(f.tell(), 5) | |
f.seek(0) | |
self.assertEqual(f.write(b"blah."), 5) | |
self.assertEqual(f.seek(0), 0) | |
self.assertEqual(f.write(b"Hello."), 6) | |
self.assertEqual(f.tell(), 6) | |
self.assertEqual(f.seek(-1, 1), 5) | |
self.assertEqual(f.tell(), 5) | |
self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9) | |
self.assertEqual(f.seek(0), 0) | |
self.assertEqual(f.write(b"h"), 1) | |
self.assertEqual(f.seek(-1, 2), 13) | |
self.assertEqual(f.tell(), 13) | |
self.assertEqual(f.truncate(12), 12) | |
self.assertEqual(f.tell(), 13) | |
self.assertRaises(TypeError, f.seek, 0.0) | |
def read_ops(self, f, buffered=False): | |
data = f.read(5) | |
self.assertEqual(data, b"hello") | |
data = bytearray(data) | |
self.assertEqual(f.readinto(data), 5) | |
self.assertEqual(data, b" worl") | |
self.assertEqual(f.readinto(data), 2) | |
self.assertEqual(len(data), 5) | |
self.assertEqual(data[:2], b"d\n") | |
self.assertEqual(f.seek(0), 0) | |
self.assertEqual(f.read(20), b"hello world\n") | |
self.assertEqual(f.read(1), b"") | |
self.assertEqual(f.readinto(bytearray(b"x")), 0) | |
self.assertEqual(f.seek(-6, 2), 6) | |
self.assertEqual(f.read(5), b"world") | |
self.assertEqual(f.read(0), b"") | |
self.assertEqual(f.readinto(bytearray()), 0) | |
self.assertEqual(f.seek(-6, 1), 5) | |
self.assertEqual(f.read(5), b" worl") | |
self.assertEqual(f.tell(), 10) | |
self.assertRaises(TypeError, f.seek, 0.0) | |
if buffered: | |
f.seek(0) | |
self.assertEqual(f.read(), b"hello world\n") | |
f.seek(6) | |
self.assertEqual(f.read(), b"world\n") | |
self.assertEqual(f.read(), b"") | |
LARGE = 2**31 | |
def large_file_ops(self, f): | |
assert f.readable() | |
assert f.writable() | |
self.assertEqual(f.seek(self.LARGE), self.LARGE) | |
self.assertEqual(f.tell(), self.LARGE) | |
self.assertEqual(f.write(b"xxx"), 3) | |
self.assertEqual(f.tell(), self.LARGE + 3) | |
self.assertEqual(f.seek(-1, 1), self.LARGE + 2) | |
self.assertEqual(f.truncate(), self.LARGE + 2) | |
self.assertEqual(f.tell(), self.LARGE + 2) | |
self.assertEqual(f.seek(0, 2), self.LARGE + 2) | |
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) | |
self.assertEqual(f.tell(), self.LARGE + 2) | |
self.assertEqual(f.seek(0, 2), self.LARGE + 1) | |
self.assertEqual(f.seek(-1, 2), self.LARGE) | |
self.assertEqual(f.read(2), b"x") | |
def test_invalid_operations(self): | |
# Try writing on a file opened in read mode and vice-versa. | |
for mode in ("w", "wb"): | |
with self.open(support.TESTFN, mode) as fp: | |
self.assertRaises(IOError, fp.read) | |
self.assertRaises(IOError, fp.readline) | |
with self.open(support.TESTFN, "rb") as fp: | |
self.assertRaises(IOError, fp.write, b"blah") | |
self.assertRaises(IOError, fp.writelines, [b"blah\n"]) | |
with self.open(support.TESTFN, "r") as fp: | |
self.assertRaises(IOError, fp.write, "blah") | |
self.assertRaises(IOError, fp.writelines, ["blah\n"]) | |
def test_raw_file_io(self): | |
with self.open(support.TESTFN, "wb", buffering=0) as f: | |
self.assertEqual(f.readable(), False) | |
self.assertEqual(f.writable(), True) | |
self.assertEqual(f.seekable(), True) | |
self.write_ops(f) | |
with self.open(support.TESTFN, "rb", buffering=0) as f: | |
self.assertEqual(f.readable(), True) | |
self.assertEqual(f.writable(), False) | |
self.assertEqual(f.seekable(), True) | |
self.read_ops(f) | |
def test_buffered_file_io(self): | |
with self.open(support.TESTFN, "wb") as f: | |
self.assertEqual(f.readable(), False) | |
self.assertEqual(f.writable(), True) | |
self.assertEqual(f.seekable(), True) | |
self.write_ops(f) | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.readable(), True) | |
self.assertEqual(f.writable(), False) | |
self.assertEqual(f.seekable(), True) | |
self.read_ops(f, True) | |
def test_readline(self): | |
with self.open(support.TESTFN, "wb") as f: | |
f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.readline(), b"abc\n") | |
self.assertEqual(f.readline(10), b"def\n") | |
self.assertEqual(f.readline(2), b"xy") | |
self.assertEqual(f.readline(4), b"zzy\n") | |
self.assertEqual(f.readline(), b"foo\x00bar\n") | |
self.assertEqual(f.readline(None), b"another line") | |
self.assertRaises(TypeError, f.readline, 5.3) | |
with self.open(support.TESTFN, "r") as f: | |
self.assertRaises(TypeError, f.readline, 5.3) | |
def test_raw_bytes_io(self): | |
f = self.BytesIO() | |
self.write_ops(f) | |
data = f.getvalue() | |
self.assertEqual(data, b"hello world\n") | |
f = self.BytesIO(data) | |
self.read_ops(f, True) | |
def test_large_file_ops(self): | |
# On Windows and Mac OSX this test comsumes large resources; It takes | |
# a long time to build the >2GB file and takes >2GB of disk space | |
# therefore the resource must be enabled to run this test. | |
if sys.platform[:3] == 'win' or sys.platform == 'darwin': | |
if not support.is_resource_enabled("largefile"): | |
print("\nTesting large file ops skipped on %s." % sys.platform, | |
file=sys.stderr) | |
print("It requires %d bytes and a long time." % self.LARGE, | |
file=sys.stderr) | |
print("Use 'regrtest.py -u largefile test_io' to run it.", | |
file=sys.stderr) | |
return | |
with self.open(support.TESTFN, "w+b", 0) as f: | |
self.large_file_ops(f) | |
with self.open(support.TESTFN, "w+b") as f: | |
self.large_file_ops(f) | |
def test_with_open(self): | |
for bufsize in (0, 1, 100): | |
f = None | |
with self.open(support.TESTFN, "wb", bufsize) as f: | |
f.write(b"xxx") | |
self.assertEqual(f.closed, True) | |
f = None | |
try: | |
with self.open(support.TESTFN, "wb", bufsize) as f: | |
1 // 0 | |
except ZeroDivisionError: | |
self.assertEqual(f.closed, True) | |
else: | |
self.fail("1 // 0 didn't raise an exception") | |
# issue 5008 | |
def test_append_mode_tell(self): | |
with self.open(support.TESTFN, "wb") as f: | |
f.write(b"xxx") | |
with self.open(support.TESTFN, "ab", buffering=0) as f: | |
self.assertEqual(f.tell(), 3) | |
with self.open(support.TESTFN, "ab") as f: | |
self.assertEqual(f.tell(), 3) | |
with self.open(support.TESTFN, "a") as f: | |
self.assertTrue(f.tell() > 0) | |
def test_destructor(self): | |
record = [] | |
class MyFileIO(self.FileIO): | |
def __del__(self): | |
record.append(1) | |
try: | |
f = super(MyFileIO, self).__del__ | |
except AttributeError: | |
pass | |
else: | |
f() | |
def close(self): | |
record.append(2) | |
super(MyFileIO, self).close() | |
def flush(self): | |
record.append(3) | |
super(MyFileIO, self).flush() | |
f = MyFileIO(support.TESTFN, "wb") | |
f.write(b"xxx") | |
del f | |
support.gc_collect() | |
self.assertEqual(record, [1, 2, 3]) | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.read(), b"xxx") | |
def _check_base_destructor(self, base): | |
record = [] | |
class MyIO(base): | |
def __init__(self): | |
# This exercises the availability of attributes on object | |
# destruction. | |
# (in the C version, close() is called by the tp_dealloc | |
# function, not by __del__) | |
self.on_del = 1 | |
self.on_close = 2 | |
self.on_flush = 3 | |
def __del__(self): | |
record.append(self.on_del) | |
try: | |
f = super(MyIO, self).__del__ | |
except AttributeError: | |
pass | |
else: | |
f() | |
def close(self): | |
record.append(self.on_close) | |
super(MyIO, self).close() | |
def flush(self): | |
record.append(self.on_flush) | |
super(MyIO, self).flush() | |
f = MyIO() | |
del f | |
support.gc_collect() | |
self.assertEqual(record, [1, 2, 3]) | |
def test_IOBase_destructor(self): | |
self._check_base_destructor(self.IOBase) | |
def test_RawIOBase_destructor(self): | |
self._check_base_destructor(self.RawIOBase) | |
def test_BufferedIOBase_destructor(self): | |
self._check_base_destructor(self.BufferedIOBase) | |
def test_TextIOBase_destructor(self): | |
self._check_base_destructor(self.TextIOBase) | |
def test_close_flushes(self): | |
with self.open(support.TESTFN, "wb") as f: | |
f.write(b"xxx") | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.read(), b"xxx") | |
def test_array_writes(self): | |
a = array.array(b'i', range(10)) | |
n = len(a.tostring()) | |
with self.open(support.TESTFN, "wb", 0) as f: | |
self.assertEqual(f.write(a), n) | |
with self.open(support.TESTFN, "wb") as f: | |
self.assertEqual(f.write(a), n) | |
def test_closefd(self): | |
self.assertRaises(ValueError, self.open, support.TESTFN, 'w', | |
closefd=False) | |
def test_read_closed(self): | |
with self.open(support.TESTFN, "w") as f: | |
f.write("egg\n") | |
with self.open(support.TESTFN, "r") as f: | |
file = self.open(f.fileno(), "r", closefd=False) | |
self.assertEqual(file.read(), "egg\n") | |
file.seek(0) | |
file.close() | |
self.assertRaises(ValueError, file.read) | |
def test_no_closefd_with_filename(self): | |
# can't use closefd in combination with a file name | |
self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) | |
def test_closefd_attr(self): | |
with self.open(support.TESTFN, "wb") as f: | |
f.write(b"egg\n") | |
with self.open(support.TESTFN, "r") as f: | |
self.assertEqual(f.buffer.raw.closefd, True) | |
file = self.open(f.fileno(), "r", closefd=False) | |
self.assertEqual(file.buffer.raw.closefd, False) | |
def test_garbage_collection(self): | |
# FileIO objects are collected, and collecting them flushes | |
# all data to disk. | |
f = self.FileIO(support.TESTFN, "wb") | |
f.write(b"abcxxx") | |
f.f = f | |
wr = weakref.ref(f) | |
del f | |
support.gc_collect() | |
self.assertTrue(wr() is None, wr) | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.read(), b"abcxxx") | |
def test_unbounded_file(self): | |
# Issue #1174606: reading from an unbounded stream such as /dev/zero. | |
zero = "/dev/zero" | |
if not os.path.exists(zero): | |
self.skipTest("{0} does not exist".format(zero)) | |
if sys.maxsize > 0x7FFFFFFF: | |
self.skipTest("test can only run in a 32-bit address space") | |
if support.real_max_memuse < support._2G: | |
self.skipTest("test requires at least 2GB of memory") | |
with self.open(zero, "rb", buffering=0) as f: | |
self.assertRaises(OverflowError, f.read) | |
with self.open(zero, "rb") as f: | |
self.assertRaises(OverflowError, f.read) | |
with self.open(zero, "r") as f: | |
self.assertRaises(OverflowError, f.read) | |
def test_flush_error_on_close(self): | |
f = self.open(support.TESTFN, "wb", buffering=0) | |
def bad_flush(): | |
raise IOError() | |
f.flush = bad_flush | |
self.assertRaises(IOError, f.close) # exception not swallowed | |
def test_multi_close(self): | |
f = self.open(support.TESTFN, "wb", buffering=0) | |
f.close() | |
f.close() | |
f.close() | |
self.assertRaises(ValueError, f.flush) | |
def test_RawIOBase_read(self): | |
# Exercise the default RawIOBase.read() implementation (which calls | |
# readinto() internally). | |
rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) | |
self.assertEqual(rawio.read(2), b"ab") | |
self.assertEqual(rawio.read(2), b"c") | |
self.assertEqual(rawio.read(2), b"d") | |
self.assertEqual(rawio.read(2), None) | |
self.assertEqual(rawio.read(2), b"ef") | |
self.assertEqual(rawio.read(2), b"g") | |
self.assertEqual(rawio.read(2), None) | |
self.assertEqual(rawio.read(2), b"") | |
class CIOTest(IOTest): | |
pass | |
class PyIOTest(IOTest): | |
test_array_writes = unittest.skip( | |
"len(array.array) returns number of elements rather than bytelength" | |
)(IOTest.test_array_writes) | |
class CommonBufferedTests: | |
# Tests common to BufferedReader, BufferedWriter and BufferedRandom | |
def test_detach(self): | |
raw = self.MockRawIO() | |
buf = self.tp(raw) | |
self.assertIs(buf.detach(), raw) | |
self.assertRaises(ValueError, buf.detach) | |
def test_fileno(self): | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
self.assertEqual(42, bufio.fileno()) | |
def test_no_fileno(self): | |
# XXX will we always have fileno() function? If so, kill | |
# this test. Else, write it. | |
pass | |
def test_invalid_args(self): | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
# Invalid whence | |
self.assertRaises(ValueError, bufio.seek, 0, -1) | |
self.assertRaises(ValueError, bufio.seek, 0, 3) | |
def test_override_destructor(self): | |
tp = self.tp | |
record = [] | |
class MyBufferedIO(tp): | |
def __del__(self): | |
record.append(1) | |
try: | |
f = super(MyBufferedIO, self).__del__ | |
except AttributeError: | |
pass | |
else: | |
f() | |
def close(self): | |
record.append(2) | |
super(MyBufferedIO, self).close() | |
def flush(self): | |
record.append(3) | |
super(MyBufferedIO, self).flush() | |
rawio = self.MockRawIO() | |
bufio = MyBufferedIO(rawio) | |
writable = bufio.writable() | |
del bufio | |
support.gc_collect() | |
if writable: | |
self.assertEqual(record, [1, 2, 3]) | |
else: | |
self.assertEqual(record, [1, 2]) | |
def test_context_manager(self): | |
# Test usability as a context manager | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
def _with(): | |
with bufio: | |
pass | |
_with() | |
# bufio should now be closed, and using it a second time should raise | |
# a ValueError. | |
self.assertRaises(ValueError, _with) | |
def test_error_through_destructor(self): | |
# Test that the exception state is not modified by a destructor, | |
# even if close() fails. | |
rawio = self.CloseFailureIO() | |
def f(): | |
self.tp(rawio).xyzzy | |
with support.captured_output("stderr") as s: | |
self.assertRaises(AttributeError, f) | |
s = s.getvalue().strip() | |
if s: | |
# The destructor *may* have printed an unraisable error, check it | |
self.assertEqual(len(s.splitlines()), 1) | |
self.assertTrue(s.startswith("Exception IOError: "), s) | |
self.assertTrue(s.endswith(" ignored"), s) | |
def test_repr(self): | |
raw = self.MockRawIO() | |
b = self.tp(raw) | |
clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__) | |
self.assertEqual(repr(b), "<%s>" % clsname) | |
raw.name = "dummy" | |
self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname) | |
raw.name = b"dummy" | |
self.assertEqual(repr(b), "<%s name='dummy'>" % clsname) | |
def test_flush_error_on_close(self): | |
raw = self.MockRawIO() | |
def bad_flush(): | |
raise IOError() | |
raw.flush = bad_flush | |
b = self.tp(raw) | |
self.assertRaises(IOError, b.close) # exception not swallowed | |
def test_multi_close(self): | |
raw = self.MockRawIO() | |
b = self.tp(raw) | |
b.close() | |
b.close() | |
b.close() | |
self.assertRaises(ValueError, b.flush) | |
def test_readonly_attributes(self): | |
raw = self.MockRawIO() | |
buf = self.tp(raw) | |
x = self.MockRawIO() | |
with self.assertRaises((AttributeError, TypeError)): | |
buf.raw = x | |
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): | |
read_mode = "rb" | |
def test_constructor(self): | |
rawio = self.MockRawIO([b"abc"]) | |
bufio = self.tp(rawio) | |
bufio.__init__(rawio) | |
bufio.__init__(rawio, buffer_size=1024) | |
bufio.__init__(rawio, buffer_size=16) | |
self.assertEqual(b"abc", bufio.read()) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) | |
rawio = self.MockRawIO([b"abc"]) | |
bufio.__init__(rawio) | |
self.assertEqual(b"abc", bufio.read()) | |
def test_read(self): | |
for arg in (None, 7): | |
rawio = self.MockRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
self.assertEqual(b"abcdefg", bufio.read(arg)) | |
# Invalid args | |
self.assertRaises(ValueError, bufio.read, -2) | |
def test_read1(self): | |
rawio = self.MockRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
self.assertEqual(b"a", bufio.read(1)) | |
self.assertEqual(b"b", bufio.read1(1)) | |
self.assertEqual(rawio._reads, 1) | |
self.assertEqual(b"c", bufio.read1(100)) | |
self.assertEqual(rawio._reads, 1) | |
self.assertEqual(b"d", bufio.read1(100)) | |
self.assertEqual(rawio._reads, 2) | |
self.assertEqual(b"efg", bufio.read1(100)) | |
self.assertEqual(rawio._reads, 3) | |
self.assertEqual(b"", bufio.read1(100)) | |
self.assertEqual(rawio._reads, 4) | |
# Invalid args | |
self.assertRaises(ValueError, bufio.read1, -1) | |
def test_readinto(self): | |
rawio = self.MockRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
b = bytearray(2) | |
self.assertEqual(bufio.readinto(b), 2) | |
self.assertEqual(b, b"ab") | |
self.assertEqual(bufio.readinto(b), 2) | |
self.assertEqual(b, b"cd") | |
self.assertEqual(bufio.readinto(b), 2) | |
self.assertEqual(b, b"ef") | |
self.assertEqual(bufio.readinto(b), 1) | |
self.assertEqual(b, b"gf") | |
self.assertEqual(bufio.readinto(b), 0) | |
self.assertEqual(b, b"gf") | |
def test_readlines(self): | |
def bufio(): | |
rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) | |
return self.tp(rawio) | |
self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) | |
self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) | |
self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) | |
def test_buffering(self): | |
data = b"abcdefghi" | |
dlen = len(data) | |
tests = [ | |
[ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], | |
[ 100, [ 3, 3, 3], [ dlen ] ], | |
[ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], | |
] | |
for bufsize, buf_read_sizes, raw_read_sizes in tests: | |
rawio = self.MockFileIO(data) | |
bufio = self.tp(rawio, buffer_size=bufsize) | |
pos = 0 | |
for nbytes in buf_read_sizes: | |
self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) | |
pos += nbytes | |
# this is mildly implementation-dependent | |
self.assertEqual(rawio.read_history, raw_read_sizes) | |
def test_read_non_blocking(self): | |
# Inject some None's in there to simulate EWOULDBLOCK | |
rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) | |
bufio = self.tp(rawio) | |
self.assertEqual(b"abcd", bufio.read(6)) | |
self.assertEqual(b"e", bufio.read(1)) | |
self.assertEqual(b"fg", bufio.read()) | |
self.assertEqual(b"", bufio.peek(1)) | |
self.assertIsNone(bufio.read()) | |
self.assertEqual(b"", bufio.read()) | |
rawio = self.MockRawIO((b"a", None, None)) | |
self.assertEqual(b"a", rawio.readall()) | |
self.assertIsNone(rawio.readall()) | |
def test_read_past_eof(self): | |
rawio = self.MockRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
self.assertEqual(b"abcdefg", bufio.read(9000)) | |
def test_read_all(self): | |
rawio = self.MockRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
self.assertEqual(b"abcdefg", bufio.read()) | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
@support.requires_resource('cpu') | |
def test_threads(self): | |
try: | |
# Write out many bytes with exactly the same number of 0's, | |
# 1's... 255's. This will help us check that concurrent reading | |
# doesn't duplicate or forget contents. | |
N = 1000 | |
l = list(range(256)) * N | |
random.shuffle(l) | |
s = bytes(bytearray(l)) | |
with self.open(support.TESTFN, "wb") as f: | |
f.write(s) | |
with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: | |
bufio = self.tp(raw, 8) | |
errors = [] | |
results = [] | |
def f(): | |
try: | |
# Intra-buffer read then buffer-flushing read | |
for n in cycle([1, 19]): | |
s = bufio.read(n) | |
if not s: | |
break | |
# list.append() is atomic | |
results.append(s) | |
except Exception as e: | |
errors.append(e) | |
raise | |
threads = [threading.Thread(target=f) for x in range(20)] | |
for t in threads: | |
t.start() | |
time.sleep(0.02) # yield | |
for t in threads: | |
t.join() | |
self.assertFalse(errors, | |
"the following exceptions were caught: %r" % errors) | |
s = b''.join(results) | |
for i in range(256): | |
c = bytes(bytearray([i])) | |
self.assertEqual(s.count(c), N) | |
finally: | |
support.unlink(support.TESTFN) | |
def test_misbehaved_io(self): | |
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
self.assertRaises(IOError, bufio.seek, 0) | |
self.assertRaises(IOError, bufio.tell) | |
def test_no_extraneous_read(self): | |
# Issue #9550; when the raw IO object has satisfied the read request, | |
# we should not issue any additional reads, otherwise it may block | |
# (e.g. socket). | |
bufsize = 16 | |
for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): | |
rawio = self.MockRawIO([b"x" * n]) | |
bufio = self.tp(rawio, bufsize) | |
self.assertEqual(bufio.read(n), b"x" * n) | |
# Simple case: one raw read is enough to satisfy the request. | |
self.assertEqual(rawio._extraneous_reads, 0, | |
"failed for {}: {} != 0".format(n, rawio._extraneous_reads)) | |
# A more complex case where two raw reads are needed to satisfy | |
# the request. | |
rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) | |
bufio = self.tp(rawio, bufsize) | |
self.assertEqual(bufio.read(n), b"x" * n) | |
self.assertEqual(rawio._extraneous_reads, 0, | |
"failed for {}: {} != 0".format(n, rawio._extraneous_reads)) | |
class CBufferedReaderTest(BufferedReaderTest): | |
tp = io.BufferedReader | |
def test_constructor(self): | |
BufferedReaderTest.test_constructor(self) | |
# The allocation can succeed on 32-bit builds, e.g. with more | |
# than 2GB RAM and a 64-bit kernel. | |
if sys.maxsize > 0x7FFFFFFF: | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
self.assertRaises((OverflowError, MemoryError, ValueError), | |
bufio.__init__, rawio, sys.maxsize) | |
def test_initialization(self): | |
rawio = self.MockRawIO([b"abc"]) | |
bufio = self.tp(rawio) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) | |
self.assertRaises(ValueError, bufio.read) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) | |
self.assertRaises(ValueError, bufio.read) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) | |
self.assertRaises(ValueError, bufio.read) | |
def test_misbehaved_io_read(self): | |
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) | |
bufio = self.tp(rawio) | |
# _pyio.BufferedReader seems to implement reading different, so that | |
# checking this is not so easy. | |
self.assertRaises(IOError, bufio.read, 10) | |
def test_garbage_collection(self): | |
# C BufferedReader objects are collected. | |
# The Python version has __del__, so it ends into gc.garbage instead | |
rawio = self.FileIO(support.TESTFN, "w+b") | |
f = self.tp(rawio) | |
f.f = f | |
wr = weakref.ref(f) | |
del f | |
support.gc_collect() | |
self.assertTrue(wr() is None, wr) | |
class PyBufferedReaderTest(BufferedReaderTest): | |
tp = pyio.BufferedReader | |
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): | |
write_mode = "wb" | |
def test_constructor(self): | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
bufio.__init__(rawio) | |
bufio.__init__(rawio, buffer_size=1024) | |
bufio.__init__(rawio, buffer_size=16) | |
self.assertEqual(3, bufio.write(b"abc")) | |
bufio.flush() | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) | |
bufio.__init__(rawio) | |
self.assertEqual(3, bufio.write(b"ghi")) | |
bufio.flush() | |
self.assertEqual(b"".join(rawio._write_stack), b"abcghi") | |
def test_detach_flush(self): | |
raw = self.MockRawIO() | |
buf = self.tp(raw) | |
buf.write(b"howdy!") | |
self.assertFalse(raw._write_stack) | |
buf.detach() | |
self.assertEqual(raw._write_stack, [b"howdy!"]) | |
def test_write(self): | |
# Write to the buffered IO but don't overflow the buffer. | |
writer = self.MockRawIO() | |
bufio = self.tp(writer, 8) | |
bufio.write(b"abc") | |
self.assertFalse(writer._write_stack) | |
def test_write_overflow(self): | |
writer = self.MockRawIO() | |
bufio = self.tp(writer, 8) | |
contents = b"abcdefghijklmnop" | |
for n in range(0, len(contents), 3): | |
bufio.write(contents[n:n+3]) | |
flushed = b"".join(writer._write_stack) | |
# At least (total - 8) bytes were implicitly flushed, perhaps more | |
# depending on the implementation. | |
self.assertTrue(flushed.startswith(contents[:-8]), flushed) | |
def check_writes(self, intermediate_func): | |
# Lots of writes, test the flushed output is as expected. | |
contents = bytes(range(256)) * 1000 | |
n = 0 | |
writer = self.MockRawIO() | |
bufio = self.tp(writer, 13) | |
# Generator of write sizes: repeat each N 15 times then proceed to N+1 | |
def gen_sizes(): | |
for size in count(1): | |
for i in range(15): | |
yield size | |
sizes = gen_sizes() | |
while n < len(contents): | |
size = min(next(sizes), len(contents) - n) | |
self.assertEqual(bufio.write(contents[n:n+size]), size) | |
intermediate_func(bufio) | |
n += size | |
bufio.flush() | |
self.assertEqual(contents, | |
b"".join(writer._write_stack)) | |
def test_writes(self): | |
self.check_writes(lambda bufio: None) | |
def test_writes_and_flushes(self): | |
self.check_writes(lambda bufio: bufio.flush()) | |
def test_writes_and_seeks(self): | |
def _seekabs(bufio): | |
pos = bufio.tell() | |
bufio.seek(pos + 1, 0) | |
bufio.seek(pos - 1, 0) | |
bufio.seek(pos, 0) | |
self.check_writes(_seekabs) | |
def _seekrel(bufio): | |
pos = bufio.seek(0, 1) | |
bufio.seek(+1, 1) | |
bufio.seek(-1, 1) | |
bufio.seek(pos, 0) | |
self.check_writes(_seekrel) | |
def test_writes_and_truncates(self): | |
self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) | |
def test_write_non_blocking(self): | |
raw = self.MockNonBlockWriterIO() | |
bufio = self.tp(raw, 8) | |
self.assertEqual(bufio.write(b"abcd"), 4) | |
self.assertEqual(bufio.write(b"efghi"), 5) | |
# 1 byte will be written, the rest will be buffered | |
raw.block_on(b"k") | |
self.assertEqual(bufio.write(b"jklmn"), 5) | |
# 8 bytes will be written, 8 will be buffered and the rest will be lost | |
raw.block_on(b"0") | |
try: | |
bufio.write(b"opqrwxyz0123456789") | |
except self.BlockingIOError as e: | |
written = e.characters_written | |
else: | |
self.fail("BlockingIOError should have been raised") | |
self.assertEqual(written, 16) | |
self.assertEqual(raw.pop_written(), | |
b"abcdefghijklmnopqrwxyz") | |
self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) | |
s = raw.pop_written() | |
# Previously buffered bytes were flushed | |
self.assertTrue(s.startswith(b"01234567A"), s) | |
def test_write_and_rewind(self): | |
raw = io.BytesIO() | |
bufio = self.tp(raw, 4) | |
self.assertEqual(bufio.write(b"abcdef"), 6) | |
self.assertEqual(bufio.tell(), 6) | |
bufio.seek(0, 0) | |
self.assertEqual(bufio.write(b"XY"), 2) | |
bufio.seek(6, 0) | |
self.assertEqual(raw.getvalue(), b"XYcdef") | |
self.assertEqual(bufio.write(b"123456"), 6) | |
bufio.flush() | |
self.assertEqual(raw.getvalue(), b"XYcdef123456") | |
def test_flush(self): | |
writer = self.MockRawIO() | |
bufio = self.tp(writer, 8) | |
bufio.write(b"abc") | |
bufio.flush() | |
self.assertEqual(b"abc", writer._write_stack[0]) | |
def test_destructor(self): | |
writer = self.MockRawIO() | |
bufio = self.tp(writer, 8) | |
bufio.write(b"abc") | |
del bufio | |
support.gc_collect() | |
self.assertEqual(b"abc", writer._write_stack[0]) | |
def test_truncate(self): | |
# Truncate implicitly flushes the buffer. | |
with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: | |
bufio = self.tp(raw, 8) | |
bufio.write(b"abcdef") | |
self.assertEqual(bufio.truncate(3), 3) | |
self.assertEqual(bufio.tell(), 6) | |
with self.open(support.TESTFN, "rb", buffering=0) as f: | |
self.assertEqual(f.read(), b"abc") | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
@support.requires_resource('cpu') | |
def test_threads(self): | |
try: | |
# Write out many bytes from many threads and test they were | |
# all flushed. | |
N = 1000 | |
contents = bytes(range(256)) * N | |
sizes = cycle([1, 19]) | |
n = 0 | |
queue = deque() | |
while n < len(contents): | |
size = next(sizes) | |
queue.append(contents[n:n+size]) | |
n += size | |
del contents | |
# We use a real file object because it allows us to | |
# exercise situations where the GIL is released before | |
# writing the buffer to the raw streams. This is in addition | |
# to concurrency issues due to switching threads in the middle | |
# of Python code. | |
with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: | |
bufio = self.tp(raw, 8) | |
errors = [] | |
def f(): | |
try: | |
while True: | |
try: | |
s = queue.popleft() | |
except IndexError: | |
return | |
bufio.write(s) | |
except Exception as e: | |
errors.append(e) | |
raise | |
threads = [threading.Thread(target=f) for x in range(20)] | |
for t in threads: | |
t.start() | |
time.sleep(0.02) # yield | |
for t in threads: | |
t.join() | |
self.assertFalse(errors, | |
"the following exceptions were caught: %r" % errors) | |
bufio.close() | |
with self.open(support.TESTFN, "rb") as f: | |
s = f.read() | |
for i in range(256): | |
self.assertEqual(s.count(bytes([i])), N) | |
finally: | |
support.unlink(support.TESTFN) | |
def test_misbehaved_io(self): | |
rawio = self.MisbehavedRawIO() | |
bufio = self.tp(rawio, 5) | |
self.assertRaises(IOError, bufio.seek, 0) | |
self.assertRaises(IOError, bufio.tell) | |
self.assertRaises(IOError, bufio.write, b"abcdef") | |
def test_max_buffer_size_deprecation(self): | |
with support.check_warnings(("max_buffer_size is deprecated", | |
DeprecationWarning)): | |
self.tp(self.MockRawIO(), 8, 12) | |
class CBufferedWriterTest(BufferedWriterTest): | |
tp = io.BufferedWriter | |
def test_constructor(self): | |
BufferedWriterTest.test_constructor(self) | |
# The allocation can succeed on 32-bit builds, e.g. with more | |
# than 2GB RAM and a 64-bit kernel. | |
if sys.maxsize > 0x7FFFFFFF: | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
self.assertRaises((OverflowError, MemoryError, ValueError), | |
bufio.__init__, rawio, sys.maxsize) | |
def test_initialization(self): | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) | |
self.assertRaises(ValueError, bufio.write, b"def") | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) | |
self.assertRaises(ValueError, bufio.write, b"def") | |
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) | |
self.assertRaises(ValueError, bufio.write, b"def") | |
def test_garbage_collection(self): | |
# C BufferedWriter objects are collected, and collecting them flushes | |
# all data to disk. | |
# The Python version has __del__, so it ends into gc.garbage instead | |
rawio = self.FileIO(support.TESTFN, "w+b") | |
f = self.tp(rawio) | |
f.write(b"123xxx") | |
f.x = f | |
wr = weakref.ref(f) | |
del f | |
support.gc_collect() | |
self.assertTrue(wr() is None, wr) | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.read(), b"123xxx") | |
class PyBufferedWriterTest(BufferedWriterTest): | |
tp = pyio.BufferedWriter | |
class BufferedRWPairTest(unittest.TestCase): | |
def test_constructor(self): | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertFalse(pair.closed) | |
def test_detach(self): | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertRaises(self.UnsupportedOperation, pair.detach) | |
def test_constructor_max_buffer_size_deprecation(self): | |
with support.check_warnings(("max_buffer_size is deprecated", | |
DeprecationWarning)): | |
self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) | |
def test_constructor_with_not_readable(self): | |
class NotReadable(MockRawIO): | |
def readable(self): | |
return False | |
self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO()) | |
def test_constructor_with_not_writeable(self): | |
class NotWriteable(MockRawIO): | |
def writable(self): | |
return False | |
self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable()) | |
def test_read(self): | |
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) | |
self.assertEqual(pair.read(3), b"abc") | |
self.assertEqual(pair.read(1), b"d") | |
self.assertEqual(pair.read(), b"ef") | |
pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) | |
self.assertEqual(pair.read(None), b"abc") | |
def test_readlines(self): | |
pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) | |
self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) | |
self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) | |
self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) | |
def test_read1(self): | |
# .read1() is delegated to the underlying reader object, so this test | |
# can be shallow. | |
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) | |
self.assertEqual(pair.read1(3), b"abc") | |
def test_readinto(self): | |
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) | |
data = bytearray(5) | |
self.assertEqual(pair.readinto(data), 5) | |
self.assertEqual(data, b"abcde") | |
def test_write(self): | |
w = self.MockRawIO() | |
pair = self.tp(self.MockRawIO(), w) | |
pair.write(b"abc") | |
pair.flush() | |
pair.write(b"def") | |
pair.flush() | |
self.assertEqual(w._write_stack, [b"abc", b"def"]) | |
def test_peek(self): | |
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) | |
self.assertTrue(pair.peek(3).startswith(b"abc")) | |
self.assertEqual(pair.read(3), b"abc") | |
def test_readable(self): | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertTrue(pair.readable()) | |
def test_writeable(self): | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertTrue(pair.writable()) | |
def test_seekable(self): | |
# BufferedRWPairs are never seekable, even if their readers and writers | |
# are. | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertFalse(pair.seekable()) | |
# .flush() is delegated to the underlying writer object and has been | |
# tested in the test_write method. | |
def test_close_and_closed(self): | |
pair = self.tp(self.MockRawIO(), self.MockRawIO()) | |
self.assertFalse(pair.closed) | |
pair.close() | |
self.assertTrue(pair.closed) | |
def test_isatty(self): | |
class SelectableIsAtty(MockRawIO): | |
def __init__(self, isatty): | |
MockRawIO.__init__(self) | |
self._isatty = isatty | |
def isatty(self): | |
return self._isatty | |
pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) | |
self.assertFalse(pair.isatty()) | |
pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) | |
self.assertTrue(pair.isatty()) | |
pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) | |
self.assertTrue(pair.isatty()) | |
pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) | |
self.assertTrue(pair.isatty()) | |
class CBufferedRWPairTest(BufferedRWPairTest): | |
tp = io.BufferedRWPair | |
class PyBufferedRWPairTest(BufferedRWPairTest): | |
tp = pyio.BufferedRWPair | |
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): | |
read_mode = "rb+" | |
write_mode = "wb+" | |
def test_constructor(self): | |
BufferedReaderTest.test_constructor(self) | |
BufferedWriterTest.test_constructor(self) | |
def test_read_and_write(self): | |
raw = self.MockRawIO((b"asdf", b"ghjk")) | |
rw = self.tp(raw, 8) | |
self.assertEqual(b"as", rw.read(2)) | |
rw.write(b"ddd") | |
rw.write(b"eee") | |
self.assertFalse(raw._write_stack) # Buffer writes | |
self.assertEqual(b"ghjk", rw.read()) | |
self.assertEqual(b"dddeee", raw._write_stack[0]) | |
def test_seek_and_tell(self): | |
raw = self.BytesIO(b"asdfghjkl") | |
rw = self.tp(raw) | |
self.assertEqual(b"as", rw.read(2)) | |
self.assertEqual(2, rw.tell()) | |
rw.seek(0, 0) | |
self.assertEqual(b"asdf", rw.read(4)) | |
rw.write(b"asdf") | |
rw.seek(0, 0) | |
self.assertEqual(b"asdfasdfl", rw.read()) | |
self.assertEqual(9, rw.tell()) | |
rw.seek(-4, 2) | |
self.assertEqual(5, rw.tell()) | |
rw.seek(2, 1) | |
self.assertEqual(7, rw.tell()) | |
self.assertEqual(b"fl", rw.read(11)) | |
self.assertRaises(TypeError, rw.seek, 0.0) | |
def check_flush_and_read(self, read_func): | |
raw = self.BytesIO(b"abcdefghi") | |
bufio = self.tp(raw) | |
self.assertEqual(b"ab", read_func(bufio, 2)) | |
bufio.write(b"12") | |
self.assertEqual(b"ef", read_func(bufio, 2)) | |
self.assertEqual(6, bufio.tell()) | |
bufio.flush() | |
self.assertEqual(6, bufio.tell()) | |
self.assertEqual(b"ghi", read_func(bufio)) | |
raw.seek(0, 0) | |
raw.write(b"XYZ") | |
# flush() resets the read buffer | |
bufio.flush() | |
bufio.seek(0, 0) | |
self.assertEqual(b"XYZ", read_func(bufio, 3)) | |
def test_flush_and_read(self): | |
self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) | |
def test_flush_and_readinto(self): | |
def _readinto(bufio, n=-1): | |
b = bytearray(n if n >= 0 else 9999) | |
n = bufio.readinto(b) | |
return bytes(b[:n]) | |
self.check_flush_and_read(_readinto) | |
def test_flush_and_peek(self): | |
def _peek(bufio, n=-1): | |
# This relies on the fact that the buffer can contain the whole | |
# raw stream, otherwise peek() can return less. | |
b = bufio.peek(n) | |
if n != -1: | |
b = b[:n] | |
bufio.seek(len(b), 1) | |
return b | |
self.check_flush_and_read(_peek) | |
def test_flush_and_write(self): | |
raw = self.BytesIO(b"abcdefghi") | |
bufio = self.tp(raw) | |
bufio.write(b"123") | |
bufio.flush() | |
bufio.write(b"45") | |
bufio.flush() | |
bufio.seek(0, 0) | |
self.assertEqual(b"12345fghi", raw.getvalue()) | |
self.assertEqual(b"12345fghi", bufio.read()) | |
def test_threads(self): | |
BufferedReaderTest.test_threads(self) | |
BufferedWriterTest.test_threads(self) | |
def test_writes_and_peek(self): | |
def _peek(bufio): | |
bufio.peek(1) | |
self.check_writes(_peek) | |
def _peek(bufio): | |
pos = bufio.tell() | |
bufio.seek(-1, 1) | |
bufio.peek(1) | |
bufio.seek(pos, 0) | |
self.check_writes(_peek) | |
def test_writes_and_reads(self): | |
def _read(bufio): | |
bufio.seek(-1, 1) | |
bufio.read(1) | |
self.check_writes(_read) | |
def test_writes_and_read1s(self): | |
def _read1(bufio): | |
bufio.seek(-1, 1) | |
bufio.read1(1) | |
self.check_writes(_read1) | |
def test_writes_and_readintos(self): | |
def _read(bufio): | |
bufio.seek(-1, 1) | |
bufio.readinto(bytearray(1)) | |
self.check_writes(_read) | |
def test_write_after_readahead(self): | |
# Issue #6629: writing after the buffer was filled by readahead should | |
# first rewind the raw stream. | |
for overwrite_size in [1, 5]: | |
raw = self.BytesIO(b"A" * 10) | |
bufio = self.tp(raw, 4) | |
# Trigger readahead | |
self.assertEqual(bufio.read(1), b"A") | |
self.assertEqual(bufio.tell(), 1) | |
# Overwriting should rewind the raw stream if it needs so | |
bufio.write(b"B" * overwrite_size) | |
self.assertEqual(bufio.tell(), overwrite_size + 1) | |
# If the write size was smaller than the buffer size, flush() and | |
# check that rewind happens. | |
bufio.flush() | |
self.assertEqual(bufio.tell(), overwrite_size + 1) | |
s = raw.getvalue() | |
self.assertEqual(s, | |
b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) | |
def test_write_rewind_write(self): | |
# Various combinations of reading / writing / seeking backwards / writing again | |
def mutate(bufio, pos1, pos2): | |
assert pos2 >= pos1 | |
# Fill the buffer | |
bufio.seek(pos1) | |
bufio.read(pos2 - pos1) | |
bufio.write(b'\x02') | |
# This writes earlier than the previous write, but still inside | |
# the buffer. | |
bufio.seek(pos1) | |
bufio.write(b'\x01') | |
b = b"\x80\x81\x82\x83\x84" | |
for i in range(0, len(b)): | |
for j in range(i, len(b)): | |
raw = self.BytesIO(b) | |
bufio = self.tp(raw, 100) | |
mutate(bufio, i, j) | |
bufio.flush() | |
expected = bytearray(b) | |
expected[j] = 2 | |
expected[i] = 1 | |
self.assertEqual(raw.getvalue(), expected, | |
"failed result for i=%d, j=%d" % (i, j)) | |
def test_truncate_after_read_or_write(self): | |
raw = self.BytesIO(b"A" * 10) | |
bufio = self.tp(raw, 100) | |
self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled | |
self.assertEqual(bufio.truncate(), 2) | |
self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases | |
self.assertEqual(bufio.truncate(), 4) | |
def test_misbehaved_io(self): | |
BufferedReaderTest.test_misbehaved_io(self) | |
BufferedWriterTest.test_misbehaved_io(self) | |
class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest, BufferedRandomTest): | |
tp = io.BufferedRandom | |
def test_constructor(self): | |
BufferedRandomTest.test_constructor(self) | |
# The allocation can succeed on 32-bit builds, e.g. with more | |
# than 2GB RAM and a 64-bit kernel. | |
if sys.maxsize > 0x7FFFFFFF: | |
rawio = self.MockRawIO() | |
bufio = self.tp(rawio) | |
self.assertRaises((OverflowError, MemoryError, ValueError), | |
bufio.__init__, rawio, sys.maxsize) | |
def test_garbage_collection(self): | |
CBufferedReaderTest.test_garbage_collection(self) | |
CBufferedWriterTest.test_garbage_collection(self) | |
class PyBufferedRandomTest(BufferedRandomTest): | |
tp = pyio.BufferedRandom | |
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these | |
# properties: | |
# - A single output character can correspond to many bytes of input. | |
# - The number of input bytes to complete the character can be | |
# undetermined until the last input byte is received. | |
# - The number of input bytes can vary depending on previous input. | |
# - A single input byte can correspond to many characters of output. | |
# - The number of output characters can be undetermined until the | |
# last input byte is received. | |
# - The number of output characters can vary depending on previous input. | |
class StatefulIncrementalDecoder(codecs.IncrementalDecoder): | |
""" | |
For testing seek/tell behavior with a stateful, buffering decoder. | |
Input is a sequence of words. Words may be fixed-length (length set | |
by input) or variable-length (period-terminated). In variable-length | |
mode, extra periods are ignored. Possible words are: | |
- 'i' followed by a number sets the input length, I (maximum 99). | |
When I is set to 0, words are space-terminated. | |
- 'o' followed by a number sets the output length, O (maximum 99). | |
- Any other word is converted into a word followed by a period on | |
the output. The output word consists of the input word truncated | |
or padded out with hyphens to make its length equal to O. If O | |
is 0, the word is output verbatim without truncating or padding. | |
I and O are initially set to 1. When I changes, any buffered input is | |
re-scanned according to the new I. EOF also terminates the last word. | |
""" | |
def __init__(self, errors='strict'): | |
codecs.IncrementalDecoder.__init__(self, errors) | |
self.reset() | |
def __repr__(self): | |
return '<SID %x>' % id(self) | |
def reset(self): | |
self.i = 1 | |
self.o = 1 | |
self.buffer = bytearray() | |
def getstate(self): | |
i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() | |
return bytes(self.buffer), i*100 + o | |
def setstate(self, state): | |
buffer, io = state | |
self.buffer = bytearray(buffer) | |
i, o = divmod(io, 100) | |
self.i, self.o = i ^ 1, o ^ 1 | |
def decode(self, input, final=False): | |
output = '' | |
for b in input: | |
if self.i == 0: # variable-length, terminated with period | |
if b == '.': | |
if self.buffer: | |
output += self.process_word() | |
else: | |
self.buffer.append(b) | |
else: # fixed-length, terminate after self.i bytes | |
self.buffer.append(b) | |
if len(self.buffer) == self.i: | |
output += self.process_word() | |
if final and self.buffer: # EOF terminates the last word | |
output += self.process_word() | |
return output | |
def process_word(self): | |
output = '' | |
if self.buffer[0] == ord('i'): | |
self.i = min(99, int(self.buffer[1:] or 0)) # set input length | |
elif self.buffer[0] == ord('o'): | |
self.o = min(99, int(self.buffer[1:] or 0)) # set output length | |
else: | |
output = self.buffer.decode('ascii') | |
if len(output) < self.o: | |
output += '-'*self.o # pad out with hyphens | |
if self.o: | |
output = output[:self.o] # truncate to output length | |
output += '.' | |
self.buffer = bytearray() | |
return output | |
codecEnabled = False | |
@classmethod | |
def lookupTestDecoder(cls, name): | |
if cls.codecEnabled and name == 'test_decoder': | |
latin1 = codecs.lookup('latin-1') | |
return codecs.CodecInfo( | |
name='test_decoder', encode=latin1.encode, decode=None, | |
incrementalencoder=None, | |
streamreader=None, streamwriter=None, | |
incrementaldecoder=cls) | |
# Register the previous decoder for testing. | |
# Disabled by default, tests will enable it. | |
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) | |
class StatefulIncrementalDecoderTest(unittest.TestCase): | |
""" | |
Make sure the StatefulIncrementalDecoder actually works. | |
""" | |
test_cases = [ | |
# I=1, O=1 (fixed-length input == fixed-length output) | |
(b'abcd', False, 'a.b.c.d.'), | |
# I=0, O=0 (variable-length input, variable-length output) | |
(b'oiabcd', True, 'abcd.'), | |
# I=0, O=0 (should ignore extra periods) | |
(b'oi...abcd...', True, 'abcd.'), | |
# I=0, O=6 (variable-length input, fixed-length output) | |
(b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), | |
# I=2, O=6 (fixed-length input < fixed-length output) | |
(b'i.i2.o6xyz', True, 'xy----.z-----.'), | |
# I=6, O=3 (fixed-length input > fixed-length output) | |
(b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), | |
# I=0, then 3; O=29, then 15 (with longer output) | |
(b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, | |
'a----------------------------.' + | |
'b----------------------------.' + | |
'cde--------------------------.' + | |
'abcdefghijabcde.' + | |
'a.b------------.' + | |
'.c.------------.' + | |
'd.e------------.' + | |
'k--------------.' + | |
'l--------------.' + | |
'm--------------.') | |
] | |
def test_decoder(self): | |
# Try a few one-shot test cases. | |
for input, eof, output in self.test_cases: | |
d = StatefulIncrementalDecoder() | |
self.assertEqual(d.decode(input, eof), output) | |
# Also test an unfinished decode, followed by forcing EOF. | |
d = StatefulIncrementalDecoder() | |
self.assertEqual(d.decode(b'oiabcd'), '') | |
self.assertEqual(d.decode(b'', 1), 'abcd.') | |
class TextIOWrapperTest(unittest.TestCase): | |
def setUp(self): | |
self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" | |
self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") | |
support.unlink(support.TESTFN) | |
def tearDown(self): | |
support.unlink(support.TESTFN) | |
def test_constructor(self): | |
r = self.BytesIO(b"\xc3\xa9\n\n") | |
b = self.BufferedReader(r, 1000) | |
t = self.TextIOWrapper(b) | |
t.__init__(b, encoding="latin1", newline="\r\n") | |
self.assertEqual(t.encoding, "latin1") | |
self.assertEqual(t.line_buffering, False) | |
t.__init__(b, encoding="utf8", line_buffering=True) | |
self.assertEqual(t.encoding, "utf8") | |
self.assertEqual(t.line_buffering, True) | |
self.assertEqual("\xe9\n", t.readline()) | |
self.assertRaises(TypeError, t.__init__, b, newline=42) | |
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') | |
def test_detach(self): | |
r = self.BytesIO() | |
b = self.BufferedWriter(r) | |
t = self.TextIOWrapper(b) | |
self.assertIs(t.detach(), b) | |
t = self.TextIOWrapper(b, encoding="ascii") | |
t.write("howdy") | |
self.assertFalse(r.getvalue()) | |
t.detach() | |
self.assertEqual(r.getvalue(), b"howdy") | |
self.assertRaises(ValueError, t.detach) | |
def test_repr(self): | |
raw = self.BytesIO("hello".encode("utf-8")) | |
b = self.BufferedReader(raw) | |
t = self.TextIOWrapper(b, encoding="utf-8") | |
modname = self.TextIOWrapper.__module__ | |
self.assertEqual(repr(t), | |
"<%s.TextIOWrapper encoding='utf-8'>" % modname) | |
raw.name = "dummy" | |
self.assertEqual(repr(t), | |
"<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname) | |
raw.name = b"dummy" | |
self.assertEqual(repr(t), | |
"<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname) | |
def test_line_buffering(self): | |
r = self.BytesIO() | |
b = self.BufferedWriter(r, 1000) | |
t = self.TextIOWrapper(b, newline="\n", line_buffering=True) | |
t.write("X") | |
self.assertEqual(r.getvalue(), b"") # No flush happened | |
t.write("Y\nZ") | |
self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed | |
t.write("A\rB") | |
self.assertEqual(r.getvalue(), b"XY\nZA\rB") | |
def test_encoding(self): | |
# Check the encoding attribute is always set, and valid | |
b = self.BytesIO() | |
t = self.TextIOWrapper(b, encoding="utf8") | |
self.assertEqual(t.encoding, "utf8") | |
t = self.TextIOWrapper(b) | |
self.assertTrue(t.encoding is not None) | |
codecs.lookup(t.encoding) | |
def test_encoding_errors_reading(self): | |
# (1) default | |
b = self.BytesIO(b"abc\n\xff\n") | |
t = self.TextIOWrapper(b, encoding="ascii") | |
self.assertRaises(UnicodeError, t.read) | |
# (2) explicit strict | |
b = self.BytesIO(b"abc\n\xff\n") | |
t = self.TextIOWrapper(b, encoding="ascii", errors="strict") | |
self.assertRaises(UnicodeError, t.read) | |
# (3) ignore | |
b = self.BytesIO(b"abc\n\xff\n") | |
t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") | |
self.assertEqual(t.read(), "abc\n\n") | |
# (4) replace | |
b = self.BytesIO(b"abc\n\xff\n") | |
t = self.TextIOWrapper(b, encoding="ascii", errors="replace") | |
self.assertEqual(t.read(), "abc\n\ufffd\n") | |
def test_encoding_errors_writing(self): | |
# (1) default | |
b = self.BytesIO() | |
t = self.TextIOWrapper(b, encoding="ascii") | |
self.assertRaises(UnicodeError, t.write, "\xff") | |
# (2) explicit strict | |
b = self.BytesIO() | |
t = self.TextIOWrapper(b, encoding="ascii", errors="strict") | |
self.assertRaises(UnicodeError, t.write, "\xff") | |
# (3) ignore | |
b = self.BytesIO() | |
t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", | |
newline="\n") | |
t.write("abc\xffdef\n") | |
t.flush() | |
self.assertEqual(b.getvalue(), b"abcdef\n") | |
# (4) replace | |
b = self.BytesIO() | |
t = self.TextIOWrapper(b, encoding="ascii", errors="replace", | |
newline="\n") | |
t.write("abc\xffdef\n") | |
t.flush() | |
self.assertEqual(b.getvalue(), b"abc?def\n") | |
def test_newlines(self): | |
input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] | |
tests = [ | |
[ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], | |
[ '', input_lines ], | |
[ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], | |
[ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], | |
[ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], | |
] | |
encodings = ( | |
'utf-8', 'latin-1', | |
'utf-16', 'utf-16-le', 'utf-16-be', | |
'utf-32', 'utf-32-le', 'utf-32-be', | |
) | |
# Try a range of buffer sizes to test the case where \r is the last | |
# character in TextIOWrapper._pending_line. | |
for encoding in encodings: | |
# XXX: str.encode() should return bytes | |
data = bytes(''.join(input_lines).encode(encoding)) | |
for do_reads in (False, True): | |
for bufsize in range(1, 10): | |
for newline, exp_lines in tests: | |
bufio = self.BufferedReader(self.BytesIO(data), bufsize) | |
textio = self.TextIOWrapper(bufio, newline=newline, | |
encoding=encoding) | |
if do_reads: | |
got_lines = [] | |
while True: | |
c2 = textio.read(2) | |
if c2 == '': | |
break | |
self.assertEqual(len(c2), 2) | |
got_lines.append(c2 + textio.readline()) | |
else: | |
got_lines = list(textio) | |
for got_line, exp_line in zip(got_lines, exp_lines): | |
self.assertEqual(got_line, exp_line) | |
self.assertEqual(len(got_lines), len(exp_lines)) | |
def test_newlines_input(self): | |
testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" | |
normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") | |
for newline, expected in [ | |
(None, normalized.decode("ascii").splitlines(True)), | |
("", testdata.decode("ascii").splitlines(True)), | |
("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), | |
("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), | |
("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), | |
]: | |
buf = self.BytesIO(testdata) | |
txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) | |
self.assertEqual(txt.readlines(), expected) | |
txt.seek(0) | |
self.assertEqual(txt.read(), "".join(expected)) | |
def test_newlines_output(self): | |
testdict = { | |
"": b"AAA\nBBB\nCCC\nX\rY\r\nZ", | |
"\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", | |
"\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", | |
"\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", | |
} | |
tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) | |
for newline, expected in tests: | |
buf = self.BytesIO() | |
txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) | |
txt.write("AAA\nB") | |
txt.write("BB\nCCC\n") | |
txt.write("X\rY\r\nZ") | |
txt.flush() | |
self.assertEqual(buf.closed, False) | |
self.assertEqual(buf.getvalue(), expected) | |
def test_destructor(self): | |
l = [] | |
base = self.BytesIO | |
class MyBytesIO(base): | |
def close(self): | |
l.append(self.getvalue()) | |
base.close(self) | |
b = MyBytesIO() | |
t = self.TextIOWrapper(b, encoding="ascii") | |
t.write("abc") | |
del t | |
support.gc_collect() | |
self.assertEqual([b"abc"], l) | |
def test_override_destructor(self): | |
record = [] | |
class MyTextIO(self.TextIOWrapper): | |
def __del__(self): | |
record.append(1) | |
try: | |
f = super(MyTextIO, self).__del__ | |
except AttributeError: | |
pass | |
else: | |
f() | |
def close(self): | |
record.append(2) | |
super(MyTextIO, self).close() | |
def flush(self): | |
record.append(3) | |
super(MyTextIO, self).flush() | |
b = self.BytesIO() | |
t = MyTextIO(b, encoding="ascii") | |
del t | |
support.gc_collect() | |
self.assertEqual(record, [1, 2, 3]) | |
def test_error_through_destructor(self): | |
# Test that the exception state is not modified by a destructor, | |
# even if close() fails. | |
rawio = self.CloseFailureIO() | |
def f(): | |
self.TextIOWrapper(rawio).xyzzy | |
with support.captured_output("stderr") as s: | |
self.assertRaises(AttributeError, f) | |
s = s.getvalue().strip() | |
if s: | |
# The destructor *may* have printed an unraisable error, check it | |
self.assertEqual(len(s.splitlines()), 1) | |
self.assertTrue(s.startswith("Exception IOError: "), s) | |
self.assertTrue(s.endswith(" ignored"), s) | |
# Systematic tests of the text I/O API | |
def test_basic_io(self): | |
for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): | |
for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le": | |
f = self.open(support.TESTFN, "w+", encoding=enc) | |
f._CHUNK_SIZE = chunksize | |
self.assertEqual(f.write("abc"), 3) | |
f.close() | |
f = self.open(support.TESTFN, "r+", encoding=enc) | |
f._CHUNK_SIZE = chunksize | |
self.assertEqual(f.tell(), 0) | |
self.assertEqual(f.read(), "abc") | |
cookie = f.tell() | |
self.assertEqual(f.seek(0), 0) | |
self.assertEqual(f.read(None), "abc") | |
f.seek(0) | |
self.assertEqual(f.read(2), "ab") | |
self.assertEqual(f.read(1), "c") | |
self.assertEqual(f.read(1), "") | |
self.assertEqual(f.read(), "") | |
self.assertEqual(f.tell(), cookie) | |
self.assertEqual(f.seek(0), 0) | |
self.assertEqual(f.seek(0, 2), cookie) | |
self.assertEqual(f.write("def"), 3) | |
self.assertEqual(f.seek(cookie), cookie) | |
self.assertEqual(f.read(), "def") | |
if enc.startswith("utf"): | |
self.multi_line_test(f, enc) | |
f.close() | |
def multi_line_test(self, f, enc): | |
f.seek(0) | |
f.truncate() | |
sample = "s\xff\u0fff\uffff" | |
wlines = [] | |
for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): | |
chars = [] | |
for i in range(size): | |
chars.append(sample[i % len(sample)]) | |
line = "".join(chars) + "\n" | |
wlines.append((f.tell(), line)) | |
f.write(line) | |
f.seek(0) | |
rlines = [] | |
while True: | |
pos = f.tell() | |
line = f.readline() | |
if not line: | |
break | |
rlines.append((pos, line)) | |
self.assertEqual(rlines, wlines) | |
def test_telling(self): | |
f = self.open(support.TESTFN, "w+", encoding="utf8") | |
p0 = f.tell() | |
f.write("\xff\n") | |
p1 = f.tell() | |
f.write("\xff\n") | |
p2 = f.tell() | |
f.seek(0) | |
self.assertEqual(f.tell(), p0) | |
self.assertEqual(f.readline(), "\xff\n") | |
self.assertEqual(f.tell(), p1) | |
self.assertEqual(f.readline(), "\xff\n") | |
self.assertEqual(f.tell(), p2) | |
f.seek(0) | |
for line in f: | |
self.assertEqual(line, "\xff\n") | |
self.assertRaises(IOError, f.tell) | |
self.assertEqual(f.tell(), p2) | |
f.close() | |
def test_seeking(self): | |
chunk_size = _default_chunk_size() | |
prefix_size = chunk_size - 2 | |
u_prefix = "a" * prefix_size | |
prefix = bytes(u_prefix.encode("utf-8")) | |
self.assertEqual(len(u_prefix), len(prefix)) | |
u_suffix = "\u8888\n" | |
suffix = bytes(u_suffix.encode("utf-8")) | |
line = prefix + suffix | |
f = self.open(support.TESTFN, "wb") | |
f.write(line*2) | |
f.close() | |
f = self.open(support.TESTFN, "r", encoding="utf-8") | |
s = f.read(prefix_size) | |
self.assertEqual(s, prefix.decode("ascii")) | |
self.assertEqual(f.tell(), prefix_size) | |
self.assertEqual(f.readline(), u_suffix) | |
def test_seeking_too(self): | |
# Regression test for a specific bug | |
data = b'\xe0\xbf\xbf\n' | |
f = self.open(support.TESTFN, "wb") | |
f.write(data) | |
f.close() | |
f = self.open(support.TESTFN, "r", encoding="utf-8") | |
f._CHUNK_SIZE # Just test that it exists | |
f._CHUNK_SIZE = 2 | |
f.readline() | |
f.tell() | |
def test_seek_and_tell(self): | |
#Test seek/tell using the StatefulIncrementalDecoder. | |
# Make test faster by doing smaller seeks | |
CHUNK_SIZE = 128 | |
def test_seek_and_tell_with_data(data, min_pos=0): | |
"""Tell/seek to various points within a data stream and ensure | |
that the decoded data returned by read() is consistent.""" | |
f = self.open(support.TESTFN, 'wb') | |
f.write(data) | |
f.close() | |
f = self.open(support.TESTFN, encoding='test_decoder') | |
f._CHUNK_SIZE = CHUNK_SIZE | |
decoded = f.read() | |
f.close() | |
for i in range(min_pos, len(decoded) + 1): # seek positions | |
for j in [1, 5, len(decoded) - i]: # read lengths | |
f = self.open(support.TESTFN, encoding='test_decoder') | |
self.assertEqual(f.read(i), decoded[:i]) | |
cookie = f.tell() | |
self.assertEqual(f.read(j), decoded[i:i + j]) | |
f.seek(cookie) | |
self.assertEqual(f.read(), decoded[i:]) | |
f.close() | |
# Enable the test decoder. | |
StatefulIncrementalDecoder.codecEnabled = 1 | |
# Run the tests. | |
try: | |
# Try each test case. | |
for input, _, _ in StatefulIncrementalDecoderTest.test_cases: | |
test_seek_and_tell_with_data(input) | |
# Position each test case so that it crosses a chunk boundary. | |
for input, _, _ in StatefulIncrementalDecoderTest.test_cases: | |
offset = CHUNK_SIZE - len(input)//2 | |
prefix = b'.'*offset | |
# Don't bother seeking into the prefix (takes too long). | |
min_pos = offset*2 | |
test_seek_and_tell_with_data(prefix + input, min_pos) | |
# Ensure our test decoder won't interfere with subsequent tests. | |
finally: | |
StatefulIncrementalDecoder.codecEnabled = 0 | |
def test_encoded_writes(self): | |
data = "1234567890" | |
tests = ("utf-16", | |
"utf-16-le", | |
"utf-16-be", | |
"utf-32", | |
"utf-32-le", | |
"utf-32-be") | |
for encoding in tests: | |
buf = self.BytesIO() | |
f = self.TextIOWrapper(buf, encoding=encoding) | |
# Check if the BOM is written only once (see issue1753). | |
f.write(data) | |
f.write(data) | |
f.seek(0) | |
self.assertEqual(f.read(), data * 2) | |
f.seek(0) | |
self.assertEqual(f.read(), data * 2) | |
self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) | |
def test_unreadable(self): | |
class UnReadable(self.BytesIO): | |
def readable(self): | |
return False | |
txt = self.TextIOWrapper(UnReadable()) | |
self.assertRaises(IOError, txt.read) | |
def test_read_one_by_one(self): | |
txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) | |
reads = "" | |
while True: | |
c = txt.read(1) | |
if not c: | |
break | |
reads += c | |
self.assertEqual(reads, "AA\nBB") | |
def test_readlines(self): | |
txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) | |
self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) | |
txt.seek(0) | |
self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) | |
txt.seek(0) | |
self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) | |
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. | |
def test_read_by_chunk(self): | |
# make sure "\r\n" straddles 128 char boundary. | |
txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) | |
reads = "" | |
while True: | |
c = txt.read(128) | |
if not c: | |
break | |
reads += c | |
self.assertEqual(reads, "A"*127+"\nB") | |
def test_issue1395_1(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
# read one char at a time | |
reads = "" | |
while True: | |
c = txt.read(1) | |
if not c: | |
break | |
reads += c | |
self.assertEqual(reads, self.normalized) | |
def test_issue1395_2(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
txt._CHUNK_SIZE = 4 | |
reads = "" | |
while True: | |
c = txt.read(4) | |
if not c: | |
break | |
reads += c | |
self.assertEqual(reads, self.normalized) | |
def test_issue1395_3(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
txt._CHUNK_SIZE = 4 | |
reads = txt.read(4) | |
reads += txt.read(4) | |
reads += txt.readline() | |
reads += txt.readline() | |
reads += txt.readline() | |
self.assertEqual(reads, self.normalized) | |
def test_issue1395_4(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
txt._CHUNK_SIZE = 4 | |
reads = txt.read(4) | |
reads += txt.read() | |
self.assertEqual(reads, self.normalized) | |
def test_issue1395_5(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
txt._CHUNK_SIZE = 4 | |
reads = txt.read(4) | |
pos = txt.tell() | |
txt.seek(0) | |
txt.seek(pos) | |
self.assertEqual(txt.read(4), "BBB\n") | |
def test_issue2282(self): | |
buffer = self.BytesIO(self.testdata) | |
txt = self.TextIOWrapper(buffer, encoding="ascii") | |
self.assertEqual(buffer.seekable(), txt.seekable()) | |
def test_append_bom(self): | |
# The BOM is not written again when appending to a non-empty file | |
filename = support.TESTFN | |
for charset in ('utf-8-sig', 'utf-16', 'utf-32'): | |
with self.open(filename, 'w', encoding=charset) as f: | |
f.write('aaa') | |
pos = f.tell() | |
with self.open(filename, 'rb') as f: | |
self.assertEqual(f.read(), 'aaa'.encode(charset)) | |
with self.open(filename, 'a', encoding=charset) as f: | |
f.write('xxx') | |
with self.open(filename, 'rb') as f: | |
self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) | |
def test_seek_bom(self): | |
# Same test, but when seeking manually | |
filename = support.TESTFN | |
for charset in ('utf-8-sig', 'utf-16', 'utf-32'): | |
with self.open(filename, 'w', encoding=charset) as f: | |
f.write('aaa') | |
pos = f.tell() | |
with self.open(filename, 'r+', encoding=charset) as f: | |
f.seek(pos) | |
f.write('zzz') | |
f.seek(0) | |
f.write('bbb') | |
with self.open(filename, 'rb') as f: | |
self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) | |
def test_errors_property(self): | |
with self.open(support.TESTFN, "w") as f: | |
self.assertEqual(f.errors, "strict") | |
with self.open(support.TESTFN, "w", errors="replace") as f: | |
self.assertEqual(f.errors, "replace") | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
def test_threads_write(self): | |
# Issue6750: concurrent writes could duplicate data | |
event = threading.Event() | |
with self.open(support.TESTFN, "w", buffering=1) as f: | |
def run(n): | |
text = "Thread%03d\n" % n | |
event.wait() | |
f.write(text) | |
threads = [threading.Thread(target=lambda n=x: run(n)) | |
for x in range(20)] | |
for t in threads: | |
t.start() | |
time.sleep(0.02) | |
event.set() | |
for t in threads: | |
t.join() | |
with self.open(support.TESTFN) as f: | |
content = f.read() | |
for n in range(20): | |
self.assertEqual(content.count("Thread%03d\n" % n), 1) | |
def test_flush_error_on_close(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
def bad_flush(): | |
raise IOError() | |
txt.flush = bad_flush | |
self.assertRaises(IOError, txt.close) # exception not swallowed | |
def test_multi_close(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
txt.close() | |
txt.close() | |
txt.close() | |
self.assertRaises(ValueError, txt.flush) | |
def test_readonly_attributes(self): | |
txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") | |
buf = self.BytesIO(self.testdata) | |
with self.assertRaises((AttributeError, TypeError)): | |
txt.buffer = buf | |
class CTextIOWrapperTest(TextIOWrapperTest): | |
def test_initialization(self): | |
r = self.BytesIO(b"\xc3\xa9\n\n") | |
b = self.BufferedReader(r, 1000) | |
t = self.TextIOWrapper(b) | |
self.assertRaises(TypeError, t.__init__, b, newline=42) | |
self.assertRaises(ValueError, t.read) | |
self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') | |
self.assertRaises(ValueError, t.read) | |
def test_garbage_collection(self): | |
# C TextIOWrapper objects are collected, and collecting them flushes | |
# all data to disk. | |
# The Python version has __del__, so it ends in gc.garbage instead. | |
rawio = io.FileIO(support.TESTFN, "wb") | |
b = self.BufferedWriter(rawio) | |
t = self.TextIOWrapper(b, encoding="ascii") | |
t.write("456def") | |
t.x = t | |
wr = weakref.ref(t) | |
del t | |
support.gc_collect() | |
self.assertTrue(wr() is None, wr) | |
with self.open(support.TESTFN, "rb") as f: | |
self.assertEqual(f.read(), b"456def") | |
class PyTextIOWrapperTest(TextIOWrapperTest): | |
pass | |
class IncrementalNewlineDecoderTest(unittest.TestCase): | |
def check_newline_decoding_utf8(self, decoder): | |
# UTF-8 specific tests for a newline decoder | |
def _check_decode(b, s, **kwargs): | |
# We exercise getstate() / setstate() as well as decode() | |
state = decoder.getstate() | |
self.assertEqual(decoder.decode(b, **kwargs), s) | |
decoder.setstate(state) | |
self.assertEqual(decoder.decode(b, **kwargs), s) | |
_check_decode(b'\xe8\xa2\x88', "\u8888") | |
_check_decode(b'\xe8', "") | |
_check_decode(b'\xa2', "") | |
_check_decode(b'\x88', "\u8888") | |
_check_decode(b'\xe8', "") | |
_check_decode(b'\xa2', "") | |
_check_decode(b'\x88', "\u8888") | |
_check_decode(b'\xe8', "") | |
self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) | |
decoder.reset() | |
_check_decode(b'\n', "\n") | |
_check_decode(b'\r', "") | |
_check_decode(b'', "\n", final=True) | |
_check_decode(b'\r', "\n", final=True) | |
_check_decode(b'\r', "") | |
_check_decode(b'a', "\na") | |
_check_decode(b'\r\r\n', "\n\n") | |
_check_decode(b'\r', "") | |
_check_decode(b'\r', "\n") | |
_check_decode(b'\na', "\na") | |
_check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") | |
_check_decode(b'\xe8\xa2\x88', "\u8888") | |
_check_decode(b'\n', "\n") | |
_check_decode(b'\xe8\xa2\x88\r', "\u8888") | |
_check_decode(b'\n', "\n") | |
def check_newline_decoding(self, decoder, encoding): | |
result = [] | |
if encoding is not None: | |
encoder = codecs.getincrementalencoder(encoding)() | |
def _decode_bytewise(s): | |
# Decode one byte at a time | |
for b in encoder.encode(s): | |
result.append(decoder.decode(b)) | |
else: | |
encoder = None | |
def _decode_bytewise(s): | |
# Decode one char at a time | |
for c in s: | |
result.append(decoder.decode(c)) | |
self.assertEqual(decoder.newlines, None) | |
_decode_bytewise("abc\n\r") | |
self.assertEqual(decoder.newlines, '\n') | |
_decode_bytewise("\nabc") | |
self.assertEqual(decoder.newlines, ('\n', '\r\n')) | |
_decode_bytewise("abc\r") | |
self.assertEqual(decoder.newlines, ('\n', '\r\n')) | |
_decode_bytewise("abc") | |
self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) | |
_decode_bytewise("abc\r") | |
self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") | |
decoder.reset() | |
input = "abc" | |
if encoder is not None: | |
encoder.reset() | |
input = encoder.encode(input) | |
self.assertEqual(decoder.decode(input), "abc") | |
self.assertEqual(decoder.newlines, None) | |
def test_newline_decoder(self): | |
encodings = ( | |
# None meaning the IncrementalNewlineDecoder takes unicode input | |
# rather than bytes input | |
None, 'utf-8', 'latin-1', | |
'utf-16', 'utf-16-le', 'utf-16-be', | |
'utf-32', 'utf-32-le', 'utf-32-be', | |
) | |
for enc in encodings: | |
decoder = enc and codecs.getincrementaldecoder(enc)() | |
decoder = self.IncrementalNewlineDecoder(decoder, translate=True) | |
self.check_newline_decoding(decoder, enc) | |
decoder = codecs.getincrementaldecoder("utf-8")() | |
decoder = self.IncrementalNewlineDecoder(decoder, translate=True) | |
self.check_newline_decoding_utf8(decoder) | |
def test_newline_bytes(self): | |
# Issue 5433: Excessive optimization in IncrementalNewlineDecoder | |
def _check(dec): | |
self.assertEqual(dec.newlines, None) | |
self.assertEqual(dec.decode("\u0D00"), "\u0D00") | |
self.assertEqual(dec.newlines, None) | |
self.assertEqual(dec.decode("\u0A00"), "\u0A00") | |
self.assertEqual(dec.newlines, None) | |
dec = self.IncrementalNewlineDecoder(None, translate=False) | |
_check(dec) | |
dec = self.IncrementalNewlineDecoder(None, translate=True) | |
_check(dec) | |
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): | |
pass | |
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): | |
pass | |
# XXX Tests for open() | |
class MiscIOTest(unittest.TestCase): | |
def tearDown(self): | |
support.unlink(support.TESTFN) | |
def test___all__(self): | |
for name in self.io.__all__: | |
obj = getattr(self.io, name, None) | |
self.assertTrue(obj is not None, name) | |
if name == "open": | |
continue | |
elif "error" in name.lower() or name == "UnsupportedOperation": | |
self.assertTrue(issubclass(obj, Exception), name) | |
elif not name.startswith("SEEK_"): | |
self.assertTrue(issubclass(obj, self.IOBase)) | |
def test_attributes(self): | |
f = self.open(support.TESTFN, "wb", buffering=0) | |
self.assertEqual(f.mode, "wb") | |
f.close() | |
f = self.open(support.TESTFN, "U") | |
self.assertEqual(f.name, support.TESTFN) | |
self.assertEqual(f.buffer.name, support.TESTFN) | |
self.assertEqual(f.buffer.raw.name, support.TESTFN) | |
self.assertEqual(f.mode, "U") | |
self.assertEqual(f.buffer.mode, "rb") | |
self.assertEqual(f.buffer.raw.mode, "rb") | |
f.close() | |
f = self.open(support.TESTFN, "w+") | |
self.assertEqual(f.mode, "w+") | |
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? | |
self.assertEqual(f.buffer.raw.mode, "rb+") | |
g = self.open(f.fileno(), "wb", closefd=False) | |
self.assertEqual(g.mode, "wb") | |
self.assertEqual(g.raw.mode, "wb") | |
self.assertEqual(g.name, f.fileno()) | |
self.assertEqual(g.raw.name, f.fileno()) | |
f.close() | |
g.close() | |
def test_io_after_close(self): | |
for kwargs in [ | |
{"mode": "w"}, | |
{"mode": "wb"}, | |
{"mode": "w", "buffering": 1}, | |
{"mode": "w", "buffering": 2}, | |
{"mode": "wb", "buffering": 0}, | |
{"mode": "r"}, | |
{"mode": "rb"}, | |
{"mode": "r", "buffering": 1}, | |
{"mode": "r", "buffering": 2}, | |
{"mode": "rb", "buffering": 0}, | |
{"mode": "w+"}, | |
{"mode": "w+b"}, | |
{"mode": "w+", "buffering": 1}, | |
{"mode": "w+", "buffering": 2}, | |
{"mode": "w+b", "buffering": 0}, | |
]: | |
f = self.open(support.TESTFN, **kwargs) | |
f.close() | |
self.assertRaises(ValueError, f.flush) | |
self.assertRaises(ValueError, f.fileno) | |
self.assertRaises(ValueError, f.isatty) | |
self.assertRaises(ValueError, f.__iter__) | |
if hasattr(f, "peek"): | |
self.assertRaises(ValueError, f.peek, 1) | |
self.assertRaises(ValueError, f.read) | |
if hasattr(f, "read1"): | |
self.assertRaises(ValueError, f.read1, 1024) | |
if hasattr(f, "readall"): | |
self.assertRaises(ValueError, f.readall) | |
if hasattr(f, "readinto"): | |
self.assertRaises(ValueError, f.readinto, bytearray(1024)) | |
self.assertRaises(ValueError, f.readline) | |
self.assertRaises(ValueError, f.readlines) | |
self.assertRaises(ValueError, f.seek, 0) | |
self.assertRaises(ValueError, f.tell) | |
self.assertRaises(ValueError, f.truncate) | |
self.assertRaises(ValueError, f.write, | |
b"" if "b" in kwargs['mode'] else "") | |
self.assertRaises(ValueError, f.writelines, []) | |
self.assertRaises(ValueError, next, f) | |
def test_blockingioerror(self): | |
# Various BlockingIOError issues | |
self.assertRaises(TypeError, self.BlockingIOError) | |
self.assertRaises(TypeError, self.BlockingIOError, 1) | |
self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4) | |
self.assertRaises(TypeError, self.BlockingIOError, 1, "", None) | |
b = self.BlockingIOError(1, "") | |
self.assertEqual(b.characters_written, 0) | |
class C(unicode): | |
pass | |
c = C("") | |
b = self.BlockingIOError(1, c) | |
c.b = b | |
b.c = c | |
wr = weakref.ref(c) | |
del c, b | |
support.gc_collect() | |
self.assertTrue(wr() is None, wr) | |
def test_abcs(self): | |
# Test the visible base classes are ABCs. | |
self.assertIsInstance(self.IOBase, abc.ABCMeta) | |
self.assertIsInstance(self.RawIOBase, abc.ABCMeta) | |
self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) | |
self.assertIsInstance(self.TextIOBase, abc.ABCMeta) | |
def _check_abc_inheritance(self, abcmodule): | |
with self.open(support.TESTFN, "wb", buffering=0) as f: | |
self.assertIsInstance(f, abcmodule.IOBase) | |
self.assertIsInstance(f, abcmodule.RawIOBase) | |
self.assertNotIsInstance(f, abcmodule.BufferedIOBase) | |
self.assertNotIsInstance(f, abcmodule.TextIOBase) | |
with self.open(support.TESTFN, "wb") as f: | |
self.assertIsInstance(f, abcmodule.IOBase) | |
self.assertNotIsInstance(f, abcmodule.RawIOBase) | |
self.assertIsInstance(f, abcmodule.BufferedIOBase) | |
self.assertNotIsInstance(f, abcmodule.TextIOBase) | |
with self.open(support.TESTFN, "w") as f: | |
self.assertIsInstance(f, abcmodule.IOBase) | |
self.assertNotIsInstance(f, abcmodule.RawIOBase) | |
self.assertNotIsInstance(f, abcmodule.BufferedIOBase) | |
self.assertIsInstance(f, abcmodule.TextIOBase) | |
def test_abc_inheritance(self): | |
# Test implementations inherit from their respective ABCs | |
self._check_abc_inheritance(self) | |
def test_abc_inheritance_official(self): | |
# Test implementations inherit from the official ABCs of the | |
# baseline "io" module. | |
self._check_abc_inheritance(io) | |
class CMiscIOTest(MiscIOTest): | |
io = io | |
class PyMiscIOTest(MiscIOTest): | |
io = pyio | |
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') | |
class SignalsTest(unittest.TestCase): | |
def setUp(self): | |
self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) | |
def tearDown(self): | |
signal.signal(signal.SIGALRM, self.oldalrm) | |
def alarm_interrupt(self, sig, frame): | |
1 // 0 | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
def check_interrupted_write(self, item, bytes, **fdopen_kwargs): | |
"""Check that a partial write, when it gets interrupted, properly | |
invokes the signal handler, and bubbles up the exception raised | |
in the latter.""" | |
read_results = [] | |
def _read(): | |
s = os.read(r, 1) | |
read_results.append(s) | |
t = threading.Thread(target=_read) | |
t.daemon = True | |
r, w = os.pipe() | |
try: | |
wio = self.io.open(w, **fdopen_kwargs) | |
t.start() | |
signal.alarm(1) | |
# Fill the pipe enough that the write will be blocking. | |
# It will be interrupted by the timer armed above. Since the | |
# other thread has read one byte, the low-level write will | |
# return with a successful (partial) result rather than an EINTR. | |
# The buffered IO layer must check for pending signal | |
# handlers, which in this case will invoke alarm_interrupt(). | |
self.assertRaises(ZeroDivisionError, | |
wio.write, item * (1024 * 1024)) | |
t.join() | |
# We got one byte, get another one and check that it isn't a | |
# repeat of the first one. | |
read_results.append(os.read(r, 1)) | |
self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) | |
finally: | |
os.close(w) | |
os.close(r) | |
# This is deliberate. If we didn't close the file descriptor | |
# before closing wio, wio would try to flush its internal | |
# buffer, and block again. | |
try: | |
wio.close() | |
except IOError as e: | |
if e.errno != errno.EBADF: | |
raise | |
def test_interrupted_write_unbuffered(self): | |
self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) | |
def test_interrupted_write_buffered(self): | |
self.check_interrupted_write(b"xy", b"xy", mode="wb") | |
def test_interrupted_write_text(self): | |
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") | |
def check_reentrant_write(self, data, **fdopen_kwargs): | |
def on_alarm(*args): | |
# Will be called reentrantly from the same thread | |
wio.write(data) | |
1/0 | |
signal.signal(signal.SIGALRM, on_alarm) | |
r, w = os.pipe() | |
wio = self.io.open(w, **fdopen_kwargs) | |
try: | |
signal.alarm(1) | |
# Either the reentrant call to wio.write() fails with RuntimeError, | |
# or the signal handler raises ZeroDivisionError. | |
with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: | |
while 1: | |
for i in range(100): | |
wio.write(data) | |
wio.flush() | |
# Make sure the buffer doesn't fill up and block further writes | |
os.read(r, len(data) * 100) | |
exc = cm.exception | |
if isinstance(exc, RuntimeError): | |
self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) | |
finally: | |
wio.close() | |
os.close(r) | |
def test_reentrant_write_buffered(self): | |
self.check_reentrant_write(b"xy", mode="wb") | |
def test_reentrant_write_text(self): | |
self.check_reentrant_write("xy", mode="w", encoding="ascii") | |
def check_interrupted_read_retry(self, decode, **fdopen_kwargs): | |
"""Check that a buffered read, when it gets interrupted (either | |
returning a partial result or EINTR), properly invokes the signal | |
handler and retries if the latter returned successfully.""" | |
r, w = os.pipe() | |
fdopen_kwargs["closefd"] = False | |
def alarm_handler(sig, frame): | |
os.write(w, b"bar") | |
signal.signal(signal.SIGALRM, alarm_handler) | |
try: | |
rio = self.io.open(r, **fdopen_kwargs) | |
os.write(w, b"foo") | |
signal.alarm(1) | |
# Expected behaviour: | |
# - first raw read() returns partial b"foo" | |
# - second raw read() returns EINTR | |
# - third raw read() returns b"bar" | |
self.assertEqual(decode(rio.read(6)), "foobar") | |
finally: | |
rio.close() | |
os.close(w) | |
os.close(r) | |
def test_interrupterd_read_retry_buffered(self): | |
self.check_interrupted_read_retry(lambda x: x.decode('latin1'), | |
mode="rb") | |
def test_interrupterd_read_retry_text(self): | |
self.check_interrupted_read_retry(lambda x: x, | |
mode="r") | |
@unittest.skipUnless(threading, 'Threading required for this test.') | |
def check_interrupted_write_retry(self, item, **fdopen_kwargs): | |
"""Check that a buffered write, when it gets interrupted (either | |
returning a partial result or EINTR), properly invokes the signal | |
handler and retries if the latter returned successfully.""" | |
select = support.import_module("select") | |
# A quantity that exceeds the buffer size of an anonymous pipe's | |
# write end. | |
N = 1024 * 1024 | |
r, w = os.pipe() | |
fdopen_kwargs["closefd"] = False | |
# We need a separate thread to read from the pipe and allow the | |
# write() to finish. This thread is started after the SIGALRM is | |
# received (forcing a first EINTR in write()). | |
read_results = [] | |
write_finished = False | |
def _read(): | |
while not write_finished: | |
while r in select.select([r], [], [], 1.0)[0]: | |
s = os.read(r, 1024) | |
read_results.append(s) | |
t = threading.Thread(target=_read) | |
t.daemon = True | |
def alarm1(sig, frame): | |
signal.signal(signal.SIGALRM, alarm2) | |
signal.alarm(1) | |
def alarm2(sig, frame): | |
t.start() | |
signal.signal(signal.SIGALRM, alarm1) | |
try: | |
wio = self.io.open(w, **fdopen_kwargs) | |
signal.alarm(1) | |
# Expected behaviour: | |
# - first raw write() is partial (because of the limited pipe buffer | |
# and the first alarm) | |
# - second raw write() returns EINTR (because of the second alarm) | |
# - subsequent write()s are successful (either partial or complete) | |
self.assertEqual(N, wio.write(item * N)) | |
wio.flush() | |
write_finished = True | |
t.join() | |
self.assertEqual(N, sum(len(x) for x in read_results)) | |
finally: | |
write_finished = True | |
os.close(w) | |
os.close(r) | |
# This is deliberate. If we didn't close the file descriptor | |
# before closing wio, wio would try to flush its internal | |
# buffer, and could block (in case of failure). | |
try: | |
wio.close() | |
except IOError as e: | |
if e.errno != errno.EBADF: | |
raise | |
def test_interrupterd_write_retry_buffered(self): | |
self.check_interrupted_write_retry(b"x", mode="wb") | |
def test_interrupterd_write_retry_text(self): | |
self.check_interrupted_write_retry("x", mode="w", encoding="latin1") | |
class CSignalsTest(SignalsTest): | |
io = io | |
class PySignalsTest(SignalsTest): | |
io = pyio | |
# Handling reentrancy issues would slow down _pyio even more, so the | |
# tests are disabled. | |
test_reentrant_write_buffered = None | |
test_reentrant_write_text = None | |
def test_main(): | |
tests = (CIOTest, PyIOTest, | |
CBufferedReaderTest, PyBufferedReaderTest, | |
CBufferedWriterTest, PyBufferedWriterTest, | |
CBufferedRWPairTest, PyBufferedRWPairTest, | |
CBufferedRandomTest, PyBufferedRandomTest, | |
StatefulIncrementalDecoderTest, | |
CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, | |
CTextIOWrapperTest, PyTextIOWrapperTest, | |
CMiscIOTest, PyMiscIOTest, | |
CSignalsTest, PySignalsTest, | |
) | |
# Put the namespaces of the IO module we are testing and some useful mock | |
# classes in the __dict__ of each test. | |
mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, | |
MockNonBlockWriterIO, MockRawIOWithoutRead) | |
all_members = io.__all__ + ["IncrementalNewlineDecoder"] | |
c_io_ns = dict((name, getattr(io, name)) for name in all_members) | |
py_io_ns = dict((name, getattr(pyio, name)) for name in all_members) | |
globs = globals() | |
c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) | |
py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) | |
# Avoid turning open into a bound method. | |
py_io_ns["open"] = pyio.OpenWrapper | |
for test in tests: | |
if test.__name__.startswith("C"): | |
for name, obj in c_io_ns.items(): | |
setattr(test, name, obj) | |
elif test.__name__.startswith("Py"): | |
for name, obj in py_io_ns.items(): | |
setattr(test, name, obj) | |
support.run_unittest(*tests) | |
if __name__ == "__main__": | |
test_main() |