Compare commits

...

21 Commits

Author SHA1 Message Date
Noa
180746467e Merge pull request #5744 from coolreader18/upd-malachite
Update to malachite 0.6
2025-04-27 23:24:26 -05:00
Ashwin Naren
f55bf8f83b fix openssl error reasons (#5739) 2025-04-28 12:30:14 +09:00
Jeong, YunWon
ff10a64727 Fix test_poll::test_poll3 (#5718)
* test_poll3

* Refactor EventMask
2025-04-28 12:29:21 +09:00
Ashwin Naren
5561b6ead4 Re-add ssl feature to cron-ci.yaml for whats-left (#5750) 2025-04-28 12:20:42 +09:00
Ashwin Naren
392d1c04f6 More overlapped implementation (#5748) 2025-04-27 19:48:06 +09:00
Ashwin Naren
d46bcd9291 typing upgrade to 3.13.2 (#5590) 2025-04-27 15:26:37 +09:00
Ashwin Naren
ca496fb3b1 Collect whats_left data with sqlite feature
This fixes the output on the website, and correctly shows `_sqlite3` as implemented
2025-04-27 15:24:18 +09:00
Noa
7aad6e03e3 Update to malachite 0.6 2025-04-25 00:22:26 -05:00
Ashwin Naren
c97f4d1daf Failure marker (#5695)
* initial auto-upgrader

* platform support

* use ast walking

* detect testname automatically

* handle classes properly

* add instructions to fix_test.py
2025-04-24 13:28:38 +09:00
Jeong, YunWon
7d2a7a0e35 Merge pull request #5731 from arihant2math/pprint-313 2025-04-22 14:18:35 +09:00
Ashwin Naren
92e72aabdc update graphlib to 3.13.3 2025-04-21 21:21:10 -07:00
Ashwin Naren
8603cd9387 update heapq to 3.13.3 2025-04-21 21:12:12 -07:00
Ashwin Naren
1c64bde0ee update sched to 3.13.3 2025-04-21 21:11:42 -07:00
Ashwin Naren
70f3aec552 update linecache to 3.13.3 2025-04-21 21:11:42 -07:00
Ashwin Naren
6567d1d6ec update queue to 3.13.3 2025-04-21 20:48:13 -07:00
Ashwin Naren
a5214a0de7 update colorsys to 3.13.3 2025-04-21 20:45:06 -07:00
Ashwin Naren
a85a84330f update pprint to 3.13.3 2025-04-21 20:43:43 -07:00
Noa
494918d9fe Remove cfg_attr features for redox 2025-04-22 12:00:57 +09:00
Noa
3bfafb0ecb Merge pull request #5720 from youknowone/wasip2-build
basic wasip2 support
2025-04-21 13:39:41 -05:00
Reagan Bohan
ecbc6f7044 Fix mmap aborting with invalid fd in debug mode 2025-04-21 20:40:44 +09:00
Jeong YunWon
e434ff5f6e basic wasip2 support 2025-04-20 19:47:35 +09:00
39 changed files with 8768 additions and 2203 deletions

View File

@@ -74,6 +74,8 @@ unic
unistd
unraw
unsync
wasip1
wasip2
wasmbind
wasmtime
widestring

View File

@@ -84,7 +84,7 @@ jobs:
- name: Collect what is left data
run: |
chmod +x ./whats_left.py
./whats_left.py > whats_left.temp
./whats_left.py --features "ssl,sqlite" > whats_left.temp
env:
RUSTPYTHONPATH: ${{ github.workspace }}/Lib
- name: Upload data to the website

59
Cargo.lock generated
View File

@@ -725,27 +725,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "derive_more"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05"
dependencies = [
"derive_more-impl",
]
[[package]]
name = "derive_more-impl"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
"unicode-xid",
]
[[package]]
name = "digest"
version = "0.10.7"
@@ -1184,15 +1163,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
@@ -1457,23 +1427,22 @@ dependencies = [
[[package]]
name = "malachite-base"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5063891d2cec8fd20cabccbd3fc277bf8d5666f481fb3f79d999559b39a62713"
checksum = "554bcf7f816ff3c1eae8f2b95c4375156884c79988596a6d01b7b070710fa9e5"
dependencies = [
"hashbrown",
"itertools 0.11.0",
"itertools 0.14.0",
"libm",
"ryu",
]
[[package]]
name = "malachite-bigint"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b1b1fec8b370139968919a5b77071c94b282eaba3da1cf179ae5299060d4e75"
checksum = "df1acde414186498b2a6a1e271f8ce5d65eaa5c492e95271121f30718fe2f925"
dependencies = [
"derive_more",
"malachite-base",
"malachite-nz",
"num-integer",
@@ -1483,22 +1452,22 @@ dependencies = [
[[package]]
name = "malachite-nz"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "175263cd5b846c552b9afb9d4b03ca465b4ad10717d789cad7dac24441c4fcbe"
checksum = "f43d406336c42a59e07813b57efd651db00118af84c640a221d666964b2ec71f"
dependencies = [
"itertools 0.11.0",
"itertools 0.14.0",
"libm",
"malachite-base",
]
[[package]]
name = "malachite-q"
version = "0.5.1"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5261ba8feb1ad20cddab3d625af28206c663c08014b2e5c5f9bd54b0f230234f"
checksum = "25911a58ea0426e0b7bb1dffc8324e82711c82abff868b8523ae69d8a47e8062"
dependencies = [
"itertools 0.11.0",
"itertools 0.14.0",
"malachite-base",
"malachite-nz",
]
@@ -3230,12 +3199,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd"
[[package]]
name = "unicode-xid"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853"
[[package]]
name = "unicode_names2"
version = "1.3.0"

View File

@@ -171,9 +171,9 @@ libc = "0.2.169"
libffi = "3.2"
log = "0.4.27"
nix = { version = "0.29", features = ["fs", "user", "process", "term", "time", "signal", "ioctl", "socket", "sched", "zerocopy", "dir", "hostname", "net", "poll"] }
malachite-bigint = "0.5"
malachite-q = "0.5"
malachite-base = "0.5"
malachite-bigint = "0.6"
malachite-q = "0.6"
malachite-base = "0.6"
memchr = "2.7.4"
num-complex = "0.4.6"
num-integer = "0.1.46"

6
Lib/_py_abc.py vendored
View File

@@ -33,6 +33,8 @@ class ABCMeta(type):
_abc_invalidation_counter = 0
def __new__(mcls, name, bases, namespace, /, **kwargs):
# TODO: RUSTPYTHON remove this line (prevents duplicate bases)
bases = tuple(dict.fromkeys(bases))
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
# Compute set of abstract method names
abstracts = {name
@@ -98,8 +100,8 @@ class ABCMeta(type):
subtype = type(instance)
if subtype is subclass:
if (cls._abc_negative_cache_version ==
ABCMeta._abc_invalidation_counter and
subclass in cls._abc_negative_cache):
ABCMeta._abc_invalidation_counter and
subclass in cls._abc_negative_cache):
return False
# Fall back to the subclass check.
return cls.__subclasscheck__(subclass)

2
Lib/colorsys.py vendored
View File

@@ -24,7 +24,7 @@ HSV: Hue, Saturation, Value
__all__ = ["rgb_to_yiq","yiq_to_rgb","rgb_to_hls","hls_to_rgb",
"rgb_to_hsv","hsv_to_rgb"]
# Some floating point constants
# Some floating-point constants
ONE_THIRD = 1.0/3.0
ONE_SIXTH = 1.0/6.0

2
Lib/graphlib.py vendored
View File

@@ -154,7 +154,7 @@ class TopologicalSorter:
This method unblocks any successor of each node in *nodes* for being returned
in the future by a call to "get_ready".
Raises :exec:`ValueError` if any node in *nodes* has already been marked as
Raises ValueError if any node in *nodes* has already been marked as
processed by a previous call to this method, if a node was not added to the
graph by using "add" or if called without calling "prepare" previously or if
node has not yet been returned by "get_ready".

91
Lib/linecache.py vendored
View File

@@ -5,17 +5,13 @@ is not found, it will look down the module search path for a file by
that name.
"""
import functools
import sys
import os
import tokenize
__all__ = ["getline", "clearcache", "checkcache", "lazycache"]
# The cache. Maps filenames to either a thunk which will provide source code,
# or a tuple (size, mtime, lines, fullname) once loaded.
cache = {}
_interactive_cache = {}
def clearcache():
@@ -49,28 +45,54 @@ def getlines(filename, module_globals=None):
return []
def _getline_from_code(filename, lineno):
lines = _getlines_from_code(filename)
if 1 <= lineno <= len(lines):
return lines[lineno - 1]
return ''
def _make_key(code):
return (code.co_filename, code.co_qualname, code.co_firstlineno)
def _getlines_from_code(code):
code_id = _make_key(code)
if code_id in _interactive_cache:
entry = _interactive_cache[code_id]
if len(entry) != 1:
return _interactive_cache[code_id][2]
return []
def checkcache(filename=None):
"""Discard cache entries that are out of date.
(This is not checked upon each call!)"""
if filename is None:
filenames = list(cache.keys())
elif filename in cache:
filenames = [filename]
# get keys atomically
filenames = cache.copy().keys()
else:
return
filenames = [filename]
for filename in filenames:
entry = cache[filename]
try:
entry = cache[filename]
except KeyError:
continue
if len(entry) == 1:
# lazy cache entry, leave it lazy.
continue
size, mtime, lines, fullname = entry
if mtime is None:
continue # no-op for files loaded via a __loader__
try:
# This import can fail if the interpreter is shutting down
import os
except ImportError:
return
try:
stat = os.stat(fullname)
except OSError:
except (OSError, ValueError):
cache.pop(filename, None)
continue
if size != stat.st_size or mtime != stat.st_mtime:
@@ -82,6 +104,17 @@ def updatecache(filename, module_globals=None):
If something's wrong, print a message, discard the cache entry,
and return an empty list."""
# These imports are not at top level because linecache is in the critical
# path of the interpreter startup and importing os and sys take a lot of time
# and slows down the startup sequence.
try:
import os
import sys
import tokenize
except ImportError:
# These import can fail if the interpreter is shutting down
return []
if filename in cache:
if len(cache[filename]) != 1:
cache.pop(filename, None)
@@ -128,16 +161,20 @@ def updatecache(filename, module_globals=None):
try:
stat = os.stat(fullname)
break
except OSError:
except (OSError, ValueError):
pass
else:
return []
except ValueError: # may be raised by os.stat()
return []
try:
with tokenize.open(fullname) as fp:
lines = fp.readlines()
except (OSError, UnicodeDecodeError, SyntaxError):
return []
if lines and not lines[-1].endswith('\n'):
if not lines:
lines = ['\n']
elif not lines[-1].endswith('\n'):
lines[-1] += '\n'
size, mtime = stat.st_size, stat.st_mtime
cache[filename] = size, mtime, lines, fullname
@@ -166,17 +203,29 @@ def lazycache(filename, module_globals):
return False
# Try for a __loader__, if available
if module_globals and '__name__' in module_globals:
name = module_globals['__name__']
if (loader := module_globals.get('__loader__')) is None:
if spec := module_globals.get('__spec__'):
try:
loader = spec.loader
except AttributeError:
pass
spec = module_globals.get('__spec__')
name = getattr(spec, 'name', None) or module_globals['__name__']
loader = getattr(spec, 'loader', None)
if loader is None:
loader = module_globals.get('__loader__')
get_source = getattr(loader, 'get_source', None)
if name and get_source:
get_lines = functools.partial(get_source, name)
def get_lines(name=name, *args, **kwargs):
return get_source(name, *args, **kwargs)
cache[filename] = (get_lines,)
return True
return False
def _register_code(code, string, name):
entry = (len(string),
None,
[line + '\n' for line in string.splitlines()],
name)
stack = [code]
while stack:
code = stack.pop()
for const in code.co_consts:
if isinstance(const, type(code)):
stack.append(const)
_interactive_cache[_make_key(code)] = entry

3
Lib/pprint.py vendored
View File

@@ -128,6 +128,9 @@ class PrettyPrinter:
sort_dicts
If true, dict keys are sorted.
underscore_numbers
If true, digit groups are separated with underscores.
"""
indent = int(indent)
width = int(width)

60
Lib/queue.py vendored
View File

@@ -10,7 +10,15 @@ try:
except ImportError:
SimpleQueue = None
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
__all__ = [
'Empty',
'Full',
'ShutDown',
'Queue',
'PriorityQueue',
'LifoQueue',
'SimpleQueue',
]
try:
@@ -25,6 +33,10 @@ class Full(Exception):
pass
class ShutDown(Exception):
'''Raised when put/get with shut-down queue.'''
class Queue:
'''Create a queue object with a given maximum size.
@@ -54,6 +66,9 @@ class Queue:
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
# Queue shutdown state
self.is_shutdown = False
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -65,6 +80,9 @@ class Queue:
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
shutdown(immediate=True) calls task_done() for each remaining item in
the queue.
Raises a ValueError if called more times than there were items
placed in the queue.
'''
@@ -129,8 +147,12 @@ class Queue:
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
Raises ShutDown if the queue has been shut down.
'''
with self.not_full:
if self.is_shutdown:
raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
@@ -138,6 +160,8 @@ class Queue:
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.is_shutdown:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -147,6 +171,8 @@ class Queue:
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.is_shutdown:
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
@@ -161,14 +187,21 @@ class Queue:
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
Raises ShutDown if the queue has been shut down and is empty,
or if the queue has been shut down immediately.
'''
with self.not_empty:
if self.is_shutdown and not self._qsize():
raise ShutDown
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
if self.is_shutdown and not self._qsize():
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -178,6 +211,8 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.is_shutdown and not self._qsize():
raise ShutDown
item = self._get()
self.not_full.notify()
return item
@@ -198,6 +233,29 @@ class Queue:
'''
return self.get(block=False)
def shutdown(self, immediate=False):
'''Shut-down the queue, making queue gets and puts raise ShutDown.
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.
All blocked callers of put() and get() will be unblocked. If
'immediate', a task is marked as done for each item remaining in
the queue, which may unblock callers of join().
'''
with self.mutex:
self.is_shutdown = True
if immediate:
while self._qsize():
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
# All getters need to re-check queue-empty to raise ShutDown
self.not_empty.notify_all()
self.not_full.notify_all()
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held

2
Lib/sched.py vendored
View File

@@ -11,7 +11,7 @@ substituting time and sleep from built-in module time, or you can
implement simulated time by writing your own functions. This can
also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue. Time can be expressed as
integers or floating point numbers, as long as it is consistent.
integers or floating-point numbers, as long as it is consistent.
Events are specified by tuples (time, priority, action, argument, kwargs).
As in UNIX, lower priority numbers mean higher priority; in this

View File

@@ -1,6 +1,63 @@
from math import copysign, isnan
class ExtraAssertions:
def assertIsSubclass(self, cls, superclass, msg=None):
if issubclass(cls, superclass):
return
standardMsg = f'{cls!r} is not a subclass of {superclass!r}'
self.fail(self._formatMessage(msg, standardMsg))
def assertNotIsSubclass(self, cls, superclass, msg=None):
if not issubclass(cls, superclass):
return
standardMsg = f'{cls!r} is a subclass of {superclass!r}'
self.fail(self._formatMessage(msg, standardMsg))
def assertHasAttr(self, obj, name, msg=None):
if not hasattr(obj, name):
if isinstance(obj, types.ModuleType):
standardMsg = f'module {obj.__name__!r} has no attribute {name!r}'
elif isinstance(obj, type):
standardMsg = f'type object {obj.__name__!r} has no attribute {name!r}'
else:
standardMsg = f'{type(obj).__name__!r} object has no attribute {name!r}'
self.fail(self._formatMessage(msg, standardMsg))
def assertNotHasAttr(self, obj, name, msg=None):
if hasattr(obj, name):
if isinstance(obj, types.ModuleType):
standardMsg = f'module {obj.__name__!r} has unexpected attribute {name!r}'
elif isinstance(obj, type):
standardMsg = f'type object {obj.__name__!r} has unexpected attribute {name!r}'
else:
standardMsg = f'{type(obj).__name__!r} object has unexpected attribute {name!r}'
self.fail(self._formatMessage(msg, standardMsg))
def assertStartsWith(self, s, prefix, msg=None):
if s.startswith(prefix):
return
standardMsg = f"{s!r} doesn't start with {prefix!r}"
self.fail(self._formatMessage(msg, standardMsg))
def assertNotStartsWith(self, s, prefix, msg=None):
if not s.startswith(prefix):
return
self.fail(self._formatMessage(msg, f"{s!r} starts with {prefix!r}"))
def assertEndsWith(self, s, suffix, msg=None):
if s.endswith(suffix):
return
standardMsg = f"{s!r} doesn't end with {suffix!r}"
self.fail(self._formatMessage(msg, standardMsg))
def assertNotEndsWith(self, s, suffix, msg=None):
if not s.endswith(suffix):
return
self.fail(self._formatMessage(msg, f"{s!r} ends with {suffix!r}"))
class ExceptionIsLikeMixin:
def assertExceptionIsLike(self, exc, template):
"""

View File

@@ -1906,6 +1906,8 @@ class TestCase(unittest.TestCase):
c = Alias(10, 1.0)
self.assertEqual(c.new_method(), 1.0)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_generic_dynamic(self):
T = TypeVar('T')
@@ -3250,6 +3252,8 @@ class TestStringAnnotations(unittest.TestCase):
# won't exist on the instance.
self.assertNotIn('not_iv4', c.__dict__)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_text_annotations(self):
from test import dataclass_textanno

View File

@@ -15,8 +15,6 @@ class TestExceptionGroupTypeHierarchy(unittest.TestCase):
with self.assertRaisesRegex(TypeError, 'Exception'):
Exception[OSError]
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_exception_group_is_generic_type(self):
E = OSError
self.assertIsInstance(ExceptionGroup[E], types.GenericAlias)

View File

@@ -442,8 +442,6 @@ class AnnotationsFutureTestCase(unittest.TestCase):
def bar(arg: (yield)): pass
"""))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_get_type_hints_on_func_with_variadic_arg(self):
# `typing.get_type_hints` might break on a function with a variadic
# annotation (e.g. `f(*args: *Ts)`) if `from __future__ import

View File

@@ -173,6 +173,8 @@ class BaseTest(unittest.TestCase):
self.assertEqual(a.__args__, (int,))
self.assertEqual(a.__parameters__, ())
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_parameters(self):
from typing import List, Dict, Callable
D0 = dict[str, int]
@@ -212,6 +214,8 @@ class BaseTest(unittest.TestCase):
self.assertEqual(L5.__args__, (Callable[[K, V], K],))
self.assertEqual(L5.__parameters__, (K, V))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_parameter_chaining(self):
from typing import List, Dict, Union, Callable
self.assertEqual(list[T][int], list[int])
@@ -271,6 +275,8 @@ class BaseTest(unittest.TestCase):
with self.assertRaises(TypeError):
MyType[int]
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_pickle(self):
alias = GenericAlias(list, T)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
@@ -280,6 +286,8 @@ class BaseTest(unittest.TestCase):
self.assertEqual(loaded.__args__, alias.__args__)
self.assertEqual(loaded.__parameters__, alias.__parameters__)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_copy(self):
class X(list):
def __copy__(self):
@@ -303,6 +311,8 @@ class BaseTest(unittest.TestCase):
self.assertEqual(a.__args__, (list[int], list[str]))
self.assertEqual(a.__parameters__, ())
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_generic(self):
a = typing.Union[list[T], tuple[T, ...]]
self.assertEqual(a.__args__, (list[T], tuple[T, ...]))

View File

@@ -4,7 +4,6 @@ import random
import unittest
import doctest
from test import support
from test.support import import_helper
from unittest import TestCase, skipUnless
from operator import itemgetter

View File

@@ -1695,8 +1695,6 @@ class HTTPSTest(TestCase):
h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_networked(self):
# Default settings: requires a valid cert from a trusted CA
import ssl
@@ -1769,8 +1767,6 @@ class HTTPSTest(TestCase):
h.close()
self.assertIn('nginx', server_string)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_networked_bad_cert(self):
# We feed a "CA" cert that is unrelated to the server's cert
import ssl

View File

@@ -5,8 +5,10 @@ import unittest
import os.path
import tempfile
import tokenize
from importlib.machinery import ModuleSpec
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok
FILENAME = linecache.__file__
@@ -82,6 +84,10 @@ class GetLineTestsBadData(TempFile):
class EmptyFile(GetLineTestsGoodData, unittest.TestCase):
file_list = []
def test_getlines(self):
lines = linecache.getlines(self.file_name)
self.assertEqual(lines, ['\n'])
class SingleEmptyLine(GetLineTestsGoodData, unittest.TestCase):
file_list = ['\n']
@@ -97,6 +103,16 @@ class BadUnicode_WithDeclaration(GetLineTestsBadData, unittest.TestCase):
file_byte_string = b'# coding=utf-8\n\x80abc'
class FakeLoader:
def get_source(self, fullname):
return f'source for {fullname}'
class NoSourceLoader:
def get_source(self, fullname):
return None
class LineCacheTests(unittest.TestCase):
def test_getline(self):
@@ -238,6 +254,70 @@ class LineCacheTests(unittest.TestCase):
self.assertEqual(lines3, [])
self.assertEqual(linecache.getlines(FILENAME), lines)
def test_loader(self):
filename = 'scheme://path'
for loader in (None, object(), NoSourceLoader()):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': loader}
self.assertEqual(linecache.getlines(filename, module_globals), [])
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader()}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])
for spec in (None, object(), ModuleSpec('', FakeLoader())):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader(),
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])
linecache.clearcache()
spec = ModuleSpec('x.y.z', FakeLoader())
module_globals = {'__name__': 'a.b.c', '__loader__': spec.loader,
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for x.y.z\n'])
def test_invalid_names(self):
for name, desc in [
('\x00', 'NUL bytes filename'),
(__file__ + '\x00', 'filename with embedded NUL bytes'),
# A filename with surrogate codes. A UnicodeEncodeError is raised
# by os.stat() upon querying, which is a subclass of ValueError.
("\uD834\uDD1E.py", 'surrogate codes (MUSICAL SYMBOL G CLEF)'),
# For POSIX platforms, an OSError will be raised but for Windows
# platforms, a ValueError is raised due to the path_t converter.
# See: https://github.com/python/cpython/issues/122170
('a' * 1_000_000, 'very long filename'),
]:
with self.subTest(f'updatecache: {desc}'):
linecache.clearcache()
lines = linecache.updatecache(name)
self.assertListEqual(lines, [])
self.assertNotIn(name, linecache.cache)
# hack into the cache (it shouldn't be allowed
# but we never know what people do...)
for key, fullname in [(name, 'ok'), ('key', name), (name, name)]:
with self.subTest(f'checkcache: {desc}',
key=key, fullname=fullname):
linecache.clearcache()
linecache.cache[key] = (0, 1234, [], fullname)
linecache.checkcache(key)
self.assertNotIn(key, linecache.cache)
# just to be sure that we did not mess with cache
linecache.clearcache()
def test_linecache_python_string(self):
cmdline = "import linecache;assert len(linecache.cache) == 0"
retcode, stdout, stderr = assert_python_ok('-c', cmdline)
self.assertEqual(retcode, 0)
self.assertEqual(stdout, b'')
self.assertEqual(stderr, b'')
class LineCacheInvalidationTests(unittest.TestCase):
def setUp(self):

View File

@@ -152,8 +152,6 @@ class PollTests(unittest.TestCase):
else:
self.fail('Unexpected return value from select.poll: %s' % fdlist)
# TODO: RUSTPYTHON int overflow
@unittest.expectedFailure
def test_poll3(self):
# test int overflow
pollster = select.poll()

View File

@@ -7,8 +7,8 @@ import io
import itertools
import pprint
import random
import re
import test.support
import test.test_set
import types
import unittest
@@ -535,7 +535,10 @@ AdvancedNamespace(the=0,
def test_dataclass_no_repr(self):
dc = dataclass3()
formatted = pprint.pformat(dc, width=10)
self.assertRegex(formatted, r"<test.test_pprint.dataclass3 object at \w+>")
self.assertRegex(
formatted,
fr"<{re.escape(__name__)}.dataclass3 object at \w+>",
)
def test_recursive_dataclass(self):
dc = dataclass4(None)
@@ -619,9 +622,6 @@ frozenset2({0,
self.assertEqual(pprint.pformat(frozenset3(range(7)), width=20),
'frozenset3({0, 1, 2, 3, 4, 5, 6})')
@unittest.expectedFailure
#See http://bugs.python.org/issue13907
@test.support.cpython_only
def test_set_of_sets_reprs(self):
# This test creates a complex arrangement of frozensets and
# compares the pretty-printed repr against a string hard-coded in
@@ -632,204 +632,106 @@ frozenset2({0,
# partial ordering (subset relationships), the output of the
# list.sort() method is undefined for lists of sets."
#
# In a nutshell, the test assumes frozenset({0}) will always
# sort before frozenset({1}), but:
#
# >>> frozenset({0}) < frozenset({1})
# False
# >>> frozenset({1}) < frozenset({0})
# False
#
# Consequently, this test is fragile and
# implementation-dependent. Small changes to Python's sort
# algorithm cause the test to fail when it should pass.
# XXX Or changes to the dictionary implementation...
# In this test we list all possible invariants of the result
# for unordered frozensets.
#
# This test has a long history, see:
# - https://github.com/python/cpython/commit/969fe57baa0eb80332990f9cda936a33e13fabef
# - https://github.com/python/cpython/issues/58115
# - https://github.com/python/cpython/issues/111147
cube_repr_tgt = """\
{frozenset(): frozenset({frozenset({2}), frozenset({0}), frozenset({1})}),
frozenset({0}): frozenset({frozenset(),
frozenset({0, 2}),
frozenset({0, 1})}),
frozenset({1}): frozenset({frozenset(),
frozenset({1, 2}),
frozenset({0, 1})}),
frozenset({2}): frozenset({frozenset(),
frozenset({1, 2}),
frozenset({0, 2})}),
frozenset({1, 2}): frozenset({frozenset({2}),
frozenset({1}),
frozenset({0, 1, 2})}),
frozenset({0, 2}): frozenset({frozenset({2}),
frozenset({0}),
frozenset({0, 1, 2})}),
frozenset({0, 1}): frozenset({frozenset({0}),
frozenset({1}),
frozenset({0, 1, 2})}),
frozenset({0, 1, 2}): frozenset({frozenset({1, 2}),
frozenset({0, 2}),
frozenset({0, 1})})}"""
cube = test.test_set.cube(3)
self.assertEqual(pprint.pformat(cube), cube_repr_tgt)
cubo_repr_tgt = """\
{frozenset({frozenset({0, 2}), frozenset({0})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({0, 1}), frozenset({1})}): frozenset({frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({1})})}),
frozenset({frozenset({1, 2}), frozenset({1})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({1, 2}), frozenset({2})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset({2}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset(), frozenset({0})}): frozenset({frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset(), frozenset({1})}): frozenset({frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({2})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({2}), frozenset()}): frozenset({frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({0, 1, 2}), frozenset({0, 1})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({0}), frozenset({0, 1})}): frozenset({frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({2}), frozenset({0, 2})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset({0, 1, 2}), frozenset({0, 2})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({1, 2}), frozenset({0, 1, 2})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset({1}),
frozenset({1,
2})})})}"""
import textwrap
cubo = test.test_set.linegraph(cube)
self.assertEqual(pprint.pformat(cubo), cubo_repr_tgt)
# Single-line, always ordered:
fs0 = frozenset()
fs1 = frozenset(('abc', 'xyz'))
data = frozenset((fs0, fs1))
self.assertEqual(pprint.pformat(data),
'frozenset({%r, %r})' % (fs0, fs1))
self.assertEqual(pprint.pformat(data), repr(data))
fs2 = frozenset(('one', 'two'))
data = {fs2: frozenset((fs0, fs1))}
self.assertEqual(pprint.pformat(data),
"{%r: frozenset({%r, %r})}" % (fs2, fs0, fs1))
self.assertEqual(pprint.pformat(data), repr(data))
# Single-line, unordered:
fs1 = frozenset(("xyz", "qwerty"))
fs2 = frozenset(("abcd", "spam"))
fs = frozenset((fs1, fs2))
self.assertEqual(pprint.pformat(fs), repr(fs))
# Multiline, unordered:
def check(res, invariants):
self.assertIn(res, [textwrap.dedent(i).strip() for i in invariants])
# Inner-most frozensets are singleline, result is multiline, unordered:
fs1 = frozenset(('regular string', 'other string'))
fs2 = frozenset(('third string', 'one more string'))
check(
pprint.pformat(frozenset((fs1, fs2))),
[
"""
frozenset({%r,
%r})
""" % (fs1, fs2),
"""
frozenset({%r,
%r})
""" % (fs2, fs1),
],
)
# Everything is multiline, unordered:
check(
pprint.pformat(
frozenset((
frozenset((
"xyz very-very long string",
"qwerty is also absurdly long",
)),
frozenset((
"abcd is even longer that before",
"spam is not so long",
)),
)),
),
[
"""
frozenset({frozenset({'abcd is even longer that before',
'spam is not so long'}),
frozenset({'qwerty is also absurdly long',
'xyz very-very long string'})})
""",
"""
frozenset({frozenset({'abcd is even longer that before',
'spam is not so long'}),
frozenset({'xyz very-very long string',
'qwerty is also absurdly long'})})
""",
"""
frozenset({frozenset({'qwerty is also absurdly long',
'xyz very-very long string'}),
frozenset({'abcd is even longer that before',
'spam is not so long'})})
""",
"""
frozenset({frozenset({'qwerty is also absurdly long',
'xyz very-very long string'}),
frozenset({'spam is not so long',
'abcd is even longer that before'})})
""",
],
)
def test_depth(self):
nested_tuple = (1, (2, (3, (4, (5, 6)))))

416
Lib/test/test_queue.py vendored
View File

@@ -2,6 +2,7 @@
# to ensure the Queue locks remain stable.
import itertools
import random
import sys
import threading
import time
import unittest
@@ -10,6 +11,8 @@ from test.support import gc_collect
from test.support import import_helper
from test.support import threading_helper
# queue module depends on threading primitives
threading_helper.requires_working_threading(module=True)
py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
@@ -239,6 +242,418 @@ class BaseQueueTestMixin(BlockingTestMixin):
with self.assertRaises(self.queue.Full):
q.put_nowait(4)
def test_shutdown_empty(self):
q = self.type2test()
q.shutdown()
with self.assertRaises(self.queue.ShutDown):
q.put("data")
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_nonempty(self):
q = self.type2test()
q.put("data")
q.shutdown()
q.get()
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_immediate(self):
q = self.type2test()
q.put("data")
q.shutdown(immediate=True)
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_allowed_transitions(self):
# allowed transitions would be from alive via shutdown to immediate
q = self.type2test()
self.assertFalse(q.is_shutdown)
q.shutdown()
self.assertTrue(q.is_shutdown)
q.shutdown(immediate=True)
self.assertTrue(q.is_shutdown)
q.shutdown(immediate=False)
def _shutdown_all_methods_in_one_thread(self, immediate):
q = self.type2test(2)
q.put("L")
q.put_nowait("O")
q.shutdown(immediate)
with self.assertRaises(self.queue.ShutDown):
q.put("E")
with self.assertRaises(self.queue.ShutDown):
q.put_nowait("W")
if immediate:
with self.assertRaises(self.queue.ShutDown):
q.get()
with self.assertRaises(self.queue.ShutDown):
q.get_nowait()
with self.assertRaises(ValueError):
q.task_done()
q.join()
else:
self.assertIn(q.get(), "LO")
q.task_done()
self.assertIn(q.get(), "LO")
q.task_done()
q.join()
# on shutdown(immediate=False)
# when queue is empty, should raise ShutDown Exception
with self.assertRaises(self.queue.ShutDown):
q.get() # p.get(True)
with self.assertRaises(self.queue.ShutDown):
q.get_nowait() # p.get(False)
with self.assertRaises(self.queue.ShutDown):
q.get(True, 1.0)
def test_shutdown_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(False)
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)
def _write_msg_thread(self, q, n, results,
i_when_exec_shutdown, event_shutdown,
barrier_start):
# All `write_msg_threads`
# put several items into the queue.
for i in range(0, i_when_exec_shutdown//2):
q.put((i, 'LOYD'))
# Wait for the barrier to be complete.
barrier_start.wait()
for i in range(i_when_exec_shutdown//2, n):
try:
q.put((i, "YDLO"))
except self.queue.ShutDown:
results.append(False)
break
# Trigger queue shutdown.
if i == i_when_exec_shutdown:
# Only one thread should call shutdown().
if not event_shutdown.is_set():
event_shutdown.set()
results.append(True)
def _read_msg_thread(self, q, results, barrier_start):
# Get at least one item.
q.get(True)
q.task_done()
# Wait for the barrier to be complete.
barrier_start.wait()
while True:
try:
q.get(False)
q.task_done()
except self.queue.ShutDown:
results.append(True)
break
except self.queue.Empty:
pass
def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
results.append(q.qsize() == 0)
def _join_thread(self, q, barrier_start):
# Wait for the barrier to be complete.
barrier_start.wait()
q.join()
def _shutdown_all_methods_in_many_threads(self, immediate):
# Run a 'multi-producers/consumers queue' use case,
# with enough items into the queue.
# When shutdown, all running threads will be joined.
q = self.type2test()
ps = []
res_puts = []
res_gets = []
res_shutdown = []
write_threads = 4
read_threads = 6
join_threads = 2
nb_msgs = 1024*64
nb_msgs_w = nb_msgs // write_threads
when_exec_shutdown = nb_msgs_w // 2
# Use of a Barrier to ensure that
# - all write threads put all their items into the queue,
# - all read thread get at least one item from the queue,
# and keep on running until shutdown.
# The join thread is started only when shutdown is immediate.
nparties = write_threads + read_threads
if immediate:
nparties += join_threads
barrier_start = threading.Barrier(nparties)
ev_exec_shutdown = threading.Event()
lprocs = [
(self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
when_exec_shutdown, ev_exec_shutdown,
barrier_start)),
(self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
(self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
]
if immediate:
lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
# start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
for thread in ps:
thread.join()
self.assertTrue(True in res_puts)
self.assertEqual(res_gets.count(True), read_threads)
if immediate:
self.assertListEqual(res_shutdown, [True])
self.assertTrue(q.empty())
def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)
def _get(self, q, go, results, shutdown=False):
go.wait()
try:
msg = q.get()
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _get_shutdown(self, q, go, results):
return self._get(q, go, results, True)
def _get_task_done(self, q, go, results):
go.wait()
try:
msg = q.get()
q.task_done()
results.append(True)
return msg
except self.queue.ShutDown:
results.append(False)
return False
def _put(self, q, msg, go, results, shutdown=False):
go.wait()
try:
q.put(msg)
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _put_shutdown(self, q, msg, go, results):
return self._put(q, msg, go, results, True)
def _join(self, q, results, shutdown=False):
try:
q.join()
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _join_shutdown(self, q, results):
return self._join(q, results, True)
def _shutdown_get(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
# queue full
if immediate:
thrds = (
(self._get_shutdown, (q, go, results)),
(self._get_shutdown, (q, go, results)),
)
else:
thrds = (
# on shutdown(immediate=False)
# one of these threads should raise Shutdown
(self._get, (q, go, results)),
(self._get, (q, go, results)),
(self._get, (q, go, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
q.shutdown(immediate)
go.set()
for t in threads:
t.join()
if immediate:
self.assertListEqual(results, [True, True])
else:
self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))
def test_shutdown_get(self):
return self._shutdown_get(False)
def test_shutdown_immediate_get(self):
return self._shutdown_get(True)
def _shutdown_put(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
# queue fulled
thrds = (
(self._put_shutdown, (q, "E", go, results)),
(self._put_shutdown, (q, "W", go, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
q.shutdown()
go.set()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_put(self):
return self._shutdown_put(False)
def test_shutdown_immediate_put(self):
return self._shutdown_put(True)
def _shutdown_join(self, immediate):
q = self.type2test()
results = []
q.put("Y")
go = threading.Event()
nb = q.qsize()
thrds = (
(self._join, (q, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
if not immediate:
res = []
for i in range(nb):
threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res)))
threads[-1].start()
q.shutdown(immediate)
go.set()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_immediate_join(self):
return self._shutdown_join(True)
def test_shutdown_join(self):
return self._shutdown_join(False)
def _shutdown_put_join(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
# queue not fulled
thrds = (
(self._put_shutdown, (q, "E", go, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
self.assertEqual(q.unfinished_tasks, 1)
q.shutdown(immediate)
go.set()
if immediate:
with self.assertRaises(self.queue.ShutDown):
q.get_nowait()
else:
result = q.get()
self.assertEqual(result, "Y")
q.task_done()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_immediate_put_join(self):
return self._shutdown_put_join(True)
def test_shutdown_put_join(self):
return self._shutdown_put_join(False)
def test_shutdown_get_task_done_join(self):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
self.assertEqual(q.unfinished_tasks, q.qsize())
thrds = (
(self._get_task_done, (q, go, results)),
(self._get_task_done, (q, go, results)),
(self._join, (q, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
go.set()
q.shutdown(False)
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_pending_get(self):
def get():
try:
results.append(q.get())
except Exception as e:
results.append(e)
q = self.type2test()
results = []
get_thread = threading.Thread(target=get)
get_thread.start()
q.shutdown(immediate=False)
get_thread.join(timeout=10.0)
self.assertFalse(get_thread.is_alive())
self.assertEqual(len(results), 1)
self.assertIsInstance(results[0], self.queue.ShutDown)
class QueueTest(BaseQueueTestMixin):
def setUp(self):
@@ -289,6 +704,7 @@ class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception): pass
class FailingQueueTest(BlockingTestMixin):
def setUp(self):

View File

@@ -58,6 +58,7 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
@threading_helper.requires_working_threading()
def test_enter_concurrent(self):
q = queue.Queue()
fun = q.put
@@ -91,10 +92,23 @@ class TestCase(unittest.TestCase):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for priority in [1, 2, 3, 4, 5]:
z = scheduler.enterabs(0.01, priority, fun, (priority,))
scheduler.run()
self.assertEqual(l, [1, 2, 3, 4, 5])
cases = [
([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
]
for priorities, expected in cases:
with self.subTest(priorities=priorities, expected=expected):
for priority in priorities:
scheduler.enterabs(0.01, priority, fun, (priority,))
scheduler.run()
self.assertEqual(l, expected)
# Cleanup:
self.assertTrue(scheduler.empty())
l.clear()
def test_cancel(self):
l = []
@@ -111,6 +125,7 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
@threading_helper.requires_working_threading()
def test_cancel_concurrent(self):
q = queue.Queue()
fun = q.put

View File

@@ -742,6 +742,8 @@ class UnionTests(unittest.TestCase):
self.assertTrue(issubclass(dict, x))
self.assertFalse(issubclass(list, x))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_instancecheck_and_subclasscheck_order(self):
T = typing.TypeVar('T')
@@ -788,6 +790,8 @@ class UnionTests(unittest.TestCase):
self.assertTrue(issubclass(int, x))
self.assertRaises(ZeroDivisionError, issubclass, list, x)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_or_type_operator_with_TypeVar(self):
TV = typing.TypeVar('T')
assert TV | str == typing.Union[TV, str]
@@ -795,6 +799,8 @@ class UnionTests(unittest.TestCase):
self.assertIs((int | TV)[int], int)
self.assertIs((TV | int)[int], int)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_args(self):
def check(arg, expected):
clear_typing_caches()
@@ -825,6 +831,8 @@ class UnionTests(unittest.TestCase):
check(x | None, (x, type(None)))
check(None | x, (type(None), x))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_parameter_chaining(self):
T = typing.TypeVar("T")
S = typing.TypeVar("S")
@@ -869,6 +877,8 @@ class UnionTests(unittest.TestCase):
eq(x[NT], int | NT | bytes)
eq(x[S], int | S | bytes)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_pickle(self):
orig = list[T] | int
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
@@ -878,6 +888,8 @@ class UnionTests(unittest.TestCase):
self.assertEqual(loaded.__args__, orig.__args__)
self.assertEqual(loaded.__parameters__, orig.__parameters__)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_copy(self):
orig = list[T] | int
for copied in (copy.copy(orig), copy.deepcopy(orig)):
@@ -885,12 +897,16 @@ class UnionTests(unittest.TestCase):
self.assertEqual(copied.__args__, orig.__args__)
self.assertEqual(copied.__parameters__, orig.__parameters__)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_union_parameter_substitution_errors(self):
T = typing.TypeVar("T")
x = int | T
with self.assertRaises(TypeError):
x[int, str]
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_or_type_operator_with_forward(self):
T = typing.TypeVar('T')
ForwardAfter = T | 'Forward'

6479
Lib/test/test_typing.py vendored

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,30 @@
"""Used to test `get_type_hints()` on a cross-module inherited `TypedDict` class
This script uses future annotations to postpone a type that won't be available
on the module inheriting from to `Foo`. The subclass in the other module should
look something like this:
class Bar(_typed_dict_helper.Foo, total=False):
b: int
In addition, it uses multiple levels of Annotated to test the interaction
between the __future__ import, Annotated, and Required.
"""
from __future__ import annotations
from typing import Annotated, Generic, Optional, Required, TypedDict, TypeVar
OptionalIntType = Optional[int]
class Foo(TypedDict):
a: OptionalIntType
T = TypeVar("T")
class FooGeneric(TypedDict, Generic[T]):
a: Optional[T]
class VeryAnnotated(TypedDict, total=False):
a: Annotated[Annotated[Annotated[Required[int], "a"], "b"], "c"]

72
Lib/test/typinganndata/ann_module695.py vendored Normal file
View File

@@ -0,0 +1,72 @@
from __future__ import annotations
from typing import Callable
class A[T, *Ts, **P]:
x: T
y: tuple[*Ts]
z: Callable[P, str]
class B[T, *Ts, **P]:
T = int
Ts = str
P = bytes
x: T
y: Ts
z: P
Eggs = int
Spam = str
class C[Eggs, **Spam]:
x: Eggs
y: Spam
def generic_function[T, *Ts, **P](
x: T, *y: *Ts, z: P.args, zz: P.kwargs
) -> None: ...
def generic_function_2[Eggs, **Spam](x: Eggs, y: Spam): pass
class D:
Foo = int
Bar = str
def generic_method[Foo, **Bar](
self, x: Foo, y: Bar
) -> None: ...
def generic_method_2[Eggs, **Spam](self, x: Eggs, y: Spam): pass
def nested():
from types import SimpleNamespace
from typing import get_type_hints
Eggs = bytes
Spam = memoryview
class E[Eggs, **Spam]:
x: Eggs
y: Spam
def generic_method[Eggs, **Spam](self, x: Eggs, y: Spam): pass
def generic_function[Eggs, **Spam](x: Eggs, y: Spam): pass
return SimpleNamespace(
E=E,
hints_for_E=get_type_hints(E),
hints_for_E_meth=get_type_hints(E.generic_method),
generic_func=generic_function,
hints_for_generic_func=get_type_hints(generic_function)
)

View File

@@ -0,0 +1,26 @@
"""Module for testing the behavior of generics across different modules."""
from typing import TypeVar, Generic, Optional, TypeAliasType
# TODO: RUSTPYTHON
# default_a: Optional['A'] = None
# default_b: Optional['B'] = None
# T = TypeVar('T')
# class A(Generic[T]):
# some_b: 'B'
# class B(Generic[T]):
# class A(Generic[T]):
# pass
# my_inner_a1: 'B.A'
# my_inner_a2: A
# my_outer_a: 'A' # unless somebody calls get_type_hints with localns=B.__dict__
# type Alias = int
# OldStyle = TypeAliasType("OldStyle", int)

2826
Lib/typing.py vendored

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
//! A crate to hold types and functions common to all rustpython components.
#![cfg_attr(target_os = "redox", feature(byte_slice_trim_ascii, new_uninit))]
#![cfg_attr(all(target_os = "wasi", target_env = "p2"), feature(wasip2))]
#[macro_use]
mod macros;

137
scripts/fix_test.py Normal file
View File

@@ -0,0 +1,137 @@
"""
An automated script to mark failures in python test suite.
It adds @unittest.expectedFailure to the test functions that are failing in RustPython, but not in CPython.
As well as marking the test with a TODO comment.
How to use:
1. Copy a specific test from the CPython repository to the RustPython repository.
2. Remove all unexpected failures from the test and skip the tests that hang
3. Run python ./scripts/fix_test.py --test test_venv --path ./Lib/test/test_venv.py or equivalent for the test from the project root.
4. Ensure that there are no unexpected successes in the test.
5. Actually fix the test.
"""
import argparse
import ast
import itertools
import platform
from pathlib import Path
def parse_args():
parser = argparse.ArgumentParser(description="Fix test.")
parser.add_argument("--path", type=Path, help="Path to test file")
parser.add_argument("--force", action="store_true", help="Force modification")
parser.add_argument("--platform", action="store_true", help="Platform specific failure")
args = parser.parse_args()
return args
class Test:
name: str = ""
path: str = ""
result: str = ""
def __str__(self):
return f"Test(name={self.name}, path={self.path}, result={self.result})"
class TestResult:
tests_result: str = ""
tests = []
stdout = ""
def __str__(self):
return f"TestResult(tests_result={self.tests_result},tests={len(self.tests)})"
def parse_results(result):
lines = result.stdout.splitlines()
test_results = TestResult()
test_results.stdout = result.stdout
in_test_results = False
for line in lines:
if line == "Run tests sequentially":
in_test_results = True
elif line.startswith("-----------"):
in_test_results = False
if in_test_results and not line.startswith("tests") and not line.startswith("["):
line = line.split(" ")
if line != [] and len(line) > 3:
test = Test()
test.name = line[0]
test.path = line[1].strip("(").strip(")")
test.result = " ".join(line[3:]).lower()
test_results.tests.append(test)
else:
if "== Tests result: " in line:
res = line.split("== Tests result: ")[1]
res = res.split(" ")[0]
test_results.tests_result = res
return test_results
def path_to_test(path) -> list[str]:
return path.split(".")[2:]
def modify_test(file: str, test: list[str], for_platform: bool = False) -> str:
a = ast.parse(file)
lines = file.splitlines()
fixture = "@unittest.expectedFailure"
for node in ast.walk(a):
if isinstance(node, ast.FunctionDef):
if node.name == test[-1]:
assert not for_platform
indent = " " * node.col_offset
lines.insert(node.lineno - 1, indent + fixture)
lines.insert(node.lineno - 1, indent + "# TODO: RUSTPYTHON")
break
return "\n".join(lines)
def modify_test_v2(file: str, test: list[str], for_platform: bool = False) -> str:
a = ast.parse(file)
lines = file.splitlines()
fixture = "@unittest.expectedFailure"
for key, node in ast.iter_fields(a):
if key == "body":
for i, n in enumerate(node):
match n:
case ast.ClassDef():
if len(test) == 2 and test[0] == n.name:
# look through body for function def
for i, fn in enumerate(n.body):
match fn:
case ast.FunctionDef():
if fn.name == test[-1]:
assert not for_platform
indent = " " * fn.col_offset
lines.insert(fn.lineno - 1, indent + fixture)
lines.insert(fn.lineno - 1, indent + "# TODO: RUSTPYTHON")
break
case ast.FunctionDef():
if n.name == test[0] and len(test) == 1:
assert not for_platform
indent = " " * n.col_offset
lines.insert(n.lineno - 1, indent + fixture)
lines.insert(n.lineno - 1, indent + "# TODO: RUSTPYTHON")
break
if i > 500:
exit()
return "\n".join(lines)
def run_test(test_name):
print(f"Running test: {test_name}")
rustpython_location = "./target/release/rustpython"
import subprocess
result = subprocess.run([rustpython_location, "-m", "test", "-v", test_name], capture_output=True, text=True)
return parse_results(result)
if __name__ == "__main__":
args = parse_args()
test_name = args.path.stem
tests = run_test(test_name)
f = open(args.path).read()
for test in tests.tests:
if test.result == "fail" or test.result == "error":
print("Modifying test:", test.name)
f = modify_test_v2(f, path_to_test(test.path), args.platform)
with open(args.path, "w") as file:
# TODO: Find validation method, and make --force override it
file.write(f)

View File

@@ -35,6 +35,8 @@
//!
//! The binary will have all the standard arguments of a python interpreter (including a REPL!) but
//! it will have your modules loaded into the vm.
#![cfg_attr(all(target_os = "wasi", target_env = "p2"), feature(wasip2))]
#![allow(clippy::needless_doctest_main)]
#[macro_use]

View File

@@ -1,7 +1,8 @@
// to allow `mod foo {}` in foo.rs; clippy thinks this is a mistake/misunderstanding of
// how `mod` works, but we want this sometimes for pymodule declarations
#![allow(clippy::module_inception)]
#![cfg_attr(target_os = "redox", feature(raw_ref_op))]
#![cfg_attr(all(target_os = "wasi", target_env = "p2"), feature(wasip2))]
#[macro_use]
extern crate rustpython_derive;

View File

@@ -23,13 +23,15 @@ mod mmap {
};
use crossbeam_utils::atomic::AtomicCell;
use memmap2::{Advice, Mmap, MmapMut, MmapOptions};
#[cfg(unix)]
use nix::sys::stat::fstat;
use nix::unistd;
use num_traits::Signed;
use std::fs::File;
use std::io::Write;
use std::io::{self, Write};
use std::ops::{Deref, DerefMut};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd, RawFd};
use std::os::unix::io::{FromRawFd, RawFd};
fn advice_try_from_i32(vm: &VirtualMachine, i: i32) -> PyResult<Advice> {
Ok(match i {
@@ -299,7 +301,7 @@ mod mmap {
fn py_new(
cls: PyTypeRef,
MmapNewArgs {
fileno: mut fd,
fileno: fd,
length,
flags,
prot,
@@ -348,12 +350,10 @@ mod mmap {
};
if fd != -1 {
let file = unsafe { File::from_raw_fd(fd) };
let metadata = file.metadata().map_err(|err| err.to_pyexception(vm))?;
let file_len: libc::off_t = metadata.len().try_into().expect("file size overflow");
// File::from_raw_fd will consume the fd, so we
// have to get it again.
fd = file.into_raw_fd();
let metadata = fstat(fd)
.map_err(|err| io::Error::from_raw_os_error(err as i32).to_pyexception(vm))?;
let file_len = metadata.st_size;
if map_size == 0 {
if file_len == 0 {
return Err(vm.new_value_error("cannot mmap an empty file".to_owned()));

View File

@@ -297,6 +297,10 @@ mod _overlapped {
}
}
unsafe fn u64_to_handle(raw_ptr_value: u64) -> HANDLE {
raw_ptr_value as HANDLE
}
#[pyfunction]
fn CreateIoCompletionPort(
handle: isize,
@@ -354,4 +358,56 @@ mod _overlapped {
]);
Ok(value.into())
}
#[pyfunction]
fn CreateEvent(
event_attributes: PyObjectRef,
manual_reset: bool,
initial_state: bool,
name: Option<String>,
vm: &VirtualMachine,
) -> PyResult<isize> {
if !vm.is_none(&event_attributes) {
return Err(vm.new_value_error("EventAttributes must be None".to_owned()));
}
let name = match name {
Some(name) => {
let name = widestring::WideCString::from_str(&name).unwrap();
name.as_ptr()
}
None => std::ptr::null(),
};
let event = unsafe {
windows_sys::Win32::System::Threading::CreateEventW(
std::ptr::null(),
manual_reset as _,
initial_state as _,
name,
) as isize
};
if event == NULL {
return Err(errno_err(vm));
}
Ok(event)
}
#[pyfunction]
fn SetEvent(handle: u64, vm: &VirtualMachine) -> PyResult<()> {
let ret = unsafe { windows_sys::Win32::System::Threading::SetEvent(u64_to_handle(handle)) };
if ret == 0 {
return Err(errno_err(vm));
}
Ok(())
}
#[pyfunction]
fn ResetEvent(handle: u64, vm: &VirtualMachine) -> PyResult<()> {
let ret =
unsafe { windows_sys::Win32::System::Threading::ResetEvent(u64_to_handle(handle)) };
if ret == 0 {
return Err(errno_err(vm));
}
Ok(())
}
}

View File

@@ -338,7 +338,10 @@ mod decl {
};
use libc::pollfd;
use num_traits::{Signed, ToPrimitive};
use std::time::{Duration, Instant};
use std::{
convert::TryFrom,
time::{Duration, Instant},
};
#[derive(Default)]
pub(super) struct TimeoutArg<const MILLIS: bool>(pub Option<Duration>);
@@ -417,25 +420,62 @@ mod decl {
search(fds, fd).ok().map(|i| fds.remove(i))
}
// new EventMask type
#[derive(Copy, Clone)]
#[repr(transparent)]
pub struct EventMask(pub i16);
impl TryFromObject for EventMask {
fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult<Self> {
use crate::builtins::PyInt;
let int = obj
.downcast::<PyInt>()
.map_err(|_| vm.new_type_error("argument must be an integer".to_owned()))?;
let val = int.as_bigint();
if val.is_negative() {
return Err(vm.new_value_error("negative event mask".to_owned()));
}
// Try converting to i16, should raise OverflowError if too large
let mask = i16::try_from(val).map_err(|_| {
vm.new_overflow_error("event mask value out of range".to_owned())
})?;
Ok(EventMask(mask))
}
}
const DEFAULT_EVENTS: i16 = libc::POLLIN | libc::POLLPRI | libc::POLLOUT;
#[pyclass]
impl PyPoll {
#[pymethod]
fn register(&self, Fildes(fd): Fildes, eventmask: OptionalArg<u16>) {
insert_fd(
&mut self.fds.lock(),
fd,
eventmask.map_or(DEFAULT_EVENTS, |e| e as i16),
)
fn register(
&self,
Fildes(fd): Fildes,
eventmask: OptionalArg<EventMask>,
) -> PyResult<()> {
let mask = match eventmask {
OptionalArg::Present(event_mask) => event_mask.0,
OptionalArg::Missing => DEFAULT_EVENTS,
};
insert_fd(&mut self.fds.lock(), fd, mask);
Ok(())
}
#[pymethod]
fn modify(&self, Fildes(fd): Fildes, eventmask: u16) -> io::Result<()> {
fn modify(
&self,
Fildes(fd): Fildes,
eventmask: EventMask,
vm: &VirtualMachine,
) -> PyResult<()> {
let mut fds = self.fds.lock();
// CPython raises KeyError if fd is not registered, match that behavior
let pfd = get_fd_mut(&mut fds, fd)
.ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?;
pfd.events = eventmask as i16;
.ok_or_else(|| vm.new_key_error(vm.ctx.new_int(fd).into()))?;
pfd.events = eventmask.0;
Ok(())
}

View File

@@ -1183,8 +1183,12 @@ mod _ssl {
let file = file
.rsplit_once(&['/', '\\'][..])
.map_or(file, |(_, basename)| basename);
// TODO: map the error codes to code names, e.g. "CERTIFICATE_VERIFY_FAILED", just requires a big hashmap/dict
let errstr = e.reason().unwrap_or("unknown error");
// TODO: finish map
let default_errstr = e.reason().unwrap_or("unknown error");
let errstr = match default_errstr {
"certificate verify failed" => "CERTIFICATE_VERIFY_FAILED",
_ => default_errstr,
};
let msg = if let Some(lib) = e.library() {
// add `library` attribute
let attr_name = vm.ctx.as_ref().intern_str("library");

View File

@@ -14,7 +14,6 @@
#![allow(clippy::upper_case_acronyms)]
#![doc(html_logo_url = "https://raw.githubusercontent.com/RustPython/RustPython/main/logo.png")]
#![doc(html_root_url = "https://docs.rs/rustpython-vm/")]
#![cfg_attr(target_os = "redox", feature(raw_ref_op))]
#[cfg(feature = "flame-it")]
#[macro_use]

View File

@@ -5,7 +5,7 @@ pub(crate) mod _typing {
use crate::{
PyObjectRef, PyPayload, PyResult, VirtualMachine,
builtins::{PyGenericAlias, PyTupleRef, PyTypeRef, pystr::AsPyStr},
function::IntoFuncArgs,
function::{FuncArgs, IntoFuncArgs},
};
pub(crate) fn _call_typing_func_object<'a>(
@@ -20,8 +20,10 @@ pub(crate) mod _typing {
// func.call(args, vm)
}
#[pyattr]
pub(crate) fn _idfunc(_vm: &VirtualMachine) {}
#[pyfunction]
pub(crate) fn _idfunc(args: FuncArgs, _vm: &VirtualMachine) -> PyObjectRef {
args.args[0].clone()
}
#[pyattr]
#[pyclass(name = "TypeVar")]
@@ -51,6 +53,11 @@ pub(crate) mod _typing {
Ok(vm.ctx.none())
}
}
#[pygetset(magic)]
fn name(&self) -> PyObjectRef {
self.name.clone()
}
}
pub(crate) fn make_typevar(
@@ -77,15 +84,102 @@ pub(crate) mod _typing {
#[allow(dead_code)]
pub(crate) struct ParamSpec {
name: PyObjectRef,
bound: Option<PyObjectRef>,
default_value: Option<PyObjectRef>,
evaluate_default: Option<PyObjectRef>,
covariant: bool,
contravariant: bool,
infer_variance: bool,
}
#[pyclass(flags(BASETYPE))]
impl ParamSpec {}
impl ParamSpec {
#[pygetset(magic)]
fn name(&self) -> PyObjectRef {
self.name.clone()
}
#[pygetset(magic)]
fn bound(&self, vm: &VirtualMachine) -> PyObjectRef {
if let Some(bound) = self.bound.clone() {
return bound;
}
vm.ctx.none()
}
#[pygetset(magic)]
fn covariant(&self) -> bool {
self.covariant
}
#[pygetset(magic)]
fn contravariant(&self) -> bool {
self.contravariant
}
#[pygetset(magic)]
fn infer_variance(&self) -> bool {
self.infer_variance
}
#[pygetset(magic)]
fn default(&self, vm: &VirtualMachine) -> PyResult {
if let Some(default_value) = self.default_value.clone() {
return Ok(default_value);
}
// handle evaluate_default
if let Some(evaluate_default) = self.evaluate_default.clone() {
let default_value = vm.call_method(evaluate_default.as_ref(), "__call__", ())?;
return Ok(default_value);
}
// TODO: this isn't up to spec
Ok(vm.ctx.none())
}
#[pygetset]
fn evaluate_default(&self, vm: &VirtualMachine) -> PyObjectRef {
if let Some(evaluate_default) = self.evaluate_default.clone() {
return evaluate_default;
}
// TODO: default_value case
vm.ctx.none()
}
#[pymethod(magic)]
fn reduce(&self) -> PyResult {
Ok(self.name.clone())
}
#[pymethod]
fn has_default(&self) -> PyResult<bool> {
// TODO: fix
Ok(self.evaluate_default.is_some() || self.default_value.is_some())
}
}
pub(crate) fn make_paramspec(name: PyObjectRef) -> ParamSpec {
ParamSpec { name }
ParamSpec {
name,
bound: None,
default_value: None,
evaluate_default: None,
covariant: false,
contravariant: false,
infer_variance: false,
}
}
#[pyattr]
#[pyclass(name = "NoDefault")]
#[derive(Debug, PyPayload)]
#[allow(dead_code)]
pub(crate) struct NoDefault {
name: PyObjectRef,
}
#[pyclass(flags(BASETYPE))]
impl NoDefault {}
#[pyattr]
#[pyclass(name = "TypeVarTuple")]
#[derive(Debug, PyPayload)]
@@ -161,7 +255,6 @@ pub(crate) mod _typing {
// fn as_mapping() -> &'static PyMappingMethods {
// static AS_MAPPING: Lazy<PyMappingMethods> = Lazy::new(|| PyMappingMethods {
// subscript: atomic_func!(|mapping, needle, vm| {
// println!("gigity");
// call_typing_func_object(vm, "_GenericAlias", (mapping.obj, needle))
// }),
// ..PyMappingMethods::NOT_IMPLEMENTED