Upgrade test/support from CPython 3.13.11 (#6556)

* Upgrade test/support from CPython 3.13.11

* fix test_support
This commit is contained in:
Jeong, YunWon
2025-12-28 16:30:44 +09:00
committed by GitHub
parent 7b36c9e8e0
commit 44327a8ee4
7 changed files with 270 additions and 397 deletions

View File

@@ -1,198 +0,0 @@
"""Subinterpreters High Level Module."""
import time
import _xxsubinterpreters as _interpreters
import _xxinterpchannels as _channels
# aliases:
from _xxsubinterpreters import is_shareable, RunFailedError
from _xxinterpchannels import (
ChannelError, ChannelNotFoundError, ChannelEmptyError,
)
__all__ = [
'Interpreter', 'get_current', 'get_main', 'create', 'list_all',
'SendChannel', 'RecvChannel',
'create_channel', 'list_all_channels', 'is_shareable',
'ChannelError', 'ChannelNotFoundError',
'ChannelEmptyError',
]
def create(*, isolated=True):
"""Return a new (idle) Python interpreter."""
id = _interpreters.create(isolated=isolated)
return Interpreter(id, isolated=isolated)
def list_all():
"""Return all existing interpreters."""
return [Interpreter(id) for id in _interpreters.list_all()]
def get_current():
"""Return the currently running interpreter."""
id = _interpreters.get_current()
return Interpreter(id)
def get_main():
"""Return the main interpreter."""
id = _interpreters.get_main()
return Interpreter(id)
class Interpreter:
"""A single Python interpreter."""
def __init__(self, id, *, isolated=None):
if not isinstance(id, (int, _interpreters.InterpreterID)):
raise TypeError(f'id must be an int, got {id!r}')
self._id = id
self._isolated = isolated
def __repr__(self):
data = dict(id=int(self._id), isolated=self._isolated)
kwargs = (f'{k}={v!r}' for k, v in data.items())
return f'{type(self).__name__}({", ".join(kwargs)})'
def __hash__(self):
return hash(self._id)
def __eq__(self, other):
if not isinstance(other, Interpreter):
return NotImplemented
else:
return other._id == self._id
@property
def id(self):
return self._id
@property
def isolated(self):
if self._isolated is None:
# XXX The low-level function has not been added yet.
# See bpo-....
self._isolated = _interpreters.is_isolated(self._id)
return self._isolated
def is_running(self):
"""Return whether or not the identified interpreter is running."""
return _interpreters.is_running(self._id)
def close(self):
"""Finalize and destroy the interpreter.
Attempting to destroy the current interpreter results
in a RuntimeError.
"""
return _interpreters.destroy(self._id)
def run(self, src_str, /, *, channels=None):
"""Run the given source code in the interpreter.
This blocks the current Python thread until done.
"""
_interpreters.run_string(self._id, src_str, channels)
def create_channel():
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
"""
cid = _channels.create()
recv, send = RecvChannel(cid), SendChannel(cid)
return recv, send
def list_all_channels():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _channels.list_all()]
class _ChannelEnd:
"""The base class for RecvChannel and SendChannel."""
def __init__(self, id):
if not isinstance(id, (int, _channels.ChannelID)):
raise TypeError(f'id must be an int, got {id!r}')
self._id = id
def __repr__(self):
return f'{type(self).__name__}(id={int(self._id)})'
def __hash__(self):
return hash(self._id)
def __eq__(self, other):
if isinstance(self, RecvChannel):
if not isinstance(other, RecvChannel):
return NotImplemented
elif not isinstance(other, SendChannel):
return NotImplemented
return other._id == self._id
@property
def id(self):
return self._id
_NOT_SET = object()
class RecvChannel(_ChannelEnd):
"""The receiving end of a cross-interpreter channel."""
def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds
"""Return the next object from the channel.
This blocks until an object has been sent, if none have been
sent already.
"""
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
obj = _channels.recv(self._id, _sentinel)
return obj
def recv_nowait(self, default=_NOT_SET):
"""Return the next object from the channel.
If none have been sent then return the default if one
is provided or fail with ChannelEmptyError. Otherwise this
is the same as recv().
"""
if default is _NOT_SET:
return _channels.recv(self._id)
else:
return _channels.recv(self._id, default)
class SendChannel(_ChannelEnd):
"""The sending end of a cross-interpreter channel."""
def send(self, obj):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
_channels.send(self._id, obj)
# XXX We are missing a low-level channel_send_wait().
# See bpo-32604 and gh-19829.
# Until that shows up we fake it:
time.sleep(2)
def send_nowait(self, obj):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj)

View File

@@ -1,6 +1,7 @@
import collections.abc
import contextlib
import errno
import logging
import os
import re
import stat
@@ -10,7 +11,6 @@ import time
import unittest
import warnings
# From CPython 3.13.5
from test import support
@@ -23,8 +23,8 @@ TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
# In Mac OS X's VFS API file names are, by definition, canonically
if support.is_apple:
# On Apple's VFS API file names are, by definition, canonically
# decomposed Unicode, encoded using UTF-8. See QA1173:
# http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
import unicodedata
@@ -49,8 +49,8 @@ if os.name == 'nt':
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
# Apple and Emscripten deny unencodable filenames (invalid utf-8)
elif not support.is_apple and sys.platform not in {"emscripten", "wasi"}:
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
@@ -199,10 +199,8 @@ def skip_unless_symlink(test):
return test if ok else unittest.skip(msg)(test)
# From CPython 3.13.5
_can_hardlink = None
# From CPython 3.13.5
def can_hardlink():
global _can_hardlink
if _can_hardlink is None:
@@ -212,7 +210,6 @@ def can_hardlink():
return _can_hardlink
# From CPython 3.13.5
def skip_unless_hardlink(test):
ok = can_hardlink()
msg = "requires hardlink support"
@@ -268,15 +265,15 @@ def can_chmod():
global _can_chmod
if _can_chmod is not None:
return _can_chmod
if not hasattr(os, "chown"):
if not hasattr(os, "chmod"):
_can_chmod = False
return _can_chmod
try:
with open(TESTFN, "wb") as f:
try:
os.chmod(TESTFN, 0o777)
os.chmod(TESTFN, 0o555)
mode1 = os.stat(TESTFN).st_mode
os.chmod(TESTFN, 0o666)
os.chmod(TESTFN, 0o777)
mode2 = os.stat(TESTFN).st_mode
except OSError as e:
can = False
@@ -323,6 +320,10 @@ def can_dac_override():
else:
_can_dac_override = True
finally:
try:
os.chmod(TESTFN, 0o700)
except OSError:
pass
unlink(TESTFN)
return _can_dac_override
@@ -378,8 +379,12 @@ if sys.platform.startswith("win"):
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
logging.getLogger(__name__).warning(
'tests may fail, delete still pending for %s',
pathname,
stack_info=True,
stacklevel=4,
)
def _unlink(filename):
_waitfor(os.unlink, filename)
@@ -494,9 +499,14 @@ def temp_dir(path=None, quiet=False):
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to create '
f'temporary directory {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
logging.getLogger(__name__).warning(
"tests may fail, unable to create temporary directory %r: %s",
path,
exc,
exc_info=exc,
stack_info=True,
stacklevel=3,
)
if dir_created:
pid = os.getpid()
try:
@@ -527,9 +537,15 @@ def change_cwd(path, quiet=False):
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to change the current working '
f'directory to {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
logging.getLogger(__name__).warning(
'tests may fail, unable to change the current working directory '
'to %r: %s',
path,
exc,
exc_info=exc,
stack_info=True,
stacklevel=3,
)
try:
yield os.getcwd()
finally:
@@ -612,11 +628,18 @@ class FakePath:
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
if sys.platform.startswith(('linux', 'android', 'freebsd', 'emscripten')):
fd_path = "/proc/self/fd"
elif support.is_apple:
fd_path = "/dev/fd"
else:
fd_path = None
if fd_path is not None:
try:
names = os.listdir("/proc/self/fd")
names = os.listdir(fd_path)
# Subtract one because listdir() internally opens a file
# descriptor to list the content of the /proc/self/fd/ directory.
# descriptor to list the content of the directory.
return len(names) - 1
except FileNotFoundError:
pass
@@ -686,9 +709,10 @@ else:
class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly.
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
Can be used as a context manager.
"""
def __init__(self):
self._environ = os.environ
@@ -722,7 +746,6 @@ class EnvironmentVarGuard(collections.abc.MutableMapping):
def set(self, envvar, value):
self[envvar] = value
# From CPython 3.13.5
def unset(self, envvar, /, *envvars):
"""Unset one or more environment variables."""
for ev in (envvar, *envvars):
@@ -746,13 +769,16 @@ class EnvironmentVarGuard(collections.abc.MutableMapping):
try:
import ctypes
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
if support.MS_WINDOWS:
import ctypes
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
ERROR_FILE_NOT_FOUND = 2
DDD_REMOVE_DEFINITION = 2
DDD_EXACT_MATCH_ON_REMOVE = 4
DDD_NO_BROADCAST_SYSTEM = 8
ERROR_FILE_NOT_FOUND = 2
DDD_REMOVE_DEFINITION = 2
DDD_EXACT_MATCH_ON_REMOVE = 4
DDD_NO_BROADCAST_SYSTEM = 8
else:
raise AttributeError
except (ImportError, AttributeError):
def subst_drive(path):
raise unittest.SkipTest('ctypes or kernel32 is not available')

8
Lib/test/support/refleak_helper.py vendored Normal file
View File

@@ -0,0 +1,8 @@
"""
Utilities for changing test behaviour while hunting
for refleaks
"""
_hunting_for_refleaks = False
def hunting_for_refleaks():
return _hunting_for_refleaks

View File

@@ -8,7 +8,6 @@ import os
import os.path
import subprocess
import py_compile
import zipfile
from importlib.util import source_from_cache
from test import support
@@ -64,8 +63,8 @@ class _PythonRunResult(collections.namedtuple("_PythonRunResult",
"""Helper for reporting Python subprocess run results"""
def fail(self, cmd_line):
"""Provide helpful details about failed subcommand runs"""
# Limit to 80 lines to ASCII characters
maxlen = 80 * 100
# Limit to 300 lines of ASCII characters
maxlen = 300 * 100
out, err = self.out, self.err
if len(out) > maxlen:
out = b'(... truncated stdout ...)' + out[-maxlen:]
@@ -93,13 +92,28 @@ class _PythonRunResult(collections.namedtuple("_PythonRunResult",
# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
"""Used to implement assert_python_*.
*args are the command line flags to pass to the python interpreter.
**env_vars keyword arguments are environment variables to set on the process.
If __run_using_command= is supplied, it must be a list of
command line arguments to prepend to the command line used.
Useful when you want to run another command that should launch the
python interpreter via its own arguments. ["/bin/echo", "--"] for
example could print the unquoted python command line instead of
run it.
"""
env_required = interpreter_requires_environment()
run_using_command = env_vars.pop('__run_using_command', None)
cwd = env_vars.pop('__cwd', None)
if '__isolated' in env_vars:
isolated = env_vars.pop('__isolated')
else:
isolated = not env_vars and not env_required
cmd_line = [sys.executable, '-X', 'faulthandler']
if run_using_command:
cmd_line = run_using_command + cmd_line
if isolated:
# isolated mode: ignore Python environment variables, ignore user
# site-packages, and don't add the current directory to sys.path
@@ -218,14 +232,19 @@ def make_script(script_dir, script_basename, source, omit_suffix=False):
if not omit_suffix:
script_filename += os.extsep + 'py'
script_name = os.path.join(script_dir, script_filename)
# The script should be encoded to UTF-8, the default string encoding
with open(script_name, 'w', encoding='utf-8') as script_file:
script_file.write(source)
if isinstance(source, str):
# The script should be encoded to UTF-8, the default string encoding
with open(script_name, 'w', encoding='utf-8') as script_file:
script_file.write(source)
else:
with open(script_name, 'wb') as script_file:
script_file.write(source)
importlib.invalidate_caches()
return script_name
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
import zipfile
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
with zipfile.ZipFile(zip_name, 'w') as zip_file:
@@ -252,6 +271,7 @@ def make_pkg(pkg_dir, init_source=''):
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
source, depth=1, compiled=False):
import zipfile
unlink = []
init_name = make_script(zip_dir, '__init__', '')
unlink.append(init_name)

View File

@@ -259,6 +259,10 @@ def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
# raise OSError('socket error', msg) from msg
elif len(a) >= 2 and isinstance(a[1], OSError):
err = a[1]
# The error can also be wrapped as __cause__:
# raise URLError(f"ftp error: {exp}") from exp
elif isinstance(err, urllib.error.URLError) and err.__cause__:
err = err.__cause__
else:
break
filter_error(err)

View File

@@ -22,34 +22,37 @@ from test import support
def threading_setup():
return _thread._count(), threading._dangling.copy()
return _thread._count(), len(threading._dangling)
def threading_cleanup(*original_values):
_MAX_COUNT = 100
orig_count, orig_ndangling = original_values
for count in range(_MAX_COUNT):
values = _thread._count(), threading._dangling
if values == original_values:
break
timeout = 1.0
for _ in support.sleeping_retry(timeout, error=False):
# Copy the thread list to get a consistent output. threading._dangling
# is a WeakSet, its value changes when it's read.
dangling_threads = list(threading._dangling)
count = _thread._count()
if not count:
# Display a warning at the first iteration
support.environment_altered = True
dangling_threads = values[1]
support.print_warning(f"threading_cleanup() failed to cleanup "
f"{values[0] - original_values[0]} threads "
f"(count: {values[0]}, "
f"dangling: {len(dangling_threads)})")
for thread in dangling_threads:
support.print_warning(f"Dangling thread: {thread!r}")
if count <= orig_count:
return
# Don't hold references to threads
dangling_threads = None
values = None
# Timeout!
support.environment_altered = True
support.print_warning(
f"threading_cleanup() failed to clean up threads "
f"in {timeout:.1f} seconds\n"
f" before: thread count={orig_count}, dangling={orig_ndangling}\n"
f" after: thread count={count}, dangling={len(dangling_threads)}")
for thread in dangling_threads:
support.print_warning(f"Dangling thread: {thread!r}")
time.sleep(0.01)
support.gc_collect()
# The warning happens when a test spawns threads and some of these threads
# are still running after the test completes. To fix this warning, join
# threads explicitly to wait until they complete.
#
# To make the warning more likely, reduce the timeout.
def reap_threads(func):

View File

@@ -1,12 +1,15 @@
import contextlib
import errno
import importlib
import io
import logging
import os
import shutil
import socket
import stat
import subprocess
import sys
import sysconfig
import tempfile
import textwrap
import unittest
@@ -18,11 +21,37 @@ from test.support import os_helper
from test.support import script_helper
from test.support import socket_helper
from test.support import warnings_helper
from test.support.testcase import ExtraAssertions
TESTFN = os_helper.TESTFN
class TestSupport(unittest.TestCase):
class LogCaptureHandler(logging.StreamHandler):
# Inspired by pytest's caplog
def __init__(self):
super().__init__(io.StringIO())
self.records = []
def emit(self, record) -> None:
self.records.append(record)
super().emit(record)
def handleError(self, record):
raise
@contextlib.contextmanager
def _caplog():
handler = LogCaptureHandler()
root_logger = logging.getLogger()
root_logger.addHandler(handler)
try:
yield handler
finally:
root_logger.removeHandler(handler)
class TestSupport(unittest.TestCase, ExtraAssertions):
@classmethod
def setUpClass(cls):
orig_filter_len = len(warnings.filters)
@@ -185,7 +214,7 @@ class TestSupport(unittest.TestCase):
path = os.path.realpath(path)
try:
with warnings_helper.check_warnings() as recorder:
with warnings_helper.check_warnings() as recorder, _caplog() as caplog:
with os_helper.temp_dir(path, quiet=True) as temp_path:
self.assertEqual(path, temp_path)
warnings = [str(w.message) for w in recorder.warnings]
@@ -194,11 +223,14 @@ class TestSupport(unittest.TestCase):
finally:
shutil.rmtree(path)
self.assertEqual(len(warnings), 1, warnings)
warn = warnings[0]
self.assertTrue(warn.startswith(f'tests may fail, unable to create '
f'temporary directory {path!r}: '),
warn)
self.assertListEqual(warnings, [])
self.assertEqual(len(caplog.records), 1)
record = caplog.records[0]
self.assertStartsWith(
record.getMessage(),
f'tests may fail, unable to create '
f'temporary directory {path!r}: '
)
@support.requires_fork()
def test_temp_dir__forked_child(self):
@@ -258,35 +290,41 @@ class TestSupport(unittest.TestCase):
with os_helper.temp_dir() as parent_dir:
bad_dir = os.path.join(parent_dir, 'does_not_exist')
with warnings_helper.check_warnings() as recorder:
with warnings_helper.check_warnings() as recorder, _caplog() as caplog:
with os_helper.change_cwd(bad_dir, quiet=True) as new_cwd:
self.assertEqual(new_cwd, original_cwd)
self.assertEqual(os.getcwd(), new_cwd)
warnings = [str(w.message) for w in recorder.warnings]
self.assertEqual(len(warnings), 1, warnings)
warn = warnings[0]
self.assertTrue(warn.startswith(f'tests may fail, unable to change '
f'the current working directory '
f'to {bad_dir!r}: '),
warn)
self.assertListEqual(warnings, [])
self.assertEqual(len(caplog.records), 1)
record = caplog.records[0]
self.assertStartsWith(
record.getMessage(),
f'tests may fail, unable to change '
f'the current working directory '
f'to {bad_dir!r}: '
)
# Tests for change_cwd()
def test_change_cwd__chdir_warning(self):
"""Check the warning message when os.chdir() fails."""
path = TESTFN + '_does_not_exist'
with warnings_helper.check_warnings() as recorder:
with warnings_helper.check_warnings() as recorder, _caplog() as caplog:
with os_helper.change_cwd(path=path, quiet=True):
pass
messages = [str(w.message) for w in recorder.warnings]
self.assertEqual(len(messages), 1, messages)
msg = messages[0]
self.assertTrue(msg.startswith(f'tests may fail, unable to change '
f'the current working directory '
f'to {path!r}: '),
msg)
self.assertListEqual(messages, [])
self.assertEqual(len(caplog.records), 1)
record = caplog.records[0]
self.assertStartsWith(
record.getMessage(),
f'tests may fail, unable to change '
f'the current working directory '
f'to {path!r}: ',
)
# Tests for temp_cwd()
@@ -420,8 +458,6 @@ class TestSupport(unittest.TestCase):
self.OtherClass, self.RefClass, ignore=ignore)
self.assertEqual(set(), missing_items)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_check__all__(self):
extra = {'tempdir'}
not_exported = {'template'}
@@ -432,10 +468,8 @@ class TestSupport(unittest.TestCase):
extra = {
'TextTestResult',
'findTestCases',
'getTestCaseNames',
'installHandler',
'makeSuite',
'IsolatedAsyncioTestCase',
}
not_exported = {'load_tests', "TestProgram", "BaseTestSuite"}
support.check__all__(self,
@@ -551,119 +585,14 @@ class TestSupport(unittest.TestCase):
with self.subTest(opts=opts):
self.check_options(opts, 'optim_args_from_interpreter_flags')
def test_match_test(self):
class Test:
def __init__(self, test_id):
self.test_id = test_id
def id(self):
return self.test_id
test_access = Test('test.test_os.FileTests.test_access')
test_chdir = Test('test.test_os.Win32ErrorTests.test_chdir')
# Test acceptance
with support.swap_attr(support, '_match_test_func', None):
# match all
support.set_match_tests([])
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# match all using None
support.set_match_tests(None, None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# match the full test identifier
support.set_match_tests([test_access.id()], None)
self.assertTrue(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
# match the module name
support.set_match_tests(['test_os'], None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# Test '*' pattern
support.set_match_tests(['test_*'], None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# Test case sensitivity
support.set_match_tests(['filetests'], None)
self.assertFalse(support.match_test(test_access))
support.set_match_tests(['FileTests'], None)
self.assertTrue(support.match_test(test_access))
# Test pattern containing '.' and a '*' metacharacter
support.set_match_tests(['*test_os.*.test_*'], None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# Multiple patterns
support.set_match_tests([test_access.id(), test_chdir.id()], None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
support.set_match_tests(['test_access', 'DONTMATCH'], None)
self.assertTrue(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
# Test rejection
with support.swap_attr(support, '_match_test_func', None):
# match all
support.set_match_tests(ignore_patterns=[])
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# match all using None
support.set_match_tests(None, None)
self.assertTrue(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# match the full test identifier
support.set_match_tests(None, [test_access.id()])
self.assertFalse(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
# match the module name
support.set_match_tests(None, ['test_os'])
self.assertFalse(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
# Test '*' pattern
support.set_match_tests(None, ['test_*'])
self.assertFalse(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
# Test case sensitivity
support.set_match_tests(None, ['filetests'])
self.assertTrue(support.match_test(test_access))
support.set_match_tests(None, ['FileTests'])
self.assertFalse(support.match_test(test_access))
# Test pattern containing '.' and a '*' metacharacter
support.set_match_tests(None, ['*test_os.*.test_*'])
self.assertFalse(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
# Multiple patterns
support.set_match_tests(None, [test_access.id(), test_chdir.id()])
self.assertFalse(support.match_test(test_access))
self.assertFalse(support.match_test(test_chdir))
support.set_match_tests(None, ['test_access', 'DONTMATCH'])
self.assertFalse(support.match_test(test_access))
self.assertTrue(support.match_test(test_chdir))
@unittest.skipIf(sys.platform.startswith("win"), "TODO: RUSTPYTHON; os.dup on windows")
@unittest.skipIf(support.is_apple_mobile, "Unstable on Apple Mobile")
@unittest.skipIf(support.is_emscripten, "Unstable in Emscripten")
@unittest.skipIf(support.is_wasi, "Unavailable on WASI")
def test_fd_count(self):
# We cannot test the absolute value of fd_count(): on old Linux
# kernel or glibc versions, os.urandom() keeps a FD open on
# /dev/urandom device and Python has 4 FD opens instead of 3.
# Test is unstable on Emscripten. The platform starts and stops
# We cannot test the absolute value of fd_count(): on old Linux kernel
# or glibc versions, os.urandom() keeps a FD open on /dev/urandom
# device and Python has 4 FD opens instead of 3. Test is unstable on
# Emscripten and Apple Mobile platforms; these platforms start and stop
# background threads that use pipes and epoll fds.
start = os_helper.fd_count()
fd = os.open(__file__, os.O_RDONLY)
@@ -685,13 +614,16 @@ class TestSupport(unittest.TestCase):
self.check_print_warning("a\nb",
'Warning -- a\nWarning -- b\n')
@unittest.expectedFailureIf(sys.platform != "win32", "TODO: RUSTPYTHON")
# TODO: RUSTPYTHON - strftime extension not fully supported on non-Windows
@unittest.skipUnless(sys.platform == "win32" or support.is_emscripten,
"strftime extension not fully supported on non-Windows")
def test_has_strftime_extensions(self):
if support.is_emscripten or sys.platform == "win32":
self.assertFalse(support.has_strftime_extensions)
else:
self.assertTrue(support.has_strftime_extensions)
# TODO: RUSTPYTHON - _testinternalcapi module not available
@unittest.expectedFailure
def test_get_recursion_depth(self):
# test support.get_recursion_depth()
@@ -736,13 +668,15 @@ class TestSupport(unittest.TestCase):
""")
script_helper.assert_python_ok("-c", code)
# TODO: RUSTPYTHON - stack overflow in debug mode with deep recursion
@unittest.skip("TODO: RUSTPYTHON - causes segfault in debug builds")
def test_recursion(self):
# Test infinite_recursion() and get_recursion_available() functions.
def recursive_function(depth):
if depth:
recursive_function(depth - 1)
for max_depth in (5, 25, 250):
for max_depth in (5, 25, 250, 2500):
with support.infinite_recursion(max_depth):
available = support.get_recursion_available()
@@ -768,7 +702,85 @@ class TestSupport(unittest.TestCase):
else:
self.fail("RecursionError was not raised")
#self.assertEqual(available, 2)
def test_parse_memlimit(self):
parse = support._parse_memlimit
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
self.assertEqual(parse('0k'), 0)
self.assertEqual(parse('3k'), 3 * KiB)
self.assertEqual(parse('2.4m'), int(2.4 * MiB))
self.assertEqual(parse('4g'), int(4 * GiB))
self.assertEqual(parse('1t'), TiB)
for limit in ('', '3', '3.5.10k', '10x'):
with self.subTest(limit=limit):
with self.assertRaises(ValueError):
parse(limit)
def test_set_memlimit(self):
_4GiB = 4 * 1024 ** 3
TiB = 1024 ** 4
old_max_memuse = support.max_memuse
old_real_max_memuse = support.real_max_memuse
try:
if sys.maxsize > 2**32:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, _4GiB)
self.assertEqual(support.real_max_memuse, _4GiB)
big = 2**100 // TiB
support.set_memlimit(f'{big}t')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, big * TiB)
else:
support.set_memlimit('4g')
self.assertEqual(support.max_memuse, sys.maxsize)
self.assertEqual(support.real_max_memuse, _4GiB)
finally:
support.max_memuse = old_max_memuse
support.real_max_memuse = old_real_max_memuse
def test_copy_python_src_ignore(self):
# Get source directory
src_dir = sysconfig.get_config_var('abs_srcdir')
if not src_dir:
src_dir = sysconfig.get_config_var('srcdir')
src_dir = os.path.abspath(src_dir)
# Check that the source code is available
if not os.path.exists(src_dir):
self.skipTest(f"cannot access Python source code directory:"
f" {src_dir!r}")
# Check that the landmark copy_python_src_ignore() expects is available
# (Previously we looked for 'Lib\os.py', which is always present on Windows.)
landmark = os.path.join(src_dir, 'Modules')
if not os.path.exists(landmark):
self.skipTest(f"cannot access Python source code directory:"
f" {landmark!r} landmark is missing")
# Test support.copy_python_src_ignore()
# Source code directory
ignored = {'.git', '__pycache__'}
names = os.listdir(src_dir)
self.assertEqual(support.copy_python_src_ignore(src_dir, names),
ignored | {'build'})
# Doc/ directory
path = os.path.join(src_dir, 'Doc')
self.assertEqual(support.copy_python_src_ignore(path, os.listdir(path)),
ignored | {'build', 'venv'})
# Another directory
path = os.path.join(src_dir, 'Objects')
self.assertEqual(support.copy_python_src_ignore(path, os.listdir(path)),
ignored)
def test_linked_to_musl(self):
linked = support.linked_to_musl()
self.assertIsInstance(linked, bool)
# XXX -follows a list of untested API
# make_legacy_pyc
@@ -781,12 +793,10 @@ class TestSupport(unittest.TestCase):
# EnvironmentVarGuard
# transient_internet
# run_with_locale
# set_memlimit
# bigmemtest
# precisionbigmemtest
# bigaddrspacetest
# requires_resource
# run_doctest
# threading_cleanup
# reap_threads
# can_symlink