Update io, pyio, test_io from CPython

This commit is contained in:
CPython Developers
2024-07-28 14:27:13 +09:00
committed by Jeong YunWon
parent 623415d843
commit 0600ae6213
3 changed files with 435 additions and 167 deletions

90
Lib/_pyio.py vendored
View File

@@ -44,8 +44,9 @@ def text_encoding(encoding, stacklevel=2):
"""
A helper function to choose the text encoding.
When encoding is not None, just return it.
Otherwise, return the default text encoding (i.e. "locale").
When encoding is not None, this function returns it.
Otherwise, this function returns the default text encoding
(i.e. "locale" or "utf-8" depends on UTF-8 mode).
This function emits an EncodingWarning if *encoding* is None and
sys.flags.warn_default_encoding is true.
@@ -55,7 +56,10 @@ def text_encoding(encoding, stacklevel=2):
However, please consider using encoding="utf-8" for new APIs.
"""
if encoding is None:
encoding = "locale"
if sys.flags.utf8_mode:
encoding = "utf-8"
else:
encoding = "locale"
if sys.flags.warn_default_encoding:
import warnings
warnings.warn("'encoding' argument not specified.",
@@ -101,7 +105,6 @@ def open(file, mode="r", buffering=-1, encoding=None, errors=None,
'b' binary mode
't' text mode (default)
'+' open a disk file for updating (reading and writing)
'U' universal newline mode (deprecated)
========= ===============================================================
The default mode is 'rt' (open for reading text). For binary random
@@ -117,10 +120,6 @@ def open(file, mode="r", buffering=-1, encoding=None, errors=None,
returned as strings, the bytes having been first decoded using a
platform-dependent encoding or using the specified encoding if given.
'U' mode is deprecated and will raise an exception in future versions
of Python. It has no effect in Python 3. Use newline to control
universal newlines mode.
buffering is an optional integer used to set the buffering policy.
Pass 0 to switch buffering off (only allowed in binary mode), 1 to select
line buffering (only usable in text mode), and an integer > 1 to indicate
@@ -206,7 +205,7 @@ def open(file, mode="r", buffering=-1, encoding=None, errors=None,
if errors is not None and not isinstance(errors, str):
raise TypeError("invalid errors: %r" % errors)
modes = set(mode)
if modes - set("axrwb+tU") or len(mode) > len(modes):
if modes - set("axrwb+t") or len(mode) > len(modes):
raise ValueError("invalid mode: %r" % mode)
creating = "x" in modes
reading = "r" in modes
@@ -215,13 +214,6 @@ def open(file, mode="r", buffering=-1, encoding=None, errors=None,
updating = "+" in modes
text = "t" in modes
binary = "b" in modes
if "U" in modes:
if creating or writing or appending or updating:
raise ValueError("mode U cannot be combined with 'x', 'w', 'a', or '+'")
import warnings
warnings.warn("'U' mode is deprecated",
DeprecationWarning, 2)
reading = True
if text and binary:
raise ValueError("can't have text and binary mode at once")
if creating + reading + writing + appending > 1:
@@ -311,22 +303,6 @@ except AttributeError:
open_code = _open_code_with_warning
def __getattr__(name):
if name == "OpenWrapper":
# bpo-43680: Until Python 3.9, _pyio.open was not a static method and
# builtins.open was set to OpenWrapper to not become a bound method
# when set to a class variable. _io.open is a built-in function whereas
# _pyio.open is a Python function. In Python 3.10, _pyio.open() is now
# a static method, and builtins.open() is now io.open().
import warnings
warnings.warn('OpenWrapper is deprecated, use open instead',
DeprecationWarning, stacklevel=2)
global OpenWrapper
OpenWrapper = open
return OpenWrapper
raise AttributeError(name)
# In normal operation, both `UnsupportedOperation`s should be bound to the
# same object.
try:
@@ -338,8 +314,7 @@ except AttributeError:
class IOBase(metaclass=abc.ABCMeta):
"""The abstract base class for all I/O classes, acting on streams of
bytes. There is no public constructor.
"""The abstract base class for all I/O classes.
This class provides dummy implementations for many methods that
derived classes can override selectively; the default implementations
@@ -1154,6 +1129,7 @@ class BufferedReader(_BufferedIOMixin):
do at most one raw read to satisfy it. We never return more
than self.buffer_size.
"""
self._checkClosed("peek of closed file")
with self._read_lock:
return self._peek_unlocked(size)
@@ -1172,6 +1148,7 @@ class BufferedReader(_BufferedIOMixin):
"""Reads up to size bytes, with at most one read() system call."""
# Returns up to size bytes. If at least one byte is buffered, we
# only return buffered bytes. Otherwise, we do one raw read.
self._checkClosed("read of closed file")
if size < 0:
size = self.buffer_size
if size == 0:
@@ -1189,6 +1166,8 @@ class BufferedReader(_BufferedIOMixin):
def _readinto(self, buf, read1):
"""Read data into *buf* with at most one system call."""
self._checkClosed("readinto of closed file")
# Need to create a memoryview object of type 'b', otherwise
# we may not be able to assign bytes to it, and slicing it
# would create a new object.
@@ -1233,11 +1212,13 @@ class BufferedReader(_BufferedIOMixin):
return written
def tell(self):
return _BufferedIOMixin.tell(self) - len(self._read_buf) + self._read_pos
# GH-95782: Keep return value non-negative
return max(_BufferedIOMixin.tell(self) - len(self._read_buf) + self._read_pos, 0)
def seek(self, pos, whence=0):
if whence not in valid_seek_flags:
raise ValueError("invalid whence value")
self._checkClosed("seek of closed file")
with self._read_lock:
if whence == 1:
pos -= len(self._read_buf) - self._read_pos
@@ -1845,7 +1826,7 @@ class TextIOBase(IOBase):
"""Base class for text I/O.
This class provides a character and line based interface to stream
I/O. There is no public constructor.
I/O.
"""
def read(self, size=-1):
@@ -1997,7 +1978,7 @@ class TextIOWrapper(TextIOBase):
r"""Character and line based layer over a BufferedIOBase object, buffer.
encoding gives the name of the encoding that the stream will be
decoded or encoded with. It defaults to locale.getpreferredencoding(False).
decoded or encoded with. It defaults to locale.getencoding().
errors determines the strictness of encoding and decoding (see the
codecs.register) and defaults to "strict".
@@ -2031,19 +2012,7 @@ class TextIOWrapper(TextIOBase):
encoding = text_encoding(encoding)
if encoding == "locale":
try:
encoding = os.device_encoding(buffer.fileno()) or "locale"
except (AttributeError, UnsupportedOperation):
pass
if encoding == "locale":
try:
import locale
except ImportError:
# Importing locale may fail if Python is being built
encoding = "utf-8"
else:
encoding = locale.getpreferredencoding(False)
encoding = self._get_locale_encoding()
if not isinstance(encoding, str):
raise ValueError("invalid encoding: %r" % encoding)
@@ -2176,6 +2145,8 @@ class TextIOWrapper(TextIOBase):
else:
if not isinstance(encoding, str):
raise TypeError("invalid encoding: %r" % encoding)
if encoding == "locale":
encoding = self._get_locale_encoding()
if newline is Ellipsis:
newline = self._readnl
@@ -2243,8 +2214,9 @@ class TextIOWrapper(TextIOBase):
self.buffer.write(b)
if self._line_buffering and (haslf or "\r" in s):
self.flush()
self._set_decoded_chars('')
self._snapshot = None
if self._snapshot is not None:
self._set_decoded_chars('')
self._snapshot = None
if self._decoder:
self._decoder.reset()
return length
@@ -2280,6 +2252,15 @@ class TextIOWrapper(TextIOBase):
self._decoded_chars_used += len(chars)
return chars
def _get_locale_encoding(self):
try:
import locale
except ImportError:
# Importing locale may fail if Python is being built
return "utf-8"
else:
return locale.getencoding()
def _rewind_decoded_chars(self, n):
"""Rewind the _decoded_chars buffer."""
if self._decoded_chars_used < n:
@@ -2549,8 +2530,9 @@ class TextIOWrapper(TextIOBase):
# Read everything.
result = (self._get_decoded_chars() +
decoder.decode(self.buffer.read(), final=True))
self._set_decoded_chars('')
self._snapshot = None
if self._snapshot is not None:
self._set_decoded_chars('')
self._snapshot = None
return result
else:
# Keep reading chunks until we have size characters to return.

34
Lib/io.py vendored
View File

@@ -45,39 +45,22 @@ __all__ = ["BlockingIOError", "open", "open_code", "IOBase", "RawIOBase",
"FileIO", "BytesIO", "StringIO", "BufferedIOBase",
"BufferedReader", "BufferedWriter", "BufferedRWPair",
"BufferedRandom", "TextIOBase", "TextIOWrapper",
"UnsupportedOperation", "SEEK_SET", "SEEK_CUR", "SEEK_END"]
"UnsupportedOperation", "SEEK_SET", "SEEK_CUR", "SEEK_END",
"DEFAULT_BUFFER_SIZE", "text_encoding",
# "IncrementalNewlineDecoder"
]
import _io
import abc
from _io import (DEFAULT_BUFFER_SIZE, BlockingIOError, UnsupportedOperation,
open, open_code, BytesIO, StringIO, BufferedReader,
open, open_code, FileIO, BytesIO, StringIO, BufferedReader,
BufferedWriter, BufferedRWPair, BufferedRandom,
# XXX RUSTPYTHON TODO: IncrementalNewlineDecoder
# IncrementalNewlineDecoder, text_encoding, TextIOWrapper)
# IncrementalNewlineDecoder,
text_encoding, TextIOWrapper)
try:
from _io import FileIO
except ImportError:
pass
def __getattr__(name):
if name == "OpenWrapper":
# bpo-43680: Until Python 3.9, _pyio.open was not a static method and
# builtins.open was set to OpenWrapper to not become a bound method
# when set to a class variable. _io.open is a built-in function whereas
# _pyio.open is a Python function. In Python 3.10, _pyio.open() is now
# a static method, and builtins.open() is now io.open().
import warnings
warnings.warn('OpenWrapper is deprecated, use open instead',
DeprecationWarning, stacklevel=2)
global OpenWrapper
OpenWrapper = open
return OpenWrapper
raise AttributeError(name)
# Pretend this exception was created here.
UnsupportedOperation.__module__ = "io"
@@ -102,10 +85,7 @@ class BufferedIOBase(_io._BufferedIOBase, IOBase):
class TextIOBase(_io._TextIOBase, IOBase):
__doc__ = _io._TextIOBase.__doc__
try:
RawIOBase.register(FileIO)
except NameError:
pass
RawIOBase.register(FileIO)
for klass in (BytesIO, BufferedReader, BufferedWriter, BufferedRandom,
BufferedRWPair):

478
Lib/test/test_io.py vendored
View File

@@ -28,7 +28,6 @@ import pickle
import random
import signal
import sys
import sysconfig
import textwrap
import threading
import time
@@ -44,6 +43,7 @@ from test.support import import_helper
from test.support import os_helper
from test.support import threading_helper
from test.support import warnings_helper
from test.support import skip_if_sanitizer
from test.support.os_helper import FakePath
import codecs
@@ -66,20 +66,9 @@ else:
class EmptyStruct(ctypes.Structure):
pass
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
MEMORY_SANITIZER = (
'-fsanitize=memory' in _cflags or
'--with-memory-sanitizer' in _config_args
)
ADDRESS_SANITIZER = (
'-fsanitize=address' in _cflags
)
# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
IOBASE_EMITS_UNRAISABLE = (support.Py_DEBUG or sys.flags.dev_mode)
def _default_chunk_size():
@@ -87,6 +76,14 @@ def _default_chunk_size():
with open(__file__, "r", encoding="latin-1") as f:
return f._CHUNK_SIZE
requires_alarm = unittest.skipUnless(
hasattr(signal, "alarm"), "test requires signal.alarm()"
)
class BadIndex:
def __index__(self):
1/0
class MockRawIOWithoutRead:
"""A RawIO implementation without read(), so as to exercise the default
@@ -266,6 +263,27 @@ class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
UnsupportedOperation = pyio.UnsupportedOperation
class MockCharPseudoDevFileIO(MockFileIO):
# GH-95782
# ftruncate() does not work on these special files (and CPython then raises
# appropriate exceptions), so truncate() does not have to be accounted for
# here.
def __init__(self, data):
super().__init__(data)
def seek(self, *args):
return 0
def tell(self, *args):
return 0
class CMockCharPseudoDevFileIO(MockCharPseudoDevFileIO, io.BytesIO):
pass
class PyMockCharPseudoDevFileIO(MockCharPseudoDevFileIO, pyio.BytesIO):
pass
class MockNonBlockWriterIO:
def __init__(self):
@@ -415,8 +433,8 @@ class IOTest(unittest.TestCase):
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
with self.open(os_helper.TESTFN, "wb") as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
self.assertRaises(exc, fp.read)
self.assertRaises(exc, fp.readline)
@@ -433,6 +451,10 @@ class IOTest(unittest.TestCase):
self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
@unittest.skipIf(
support.is_emscripten, "fstat() of a pipe fd is not supported"
)
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_optional_abilities(self):
# Test for OSError when optional APIs are not supported
# The purpose of this test is to try fileno(), reading, writing and
@@ -893,6 +915,14 @@ class IOTest(unittest.TestCase):
open('non-existent', 'r', opener=badopener)
self.assertEqual(str(cm.exception), 'opener returned -2')
def test_opener_invalid_fd(self):
# Check that OSError is raised with error code EBADF if the
# opener returns an invalid file descriptor (see gh-82212).
fd = os_helper.make_bad_fd()
with self.assertRaises(OSError) as cm:
self.open('foo', opener=lambda name, flags: fd)
self.assertEqual(cm.exception.errno, errno.EBADF)
def test_fileio_closefd(self):
# Issue #4841
with self.open(__file__, 'rb') as f1, \
@@ -1040,12 +1070,101 @@ class CIOTest(IOTest):
del obj
support.gc_collect()
self.assertIsNone(wr(), wr)
# TODO: RUSTPYTHON, AssertionError: filter ('', ResourceWarning) did not catch any warning
@unittest.expectedFailure
def test_destructor(self):
super().test_destructor(self)
@support.cpython_only
class TestIOCTypes(unittest.TestCase):
def setUp(self):
_io = import_helper.import_module("_io")
self.types = [
_io.BufferedRWPair,
_io.BufferedRandom,
_io.BufferedReader,
_io.BufferedWriter,
_io.BytesIO,
_io.FileIO,
_io.IncrementalNewlineDecoder,
_io.StringIO,
_io.TextIOWrapper,
_io._BufferedIOBase,
_io._BytesIOBuffer,
_io._IOBase,
_io._RawIOBase,
_io._TextIOBase,
]
if sys.platform == "win32":
self.types.append(_io._WindowsConsoleIO)
self._io = _io
def test_immutable_types(self):
for tp in self.types:
with self.subTest(tp=tp):
with self.assertRaisesRegex(TypeError, "immutable"):
tp.foo = "bar"
def test_class_hierarchy(self):
def check_subs(types, base):
for tp in types:
with self.subTest(tp=tp, base=base):
self.assertTrue(issubclass(tp, base))
def recursive_check(d):
for k, v in d.items():
if isinstance(v, dict):
recursive_check(v)
elif isinstance(v, set):
check_subs(v, k)
else:
self.fail("corrupt test dataset")
_io = self._io
hierarchy = {
_io._IOBase: {
_io._BufferedIOBase: {
_io.BufferedRWPair,
_io.BufferedRandom,
_io.BufferedReader,
_io.BufferedWriter,
_io.BytesIO,
},
_io._RawIOBase: {
_io.FileIO,
},
_io._TextIOBase: {
_io.StringIO,
_io.TextIOWrapper,
},
},
}
if sys.platform == "win32":
hierarchy[_io._IOBase][_io._RawIOBase].add(_io._WindowsConsoleIO)
recursive_check(hierarchy)
def test_subclassing(self):
_io = self._io
dataset = {k: True for k in self.types}
dataset[_io._BytesIOBuffer] = False
for tp, is_basetype in dataset.items():
with self.subTest(tp=tp, is_basetype=is_basetype):
name = f"{tp.__name__}_subclass"
bases = (tp,)
if is_basetype:
_ = type(name, bases, {})
else:
msg = "not an acceptable base type"
with self.assertRaisesRegex(TypeError, msg):
_ = type(name, bases, {})
def test_disallow_instantiation(self):
_io = self._io
support.check_disallow_instantiation(self, _io._BytesIOBuffer)
class PyIOTest(IOTest):
pass
@@ -1463,6 +1582,7 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertEqual(b"abcdefg", bufio.read())
@threading_helper.requires_working_threading()
@support.requires_resource('cpu')
def test_threads(self):
try:
@@ -1542,11 +1662,25 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
def test_read_on_closed(self):
# Issue #23796
b = io.BufferedReader(io.BytesIO(b"12"))
b = self.BufferedReader(self.BytesIO(b"12"))
b.read(1)
b.close()
self.assertRaises(ValueError, b.peek)
self.assertRaises(ValueError, b.read1, 1)
with self.subTest('peek'):
self.assertRaises(ValueError, b.peek)
with self.subTest('read1'):
self.assertRaises(ValueError, b.read1, 1)
with self.subTest('read'):
self.assertRaises(ValueError, b.read)
with self.subTest('readinto'):
self.assertRaises(ValueError, b.readinto, bytearray())
with self.subTest('readinto1'):
self.assertRaises(ValueError, b.readinto1, bytearray())
with self.subTest('flush'):
self.assertRaises(ValueError, b.flush)
with self.subTest('truncate'):
self.assertRaises(ValueError, b.truncate)
with self.subTest('seek'):
self.assertRaises(ValueError, b.seek, 0)
def test_truncate_on_read_only(self):
rawio = self.MockFileIO(b"abc")
@@ -1555,13 +1689,38 @@ class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
self.assertRaises(self.UnsupportedOperation, bufio.truncate)
self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
def test_tell_character_device_file(self):
# GH-95782
# For the (former) bug in BufferedIO to manifest, the wrapped IO obj
# must be able to produce at least 2 bytes.
raw = self.MockCharPseudoDevFileIO(b"12")
buf = self.tp(raw)
self.assertEqual(buf.tell(), 0)
self.assertEqual(buf.read(1), b"1")
self.assertEqual(buf.tell(), 0)
def test_seek_character_device_file(self):
raw = self.MockCharPseudoDevFileIO(b"12")
buf = self.tp(raw)
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
self.assertEqual(buf.seek(1, io.SEEK_SET), 0)
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
self.assertEqual(buf.read(1), b"1")
# In the C implementation, tell() sets the BufferedIO's abs_pos to 0,
# which means that the next seek() could return a negative offset if it
# does not sanity-check:
self.assertEqual(buf.tell(), 0)
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
tp = io.BufferedReader
@unittest.skip("TODO: RUSTPYTHON, fallible allocation")
@unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
@skip_if_sanitizer(memory=True, address=True, thread=True,
reason="sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
def test_constructor(self):
BufferedReaderTest.test_constructor(self)
# The allocation can succeed on 32-bit builds, e.g. with more
@@ -1775,7 +1934,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
self.assertTrue(s.startswith(b"01234567A"), s)
def test_write_and_rewind(self):
raw = io.BytesIO()
raw = self.BytesIO()
bufio = self.tp(raw, 4)
self.assertEqual(bufio.write(b"abcdef"), 6)
self.assertEqual(bufio.tell(), 6)
@@ -1854,6 +2013,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
f.truncate()
self.assertEqual(f.tell(), buffer_size + 2)
@threading_helper.requires_working_threading()
@support.requires_resource('cpu')
def test_threads(self):
try:
@@ -1925,6 +2085,7 @@ class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
self.assertRaises(OSError, b.close) # exception not swallowed
self.assertTrue(b.closed)
@threading_helper.requires_working_threading()
def test_slow_close_from_thread(self):
# Issue #31976
rawio = self.SlowFlushRawIO()
@@ -1942,8 +2103,9 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
tp = io.BufferedWriter
@unittest.skip("TODO: RUSTPYTHON, fallible allocation")
@unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
@skip_if_sanitizer(memory=True, address=True, thread=True,
reason="sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
def test_constructor(self):
BufferedWriterTest.test_constructor(self)
# The allocation can succeed on 32-bit builds, e.g. with more
@@ -1986,7 +2148,7 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
def test_args_error(self):
# Issue #17275
with self.assertRaisesRegex(TypeError, "BufferedWriter"):
self.tp(io.BytesIO(), 1024, 1024, 1024)
self.tp(self.BytesIO(), 1024, 1024, 1024)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@@ -2425,6 +2587,28 @@ class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
f.flush()
self.assertEqual(raw.getvalue(), b'a2c')
def test_read1_after_write(self):
with self.BytesIO(b'abcdef') as raw:
with self.tp(raw, 3) as f:
f.write(b"1")
self.assertEqual(f.read1(1), b'b')
f.flush()
self.assertEqual(raw.getvalue(), b'1bcdef')
with self.BytesIO(b'abcdef') as raw:
with self.tp(raw, 3) as f:
f.write(b"1")
self.assertEqual(f.read1(), b'bcd')
f.flush()
self.assertEqual(raw.getvalue(), b'1bcdef')
with self.BytesIO(b'abcdef') as raw:
with self.tp(raw, 3) as f:
f.write(b"1")
# XXX: read(100) returns different numbers of bytes
# in Python and C implementations.
self.assertEqual(f.read1(100)[:3], b'bcd')
f.flush()
self.assertEqual(raw.getvalue(), b'1bcdef')
def test_interleaved_readline_write(self):
with self.BytesIO(b'ab\ncdef\ng\n') as raw:
with self.tp(raw) as f:
@@ -2449,8 +2633,9 @@ class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
tp = io.BufferedRandom
@unittest.skip("TODO: RUSTPYTHON, fallible allocation")
@unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
@skip_if_sanitizer(memory=True, address=True, thread=True,
reason="sanitizer defaults to crashing "
"instead of returning NULL for malloc failure.")
def test_constructor(self):
BufferedRandomTest.test_constructor(self)
# The allocation can succeed on 32-bit builds, e.g. with more
@@ -2470,7 +2655,7 @@ class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
def test_args_error(self):
# Issue #17275
with self.assertRaisesRegex(TypeError, "BufferedRandom"):
self.tp(io.BytesIO(), 1024, 1024, 1024)
self.tp(self.BytesIO(), 1024, 1024, 1024)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@@ -2647,8 +2832,29 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(t.encoding, "utf-8")
self.assertEqual(t.line_buffering, True)
self.assertEqual("\xe9\n", t.readline())
self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
invalid_type = TypeError if self.is_C else ValueError
with self.assertRaises(invalid_type):
t.__init__(b, encoding=42)
with self.assertRaises(UnicodeEncodeError):
t.__init__(b, encoding='\udcfe')
with self.assertRaises(ValueError):
t.__init__(b, encoding='utf-8\0')
with self.assertRaises(invalid_type):
t.__init__(b, encoding="utf-8", errors=42)
if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
with self.assertRaises(UnicodeEncodeError):
t.__init__(b, encoding="utf-8", errors='\udcfe')
if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", errors='replace\0')
with self.assertRaises(TypeError):
t.__init__(b, encoding="utf-8", newline=42)
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='\udcfe')
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='\n\0')
with self.assertRaises(ValueError):
t.__init__(b, encoding="utf-8", newline='xyzzy')
def test_uninitialized(self):
t = self.TextIOWrapper.__new__(self.TextIOWrapper)
@@ -2769,27 +2975,16 @@ class TextIOWrapperTest(unittest.TestCase):
if key in os.environ:
del os.environ[key]
current_locale_encoding = locale.getpreferredencoding(False)
current_locale_encoding = locale.getencoding()
b = self.BytesIO()
with warnings.catch_warnings():
warnings.simplefilter("ignore", EncodingWarning)
t = self.TextIOWrapper(b)
t = self.TextIOWrapper(b)
self.assertEqual(t.encoding, current_locale_encoding)
finally:
os.environ.clear()
os.environ.update(old_environ)
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
# Issue 15989
import _testcapi
b = self.BytesIO()
b.fileno = lambda: _testcapi.INT_MAX + 1
self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
b.fileno = lambda: _testcapi.UINT_MAX + 1
self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
def test_encoding(self):
# Check the encoding attribute is always set, and valid
b = self.BytesIO()
@@ -2797,7 +2992,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(t.encoding, "utf-8")
with warnings.catch_warnings():
warnings.simplefilter("ignore", EncodingWarning)
t = self.TextIOWrapper(b)
t = self.TextIOWrapper(b)
self.assertIsNotNone(t.encoding)
codecs.lookup(t.encoding)
@@ -3345,6 +3540,7 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(f.errors, "replace")
@support.no_tracing
@threading_helper.requires_working_threading()
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
@@ -3529,7 +3725,7 @@ class TextIOWrapperTest(unittest.TestCase):
# encode() is invalid shouldn't cause an assertion failure.
rot13 = codecs.lookup("rot13")
with support.swap_attr(rot13, '_is_text_encoding', True):
t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
self.assertRaises(TypeError, t.write, 'bar')
def test_illegal_decoder(self):
@@ -3638,6 +3834,10 @@ class TextIOWrapperTest(unittest.TestCase):
F.tell = lambda x: 0
t = self.TextIOWrapper(F(), encoding='utf-8')
def test_reconfigure_locale(self):
wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
wrapper.reconfigure(encoding="locale")
def test_reconfigure_encoding_read(self):
# latin1 -> utf8
# (latin1 can decode utf-8 encoded string)
@@ -3719,6 +3919,59 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
def test_reconfigure_errors(self):
txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
with self.assertRaises(TypeError): # there was a crash
txt.reconfigure(encoding=42)
if self.is_C:
with self.assertRaises(UnicodeEncodeError):
txt.reconfigure(encoding='\udcfe')
with self.assertRaises(LookupError):
txt.reconfigure(encoding='locale\0')
# TODO: txt.reconfigure(encoding='utf-8\0')
# TODO: txt.reconfigure(encoding='nonexisting')
with self.assertRaises(TypeError):
txt.reconfigure(errors=42)
if self.is_C:
with self.assertRaises(UnicodeEncodeError):
txt.reconfigure(errors='\udcfe')
# TODO: txt.reconfigure(errors='ignore\0')
# TODO: txt.reconfigure(errors='nonexisting')
with self.assertRaises(TypeError):
txt.reconfigure(newline=42)
with self.assertRaises(ValueError):
txt.reconfigure(newline='\udcfe')
with self.assertRaises(ValueError):
txt.reconfigure(newline='xyz')
if not self.is_C:
# TODO: Should fail in C too.
with self.assertRaises(ValueError):
txt.reconfigure(newline='\n\0')
if self.is_C:
# TODO: Use __bool__(), not __index__().
with self.assertRaises(ZeroDivisionError):
txt.reconfigure(line_buffering=BadIndex())
with self.assertRaises(OverflowError):
txt.reconfigure(line_buffering=2**1000)
with self.assertRaises(ZeroDivisionError):
txt.reconfigure(write_through=BadIndex())
with self.assertRaises(OverflowError):
txt.reconfigure(write_through=2**1000)
with self.assertRaises(ZeroDivisionError): # there was a crash
txt.reconfigure(line_buffering=BadIndex(),
write_through=BadIndex())
self.assertEqual(txt.encoding, 'ascii')
self.assertEqual(txt.errors, 'replace')
self.assertIs(txt.line_buffering, False)
self.assertIs(txt.write_through, False)
txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
line_buffering=True, write_through=True)
self.assertEqual(txt.encoding, 'latin1')
self.assertEqual(txt.errors, 'ignore')
self.assertIs(txt.line_buffering, True)
self.assertIs(txt.write_through, True)
def test_reconfigure_newline(self):
raw = self.BytesIO(b'CR\rEOF')
txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
@@ -3766,6 +4019,14 @@ class TextIOWrapperTest(unittest.TestCase):
t.write('x')
t.tell()
def test_issue35928(self):
p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
f = self.TextIOWrapper(p)
res = f.readline()
self.assertEqual(res, 'foo\n')
f.write(res)
self.assertEqual(res + f.readline(), 'foo\nbar\n')
class MemviewBytesIO(io.BytesIO):
'''A BytesIO object whose read method returns memoryviews
@@ -3924,7 +4185,7 @@ class CTextIOWrapperTest(TextIOWrapperTest):
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
with warnings_helper.check_warnings(('', ResourceWarning)):
rawio = io.FileIO(os_helper.TESTFN, "wb")
rawio = self.FileIO(os_helper.TESTFN, "wb")
b = self.BufferedWriter(rawio)
t = self.TextIOWrapper(b, encoding="ascii")
t.write("456def")
@@ -4117,7 +4378,15 @@ class IncrementalNewlineDecoderTest(unittest.TestCase):
self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
pass
@support.cpython_only
def test_uninitialized(self):
uninitialized = self.IncrementalNewlineDecoder.__new__(
self.IncrementalNewlineDecoder)
self.assertRaises(ValueError, uninitialized.decode, b'bar')
self.assertRaises(ValueError, uninitialized.getstate)
self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
self.assertRaises(ValueError, uninitialized.reset)
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
pass
@@ -4127,38 +4396,24 @@ class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
class MiscIOTest(unittest.TestCase):
# for test__all__, actual values are set in subclasses
name_of_module = None
extra_exported = ()
not_exported = ()
def tearDown(self):
os_helper.unlink(os_helper.TESTFN)
def test___all__(self):
for name in self.io.__all__:
obj = getattr(self.io, name, None)
self.assertIsNotNone(obj, name)
if name in ("open", "open_code"):
continue
elif "error" in name.lower() or name == "UnsupportedOperation":
self.assertTrue(issubclass(obj, Exception), name)
elif not name.startswith("SEEK_"):
self.assertTrue(issubclass(obj, self.IOBase))
support.check__all__(self, self.io, self.name_of_module,
extra=self.extra_exported,
not_exported=self.not_exported)
def test_attributes(self):
f = self.open(os_helper.TESTFN, "wb", buffering=0)
self.assertEqual(f.mode, "wb")
f.close()
# XXX RUSTPYTHON: universal mode is deprecated anyway, so I
# feel fine about skipping it
# with warnings_helper.check_warnings(('', DeprecationWarning)):
# f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
# self.assertEqual(f.name, os_helper.TESTFN)
# self.assertEqual(f.buffer.name, os_helper.TESTFN)
# self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
# self.assertEqual(f.mode, "U")
# self.assertEqual(f.buffer.mode, "rb")
# self.assertEqual(f.buffer.raw.mode, "rb")
# f.close()
f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
self.assertEqual(f.mode, "w+")
self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
@@ -4172,6 +4427,17 @@ class MiscIOTest(unittest.TestCase):
f.close()
g.close()
def test_removed_u_mode(self):
# bpo-37330: The "U" mode has been removed in Python 3.11
for mode in ("U", "rU", "r+U"):
with self.assertRaises(ValueError) as cm:
self.open(os_helper.TESTFN, mode)
self.assertIn('invalid mode', str(cm.exception))
@unittest.skipIf(
support.is_emscripten, "fstat() of a pipe fd is not supported"
)
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_open_pipe_with_append(self):
# bpo-27805: Ignore ESPIPE from lseek() in open().
r, w = os.pipe()
@@ -4316,6 +4582,7 @@ class MiscIOTest(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_warn_on_dealloc_fd(self):
self._check_warn_on_dealloc_fd("rb", buffering=0)
self._check_warn_on_dealloc_fd("rb")
@@ -4324,6 +4591,7 @@ class MiscIOTest(unittest.TestCase):
def test_pickling(self):
# Pickling file objects is forbidden
msg = "cannot pickle"
for kwargs in [
{"mode": "w"},
{"mode": "wb"},
@@ -4338,21 +4606,30 @@ class MiscIOTest(unittest.TestCase):
if "b" not in kwargs["mode"]:
kwargs["encoding"] = "utf-8"
for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
with self.open(os_helper.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
with self.subTest(protocol=protocol, kwargs=kwargs):
with self.open(os_helper.TESTFN, **kwargs) as f:
with self.assertRaisesRegex(TypeError, msg):
pickle.dumps(f, protocol)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(
support.is_emscripten, "fstat() of a pipe fd is not supported"
)
def test_nonblock_pipe_write_bigbuf(self):
self._test_nonblock_pipe_write(16*1024)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(
support.is_emscripten, "fstat() of a pipe fd is not supported"
)
def test_nonblock_pipe_write_smallbuf(self):
self._test_nonblock_pipe_write(1024)
@unittest.skipUnless(hasattr(os, 'set_blocking'),
'os.set_blocking() required for this test')
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def _test_nonblock_pipe_write(self, bufsize):
sent = []
received = []
@@ -4490,17 +4767,22 @@ class MiscIOTest(unittest.TestCase):
self.assertTrue(
warnings[1].startswith(b"<string>:8: EncodingWarning: "))
@support.cpython_only
# Depending if OpenWrapper was already created or not, the warning is
# emitted or not. For example, the attribute is already created when this
# test is run multiple times.
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_openwrapper(self):
self.assertIs(self.io.OpenWrapper, self.io.open)
def test_text_encoding(self):
# PEP 597, bpo-47000. io.text_encoding() returns "locale" or "utf-8"
# based on sys.flags.utf8_mode
code = "import io; print(io.text_encoding(None))"
proc = assert_python_ok('-X', 'utf8=0', '-c', code)
self.assertEqual(b"locale", proc.out.strip())
proc = assert_python_ok('-X', 'utf8=1', '-c', code)
self.assertEqual(b"utf-8", proc.out.strip())
class CMiscIOTest(MiscIOTest):
io = io
name_of_module = "io", "_io"
extra_exported = "BlockingIOError",
def test_readinto_buffer_overflow(self):
# Issue #18025
@@ -4552,9 +4834,13 @@ class CMiscIOTest(MiscIOTest):
else:
self.assertFalse(err.strip('.!'))
@threading_helper.requires_working_threading()
@support.requires_resource('walltime')
def test_daemon_threads_shutdown_stdout_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stdout')
@threading_helper.requires_working_threading()
@support.requires_resource('walltime')
def test_daemon_threads_shutdown_stderr_deadlock(self):
self.check_daemon_threads_shutdown_deadlock('stderr')
@@ -4566,6 +4852,9 @@ class CMiscIOTest(MiscIOTest):
class PyMiscIOTest(MiscIOTest):
io = pyio
name_of_module = "_pyio", "io"
extra_exported = "BlockingIOError", "open_code",
not_exported = "valid_seek_flags",
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
@@ -4657,12 +4946,18 @@ class SignalsTest(unittest.TestCase):
if e.errno != errno.EBADF:
raise
@requires_alarm
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_interrupted_write_unbuffered(self):
self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
@requires_alarm
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_interrupted_write_buffered(self):
self.check_interrupted_write(b"xy", b"xy", mode="wb")
@requires_alarm
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_interrupted_write_text(self):
self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
@@ -4694,9 +4989,11 @@ class SignalsTest(unittest.TestCase):
wio.close()
os.close(r)
@requires_alarm
def test_reentrant_write_buffered(self):
self.check_reentrant_write(b"xy", mode="wb")
@requires_alarm
def test_reentrant_write_text(self):
self.check_reentrant_write("xy", mode="w", encoding="ascii")
@@ -4726,12 +5023,16 @@ class SignalsTest(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
@requires_alarm
@support.requires_resource('walltime')
def test_interrupted_read_retry_buffered(self):
self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
mode="rb")
# TODO: RUSTPYTHON
@unittest.expectedFailure
@requires_alarm
@support.requires_resource('walltime')
def test_interrupted_read_retry_text(self):
self.check_interrupted_read_retry(lambda x: x,
mode="r", encoding="latin1")
@@ -4805,10 +5106,14 @@ class SignalsTest(unittest.TestCase):
raise
@unittest.skip("TODO: RUSTPYTHON, thread 'main' panicked at 'already borrowed: BorrowMutError'")
@requires_alarm
@support.requires_resource('walltime')
def test_interrupted_write_retry_buffered(self):
self.check_interrupted_write_retry(b"x", mode="wb")
@unittest.skip("TODO: RUSTPYTHON, thread 'main' panicked at 'already borrowed: BorrowMutError'")
@requires_alarm
@support.requires_resource('walltime')
def test_interrupted_write_retry_text(self):
self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
@@ -4825,7 +5130,7 @@ class PySignalsTest(SignalsTest):
test_reentrant_write_text = None
def load_tests(*args):
def load_tests(loader, tests, pattern):
tests = (CIOTest, PyIOTest, APIMismatchTest,
CBufferedReaderTest, PyBufferedReaderTest,
CBufferedWriterTest, PyBufferedWriterTest,
@@ -4835,32 +5140,33 @@ def load_tests(*args):
CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
CTextIOWrapperTest, PyTextIOWrapperTest,
CMiscIOTest, PyMiscIOTest,
CSignalsTest, PySignalsTest,
CSignalsTest, PySignalsTest, TestIOCTypes,
)
# Put the namespaces of the IO module we are testing and some useful mock
# classes in the __dict__ of each test.
mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
SlowFlushRawIO)
all_members = io.__all__# + ["IncrementalNewlineDecoder"] XXX RUSTPYTHON
SlowFlushRawIO, MockCharPseudoDevFileIO)
all_members = io.__all__
c_io_ns = {name : getattr(io, name) for name in all_members}
py_io_ns = {name : getattr(pyio, name) for name in all_members}
globs = globals()
c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
# TODO: RUSTPYTHON (need to update io.py, see bpo-43680)
# Avoid turning open into a bound method.
py_io_ns["open"] = pyio.OpenWrapper
for test in tests:
if test.__name__.startswith("C"):
for name, obj in c_io_ns.items():
setattr(test, name, obj)
test.is_C = True
elif test.__name__.startswith("Py"):
for name, obj in py_io_ns.items():
setattr(test, name, obj)
test.is_C = False
suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
suite = loader.suiteClass()
for test in tests:
suite.addTest(loader.loadTestsFromTestCase(test))
return suite
if __name__ == "__main__":