mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
* Mark 3.12 * Update importlib from Python 3.12.0 * Update test_importlib from Python3.12 * Mark failings tests from importlib * Update test.support from Python3.12 * Fix unsupported parser feature * mark failing test * Update functools from Python 3.12 * manual type annotation * slice behavior changed in 3.12 * empty unittest.main returns non-zero * test_decimal from CPython 3.12 * Mark failing tests * Update test_unicode from CPython 3.12 * Update test_functools from Python 3.12 * Update enum from Python 3.12 * enum * Doc format changed * Update test_module from CPython --------- Co-authored-by: CPython developers <>
133 lines
2.7 KiB
Python
133 lines
2.7 KiB
Python
import sys
|
|
import asyncio
|
|
import unittest
|
|
|
|
|
|
class ContextManager:
|
|
async def __aenter__(self):
|
|
print("Entrada")
|
|
ls.append(1)
|
|
return 1
|
|
|
|
def __str__(self):
|
|
ls.append(2)
|
|
return "c'est moi!"
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
ls.append(3)
|
|
print("Wiedersehen")
|
|
|
|
|
|
class AIterWrap:
|
|
def __init__(self, obj):
|
|
self._it = iter(obj)
|
|
|
|
def __aiter__(self):
|
|
return self
|
|
|
|
async def __anext__(self):
|
|
try:
|
|
value = next(self._it)
|
|
except StopIteration:
|
|
raise StopAsyncIteration
|
|
return value
|
|
|
|
SLEEP_UNIT = 0.1
|
|
|
|
async def a(s, m):
|
|
async with ContextManager() as b:
|
|
print(f"val = {b}")
|
|
await asyncio.sleep(s)
|
|
async for i in AIterWrap(range(0, 2)):
|
|
print(i)
|
|
ls.append(m)
|
|
await asyncio.sleep(SLEEP_UNIT)
|
|
|
|
|
|
async def run():
|
|
tasks = [
|
|
asyncio.create_task(c)
|
|
for c in [a(SLEEP_UNIT * 0, "hello1"), a(SLEEP_UNIT * 1, "hello2"), a(SLEEP_UNIT * 2, "hello3"), a(SLEEP_UNIT * 3, "hello4")]
|
|
]
|
|
await asyncio.wait(tasks)
|
|
|
|
expected = [
|
|
1,
|
|
3,
|
|
1,
|
|
3,
|
|
1,
|
|
3,
|
|
1,
|
|
3,
|
|
"hello1",
|
|
"hello2",
|
|
"hello1",
|
|
"hello3",
|
|
"hello2",
|
|
"hello4",
|
|
"hello3",
|
|
"hello4",
|
|
]
|
|
ls = []
|
|
|
|
def test():
|
|
global SLEEP_UNIT
|
|
if sys.platform.startswith("win"):
|
|
SLEEP_UNIT *= 2
|
|
ls.clear()
|
|
asyncio.run(run(), debug=True)
|
|
assert ls == expected
|
|
|
|
|
|
for i in reversed(range(10)):
|
|
try:
|
|
test()
|
|
break
|
|
except AssertionError:
|
|
if i == 0:
|
|
raise
|
|
|
|
|
|
if sys.version_info < (3, 11, 0):
|
|
|
|
class TestAsyncWith(unittest.TestCase):
|
|
def testAenterAttributeError1(self):
|
|
class LacksAenter(object):
|
|
async def __aexit__(self, *exc):
|
|
pass
|
|
|
|
async def foo():
|
|
async with LacksAenter():
|
|
pass
|
|
|
|
with self.assertRaisesRegex(AttributeError, "__aenter__"):
|
|
foo().send(None)
|
|
|
|
def testAenterAttributeError2(self):
|
|
class LacksAenterAndAexit(object):
|
|
pass
|
|
|
|
async def foo():
|
|
async with LacksAenterAndAexit():
|
|
pass
|
|
|
|
with self.assertRaisesRegex(AttributeError, "__aenter__"):
|
|
foo().send(None)
|
|
|
|
def testAexitAttributeError(self):
|
|
class LacksAexit(object):
|
|
async def __aenter__(self):
|
|
pass
|
|
|
|
async def foo():
|
|
async with LacksAexit():
|
|
pass
|
|
|
|
with self.assertRaisesRegex(AttributeError, "__aexit__"):
|
|
foo().send(None)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|