Update test/test_contextlib.py from CPython 3.11.2

This commit is contained in:
CPython Developers
2023-03-07 11:19:42 +09:00
committed by LYK
parent 26a3ec9ae3
commit df48df5ede

View File

@@ -1,9 +1,11 @@
"""Unit tests for contextlib.py, and other context managers."""
import io
import os
import sys
import tempfile
import threading
import traceback
import unittest
from contextlib import * # Tests __all__
from test import support
@@ -86,6 +88,56 @@ class ContextManagerTestCase(unittest.TestCase):
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_traceback(self):
@contextmanager
def f():
yield
try:
with f():
1/0
except ZeroDivisionError as e:
frames = traceback.extract_tb(e.__traceback__)
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
self.assertEqual(frames[0].line, '1/0')
# Repeat with RuntimeError (which goes through a different code path)
class RuntimeErrorSubclass(RuntimeError):
pass
try:
with f():
raise RuntimeErrorSubclass(42)
except RuntimeErrorSubclass as e:
frames = traceback.extract_tb(e.__traceback__)
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
self.assertEqual(frames[0].line, 'raise RuntimeErrorSubclass(42)')
class StopIterationSubclass(StopIteration):
pass
for stop_exc in (
StopIteration('spam'),
StopIterationSubclass('spam'),
):
with self.subTest(type=type(stop_exc)):
try:
with f():
raise stop_exc
except type(stop_exc) as e:
self.assertIs(e, stop_exc)
frames = traceback.extract_tb(e.__traceback__)
else:
self.fail(f'{stop_exc} was suppressed')
self.assertEqual(len(frames), 1)
self.assertEqual(frames[0].name, 'test_contextmanager_traceback')
self.assertEqual(frames[0].line, 'raise stop_exc')
def test_contextmanager_no_reraise(self):
@contextmanager
def whee():
@@ -126,19 +178,22 @@ class ContextManagerTestCase(unittest.TestCase):
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except_stopiter(self):
stop_exc = StopIteration('spam')
@contextmanager
def woohoo():
yield
try:
with self.assertWarnsRegex(DeprecationWarning,
"StopIteration"):
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')
class StopIterationSubclass(StopIteration):
pass
for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')):
with self.subTest(type=type(stop_exc)):
try:
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail(f'{stop_exc} was suppressed')
# TODO: RUSTPYTHON
@unittest.expectedFailure
@@ -230,6 +285,8 @@ def woohoo():
def woohoo(a, b):
a = weakref.ref(a)
b = weakref.ref(b)
# Allow test to work with a non-refcounted GC
support.gc_collect()
self.assertIsNone(a())
self.assertIsNone(b())
yield
@@ -318,13 +375,13 @@ class FileContextTestCase(unittest.TestCase):
tfn = tempfile.mktemp()
try:
f = None
with open(tfn, "w") as f:
with open(tfn, "w", encoding="utf-8") as f:
self.assertFalse(f.closed)
f.write("Booh\n")
self.assertTrue(f.closed)
f = None
with self.assertRaises(ZeroDivisionError):
with open(tfn, "r") as f:
with open(tfn, "r", encoding="utf-8") as f:
self.assertFalse(f.closed)
self.assertEqual(f.read(), "Booh\n")
1 / 0
@@ -493,7 +550,7 @@ class TestContextDecorator(unittest.TestCase):
def __exit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with self.assertRaisesRegex(TypeError, 'the context manager'):
with mycontext():
pass
@@ -505,7 +562,7 @@ class TestContextDecorator(unittest.TestCase):
def __uxit__(self, *exc):
pass
with self.assertRaises(AttributeError):
with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'):
with mycontext():
pass
@@ -608,9 +665,9 @@ class TestBaseExitStack:
stack.callback(arg=1)
with self.assertRaises(TypeError):
self.exit_stack.callback(arg=2)
with self.assertWarns(DeprecationWarning):
with self.assertRaises(TypeError):
stack.callback(callback=_exit, arg=3)
self.assertEqual(result, [((), {'arg': 3})])
self.assertEqual(result, [])
def test_push(self):
exc_raised = ZeroDivisionError
@@ -665,6 +722,25 @@ class TestBaseExitStack:
result.append(2)
self.assertEqual(result, [1, 2, 3, 4])
def test_enter_context_errors(self):
class LacksEnterAndExit:
pass
class LacksEnter:
def __exit__(self, *exc_info):
pass
class LacksExit:
def __enter__(self):
pass
with self.exit_stack() as stack:
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksEnterAndExit())
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksEnter())
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(LacksExit())
self.assertFalse(stack._exit_callbacks)
def test_close(self):
result = []
with self.exit_stack() as stack:
@@ -700,6 +776,38 @@ class TestBaseExitStack:
stack.push(lambda *exc: True)
1/0
def test_exit_exception_traceback(self):
# This test captures the current behavior of ExitStack so that we know
# if we ever unintendedly change it. It is not a statement of what the
# desired behavior is (for instance, we may want to remove some of the
# internal contextlib frames).
def raise_exc(exc):
raise exc
try:
with self.exit_stack() as stack:
stack.callback(raise_exc, ValueError)
1/0
except ValueError as e:
exc = e
self.assertIsInstance(exc, ValueError)
ve_frames = traceback.extract_tb(exc.__traceback__)
expected = \
[('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \
self.callback_error_internal_frames + \
[('_exit_wrapper', 'callback(*args, **kwds)'),
('raise_exc', 'raise exc')]
self.assertEqual(
[(f.name, f.line) for f in ve_frames], expected)
self.assertIsInstance(exc.__context__, ZeroDivisionError)
zde_frames = traceback.extract_tb(exc.__context__.__traceback__)
self.assertEqual([(f.name, f.line) for f in zde_frames],
[('test_exit_exception_traceback', '1/0')])
def test_exit_exception_chaining_reference(self):
# Sanity check to make sure that ExitStack chaining matches
# actual nested with statements
@@ -781,6 +889,40 @@ class TestBaseExitStack:
self.assertIsInstance(inner_exc, ValueError)
self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
def test_exit_exception_explicit_none_context(self):
# Ensure ExitStack chaining matches actual nested `with` statements
# regarding explicit __context__ = None.
class MyException(Exception):
pass
@contextmanager
def my_cm():
try:
yield
except BaseException:
exc = MyException()
try:
raise exc
finally:
exc.__context__ = None
@contextmanager
def my_cm_with_exit_stack():
with self.exit_stack() as stack:
stack.enter_context(my_cm())
yield stack
for cm in (my_cm, my_cm_with_exit_stack):
with self.subTest():
try:
with cm():
raise IndexError()
except MyException as exc:
self.assertIsNone(exc.__context__)
else:
self.fail("Expected IndexError, but no exception was raised")
def test_exit_exception_non_suppressing(self):
# http://bugs.python.org/issue19092
def raise_exc(exc):
@@ -896,9 +1038,11 @@ class TestBaseExitStack:
def test_instance_bypass(self):
class Example(object): pass
cm = Example()
cm.__enter__ = object()
cm.__exit__ = object()
stack = self.exit_stack()
self.assertRaises(AttributeError, stack.enter_context, cm)
with self.assertRaisesRegex(TypeError, 'the context manager'):
stack.enter_context(cm)
stack.push(cm)
self.assertIs(stack._exit_callbacks[-1][1], cm)
@@ -939,6 +1083,10 @@ class TestBaseExitStack:
class TestExitStack(TestBaseExitStack, unittest.TestCase):
exit_stack = ExitStack
callback_error_internal_frames = [
('__exit__', 'raise exc_details[1]'),
('__exit__', 'if cb(*exc_details):'),
]
class TestRedirectStream:
@@ -1064,5 +1212,53 @@ class TestSuppress(unittest.TestCase):
1/0
self.assertTrue(outer_continued)
class TestChdir(unittest.TestCase):
def make_relative_path(self, *parts):
return os.path.join(
os.path.dirname(os.path.realpath(__file__)),
*parts,
)
def test_simple(self):
old_cwd = os.getcwd()
target = self.make_relative_path('data')
self.assertNotEqual(old_cwd, target)
with chdir(target):
self.assertEqual(os.getcwd(), target)
self.assertEqual(os.getcwd(), old_cwd)
def test_reentrant(self):
old_cwd = os.getcwd()
target1 = self.make_relative_path('data')
target2 = self.make_relative_path('ziptestdata')
self.assertNotIn(old_cwd, (target1, target2))
chdir1, chdir2 = chdir(target1), chdir(target2)
with chdir1:
self.assertEqual(os.getcwd(), target1)
with chdir2:
self.assertEqual(os.getcwd(), target2)
with chdir1:
self.assertEqual(os.getcwd(), target1)
self.assertEqual(os.getcwd(), target2)
self.assertEqual(os.getcwd(), target1)
self.assertEqual(os.getcwd(), old_cwd)
def test_exception(self):
old_cwd = os.getcwd()
target = self.make_relative_path('data')
self.assertNotEqual(old_cwd, target)
try:
with chdir(target):
self.assertEqual(os.getcwd(), target)
raise RuntimeError("boom")
except RuntimeError as re:
self.assertEqual(str(re), "boom")
self.assertEqual(os.getcwd(), old_cwd)
if __name__ == "__main__":
unittest.main()