Update test_asyncgen from v3.14.2

This commit is contained in:
CPython Developers
2026-02-02 08:18:46 +09:00
committed by Jeong, YunWon
parent 7f46112984
commit 52d23158ed

View File

@@ -4,10 +4,12 @@ import unittest
import contextlib
from test.support.import_helper import import_module
from test.support import gc_collect
from test.support import gc_collect, requires_working_socket
asyncio = import_module("asyncio")
requires_working_socket(module=True)
_no_default = object()
@@ -375,6 +377,178 @@ class AsyncGenTest(unittest.TestCase):
self.compare_generators(sync_gen_wrapper(), async_gen_wrapper())
def test_async_gen_exception_12(self):
async def gen():
with self.assertWarnsRegex(RuntimeWarning,
f"coroutine method 'asend' of '{gen.__qualname__}' "
f"was never awaited"):
await anext(me)
yield 123
me = gen()
ai = me.__aiter__()
an = ai.__anext__()
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
an.__next__()
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
an.send(None)
def test_async_gen_asend_throw_concurrent_with_send(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)
def test_async_gen_athrow_throw_concurrent_with_send(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
while True:
try:
await _async_yield(None)
except MyExc:
pass
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(MyExc)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)
def test_async_gen_asend_throw_concurrent_with_throw(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass
agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)
gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.asend(MyExc)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited __anext__\(\)/asend\(\)"):
gen2.send(None)
def test_async_gen_athrow_throw_concurrent_with_throw(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
try:
yield
except MyExc:
pass
while True:
try:
await _async_yield(None)
except MyExc:
pass
agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)
gen = agen.athrow(MyExc)
gen.throw(MyExc)
gen2 = agen.athrow(None)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.throw(MyExc)
with self.assertRaisesRegex(RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"):
gen2.send(None)
def test_async_gen_3_arg_deprecation_warning(self):
async def gen():
yield 123
with self.assertWarns(DeprecationWarning):
x = gen().athrow(GeneratorExit, GeneratorExit(), None)
with self.assertRaises(GeneratorExit):
x.send(None)
del x
gc_collect()
def test_async_gen_api_01(self):
async def gen():
yield 123
@@ -393,8 +567,57 @@ class AsyncGenTest(unittest.TestCase):
self.assertIsInstance(g.ag_frame, types.FrameType)
self.assertFalse(g.ag_running)
self.assertIsInstance(g.ag_code, types.CodeType)
aclose = g.aclose()
self.assertTrue(inspect.isawaitable(aclose))
aclose.close()
self.assertTrue(inspect.isawaitable(g.aclose()))
def test_async_gen_asend_close_runtime_error(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
async def agenfn():
try:
await _async_yield(None)
except GeneratorExit:
await _async_yield(None)
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
with self.assertRaisesRegex(RuntimeError, "coroutine ignored GeneratorExit"):
gen.close()
def test_async_gen_athrow_close_runtime_error(self):
import types
@types.coroutine
def _async_yield(v):
return (yield v)
class MyExc(Exception):
pass
async def agenfn():
try:
yield
except MyExc:
try:
await _async_yield(None)
except GeneratorExit:
await _async_yield(None)
agen = agenfn()
with self.assertRaises(StopIteration):
agen.asend(None).send(None)
gen = agen.athrow(MyExc)
gen.send(None)
with self.assertRaisesRegex(RuntimeError, "coroutine ignored GeneratorExit"):
gen.close()
class AsyncGenAsyncioTest(unittest.TestCase):
@@ -406,7 +629,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
def tearDown(self):
self.loop.close()
self.loop = None
asyncio.set_event_loop_policy(None)
asyncio.events._set_event_loop_policy(None)
def check_async_iterator_anext(self, ait_class):
with self.subTest(anext="pure-Python"):
@@ -648,7 +871,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
agen = agenfn()
with contextlib.closing(anext(agen, "default").__await__()) as g:
self.assertEqual(g.send(None), 1)
self.assertEqual(g.throw(MyError, MyError(), None), 2)
self.assertEqual(g.throw(MyError()), 2)
try:
g.send(None)
except StopIteration as e:
@@ -661,9 +884,9 @@ class AsyncGenAsyncioTest(unittest.TestCase):
agen = agenfn()
with contextlib.closing(anext(agen, "default").__await__()) as g:
self.assertEqual(g.send(None), 1)
self.assertEqual(g.throw(MyError, MyError(), None), 2)
self.assertEqual(g.throw(MyError()), 2)
with self.assertRaises(MyError):
g.throw(MyError, MyError(), None)
g.throw(MyError())
def test3(anext):
agen = agenfn()
@@ -690,9 +913,9 @@ class AsyncGenAsyncioTest(unittest.TestCase):
agen = agenfn()
with contextlib.closing(anext(agen, "default").__await__()) as g:
self.assertEqual(g.send(None), 10)
self.assertEqual(g.throw(MyError, MyError(), None), 20)
self.assertEqual(g.throw(MyError()), 20)
with self.assertRaisesRegex(MyError, 'val'):
g.throw(MyError, MyError('val'), None)
g.throw(MyError('val'))
def test5(anext):
@types.coroutine
@@ -711,7 +934,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
with contextlib.closing(anext(agen, "default").__await__()) as g:
self.assertEqual(g.send(None), 10)
with self.assertRaisesRegex(StopIteration, 'default'):
g.throw(MyError, MyError(), None)
g.throw(MyError())
def test6(anext):
@types.coroutine
@@ -726,7 +949,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
agen = agenfn()
with contextlib.closing(anext(agen, "default").__await__()) as g:
with self.assertRaises(MyError):
g.throw(MyError, MyError(), None)
g.throw(MyError())
def run_test(test):
with self.subTest('pure-Python anext()'):
@@ -929,6 +1152,43 @@ class AsyncGenAsyncioTest(unittest.TestCase):
self.loop.run_until_complete(run())
def test_async_gen_asyncio_anext_tuple_no_exceptions(self):
# StopAsyncIteration exceptions should be cleared.
# See: https://github.com/python/cpython/issues/128078.
async def foo():
if False:
yield (1, 2)
async def run():
it = foo().__aiter__()
with self.assertRaises(StopAsyncIteration):
await it.__anext__()
res = await anext(it, ('a', 'b'))
self.assertTupleEqual(res, ('a', 'b'))
self.loop.run_until_complete(run())
def test_sync_anext_raises_exception(self):
# See: https://github.com/python/cpython/issues/131670
msg = 'custom'
for exc_type in [
StopAsyncIteration,
StopIteration,
ValueError,
Exception,
]:
exc = exc_type(msg)
with self.subTest(exc=exc):
class A:
def __anext__(self):
raise exc
with self.assertRaisesRegex(exc_type, msg):
anext(A())
with self.assertRaisesRegex(exc_type, msg):
anext(A(), 1)
def test_async_gen_asyncio_anext_stopiteration(self):
async def foo():
try:
@@ -1035,8 +1295,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
while True:
yield 1
finally:
await asyncio.sleep(0.01)
await asyncio.sleep(0.01)
await asyncio.sleep(0)
DONE = 1
async def run():
@@ -1046,7 +1305,10 @@ class AsyncGenAsyncioTest(unittest.TestCase):
del g
gc_collect() # For PyPy or other GCs.
await asyncio.sleep(0.1)
# Starts running the aclose task
await asyncio.sleep(0)
# For asyncio.sleep(0) in finally block
await asyncio.sleep(0)
self.loop.run_until_complete(run())
self.assertEqual(DONE, 1)
@@ -1539,6 +1801,8 @@ class AsyncGenAsyncioTest(unittest.TestCase):
self.assertIsInstance(message['exception'], ZeroDivisionError)
self.assertIn('unhandled exception during asyncio.run() shutdown',
message['message'])
del message, messages
gc_collect()
def test_async_gen_expression_01(self):
async def arange(n):
@@ -1556,21 +1820,35 @@ class AsyncGenAsyncioTest(unittest.TestCase):
res = self.loop.run_until_complete(run())
self.assertEqual(res, [i * 2 for i in range(10)])
# TODO: RUSTPYTHON: async for gen expression compilation
# def test_async_gen_expression_02(self):
# async def wrap(n):
# await asyncio.sleep(0.01)
# return n
def test_async_gen_expression_02(self):
async def wrap(n):
await asyncio.sleep(0.01)
return n
# def make_arange(n):
# # This syntax is legal starting with Python 3.7
# return (i * 2 for i in range(n) if await wrap(i))
def make_arange(n):
# This syntax is legal starting with Python 3.7
return (i * 2 for i in range(n) if await wrap(i))
# async def run():
# return [i async for i in make_arange(10)]
async def run():
return [i async for i in make_arange(10)]
# res = self.loop.run_until_complete(run())
# self.assertEqual(res, [i * 2 for i in range(1, 10)])
res = self.loop.run_until_complete(run())
self.assertEqual(res, [i * 2 for i in range(1, 10)])
@unittest.expectedFailure # TODO: RUSTPYTHON; AttributeError: __aiter__
def test_async_gen_expression_incorrect(self):
async def ag():
yield 42
async def run(arg):
(x async for x in arg)
err_msg_async = "'async for' requires an object with " \
"__aiter__ method, got .*"
self.loop.run_until_complete(run(ag()))
with self.assertRaisesRegex(TypeError, err_msg_async):
self.loop.run_until_complete(run(None))
def test_asyncgen_nonstarted_hooks_are_cancellable(self):
# See https://bugs.python.org/issue38013
@@ -1593,6 +1871,7 @@ class AsyncGenAsyncioTest(unittest.TestCase):
asyncio.run(main())
self.assertEqual([], messages)
gc_collect()
def test_async_gen_await_same_anext_coro_twice(self):
async def async_iterate():
@@ -1630,6 +1909,62 @@ class AsyncGenAsyncioTest(unittest.TestCase):
self.loop.run_until_complete(run())
def test_async_gen_throw_same_aclose_coro_twice(self):
async def async_iterate():
yield 1
yield 2
it = async_iterate()
nxt = it.aclose()
with self.assertRaises(StopIteration):
nxt.throw(GeneratorExit)
with self.assertRaisesRegex(
RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"
):
nxt.throw(GeneratorExit)
def test_async_gen_throw_custom_same_aclose_coro_twice(self):
async def async_iterate():
yield 1
yield 2
it = async_iterate()
class MyException(Exception):
pass
nxt = it.aclose()
with self.assertRaises(MyException):
nxt.throw(MyException)
with self.assertRaisesRegex(
RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"
):
nxt.throw(MyException)
def test_async_gen_throw_custom_same_athrow_coro_twice(self):
async def async_iterate():
yield 1
yield 2
it = async_iterate()
class MyException(Exception):
pass
nxt = it.athrow(MyException)
with self.assertRaises(MyException):
nxt.throw(MyException)
with self.assertRaisesRegex(
RuntimeError,
r"cannot reuse already awaited aclose\(\)/athrow\(\)"
):
nxt.throw(MyException)
def test_async_gen_aclose_twice_with_different_coros(self):
# Regression test for https://bugs.python.org/issue39606
async def async_iterate():
@@ -1672,5 +2007,109 @@ class AsyncGenAsyncioTest(unittest.TestCase):
self.loop.run_until_complete(run())
class TestUnawaitedWarnings(unittest.TestCase):
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: RuntimeWarning not triggered
def test_asend(self):
async def gen():
yield 1
# gh-113753: asend objects allocated from a free-list should warn.
# Ensure there is a finalized 'asend' object ready to be reused.
try:
g = gen()
g.asend(None).send(None)
except StopIteration:
pass
msg = f"coroutine method 'asend' of '{gen.__qualname__}' was never awaited"
with self.assertWarnsRegex(RuntimeWarning, msg):
g = gen()
g.asend(None)
gc_collect()
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: RuntimeWarning not triggered
def test_athrow(self):
async def gen():
yield 1
msg = f"coroutine method 'athrow' of '{gen.__qualname__}' was never awaited"
with self.assertWarnsRegex(RuntimeWarning, msg):
g = gen()
g.athrow(RuntimeError)
gc_collect()
@unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: RuntimeWarning not triggered
def test_aclose(self):
async def gen():
yield 1
msg = f"coroutine method 'aclose' of '{gen.__qualname__}' was never awaited"
with self.assertWarnsRegex(RuntimeWarning, msg):
g = gen()
g.aclose()
gc_collect()
def test_aclose_throw(self):
async def gen():
return
yield
class MyException(Exception):
pass
g = gen()
with self.assertRaises(MyException):
g.aclose().throw(MyException)
del g
gc_collect() # does not warn unawaited
def test_asend_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)
async def agenfn():
while True:
await _async_yield(1)
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.asend(None)
with self.assertRaisesRegex(RuntimeError,
r'anext\(\): asynchronous generator is already running'):
gen2.send(None)
del gen2
gc_collect() # does not warn unawaited
def test_athrow_send_already_running(self):
@types.coroutine
def _async_yield(v):
return (yield v)
async def agenfn():
while True:
await _async_yield(1)
return
yield
agen = agenfn()
gen = agen.asend(None)
gen.send(None)
gen2 = agen.athrow(Exception)
with self.assertRaisesRegex(RuntimeError,
r'athrow\(\): asynchronous generator is already running'):
gen2.send(None)
del gen2
gc_collect() # does not warn unawaited
if __name__ == "__main__":
unittest.main()