Merge pull request #4537 from youknowone/lib-support

Update test.support from CPython 3.11.2
This commit is contained in:
Jeong YunWon
2023-02-24 10:59:04 +09:00
committed by GitHub
12 changed files with 598 additions and 154 deletions

138
Lib/locale.py vendored
View File

@@ -28,7 +28,7 @@ __all__ = ["getlocale", "getdefaultlocale", "getpreferredencoding", "Error",
"setlocale", "resetlocale", "localeconv", "strcoll", "strxfrm",
"str", "atof", "atoi", "format", "format_string", "currency",
"normalize", "LC_CTYPE", "LC_COLLATE", "LC_TIME", "LC_MONETARY",
"LC_NUMERIC", "LC_ALL", "CHAR_MAX"]
"LC_NUMERIC", "LC_ALL", "CHAR_MAX", "getencoding"]
def _strcoll(a,b):
""" strcoll(string,string) -> int.
@@ -185,8 +185,14 @@ def _format(percent, value, grouping=False, monetary=False, *additional):
formatted = percent % ((value,) + additional)
else:
formatted = percent % value
if percent[-1] in 'eEfFgGdiu':
formatted = _localize(formatted, grouping, monetary)
return formatted
# Transform formatted as locale number according to the locale settings
def _localize(formatted, grouping=False, monetary=False):
# floats and decimal ints need special action!
if percent[-1] in 'eEfFgG':
if '.' in formatted:
seps = 0
parts = formatted.split('.')
if grouping:
@@ -196,7 +202,7 @@ def _format(percent, value, grouping=False, monetary=False, *additional):
formatted = decimal_point.join(parts)
if seps:
formatted = _strip_padding(formatted, seps)
elif percent[-1] in 'diu':
else:
seps = 0
if grouping:
formatted, seps = _group(formatted, monetary=monetary)
@@ -267,7 +273,7 @@ def currency(val, symbol=True, grouping=False, international=False):
raise ValueError("Currency formatting is not possible using "
"the 'C' locale.")
s = _format('%%.%if' % digits, abs(val), grouping, monetary=True)
s = _localize(f'{abs(val):.{digits}f}', grouping, monetary=True)
# '<' and '>' are markers if the sign must be inserted between symbol and value
s = '<' + s + '>'
@@ -279,6 +285,8 @@ def currency(val, symbol=True, grouping=False, international=False):
if precedes:
s = smb + (separated and ' ' or '') + s
else:
if international and smb[-1] == ' ':
smb = smb[:-1]
s = s + (separated and ' ' or '') + smb
sign_pos = conv[val<0 and 'n_sign_posn' or 'p_sign_posn']
@@ -321,6 +329,10 @@ def delocalize(string):
string = string.replace(dd, '.')
return string
def localize(string, grouping=False, monetary=False):
"""Parses a string as locale number according to the locale settings."""
return _localize(string, grouping, monetary)
def atof(string, func=float):
"Parses a string as a float according to the locale settings."
return func(delocalize(string))
@@ -492,6 +504,10 @@ def _parse_localename(localename):
return tuple(code.split('.')[:2])
elif code == 'C':
return None, None
elif code == 'UTF-8':
# On macOS "LC_CTYPE=UTF-8" is a valid locale setting
# for getting UTF-8 handling for text.
return None, 'UTF-8'
raise ValueError('unknown locale: %s' % localename)
def _build_localename(localetuple):
@@ -539,6 +555,12 @@ def getdefaultlocale(envvars=('LC_ALL', 'LC_CTYPE', 'LANG', 'LANGUAGE')):
"""
import warnings
warnings.warn(
"Use setlocale(), getencoding() and getlocale() instead",
DeprecationWarning, stacklevel=2
)
try:
# check if it's supported by the _locale module
import _locale
@@ -611,55 +633,72 @@ def resetlocale(category=LC_ALL):
getdefaultlocale(). category defaults to LC_ALL.
"""
_setlocale(category, _build_localename(getdefaultlocale()))
import warnings
warnings.warn(
'Use locale.setlocale(locale.LC_ALL, "") instead',
DeprecationWarning, stacklevel=2
)
if sys.platform.startswith("win"):
# On Win32, this will return the ANSI code page
def getpreferredencoding(do_setlocale = True):
"""Return the charset that the user is likely using."""
if sys.flags.utf8_mode:
return 'UTF-8'
import _bootlocale
return _bootlocale.getpreferredencoding(False)
else:
# On Unix, if CODESET is available, use that.
try:
CODESET
except NameError:
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=DeprecationWarning)
loc = getdefaultlocale()
_setlocale(category, _build_localename(loc))
try:
from _locale import getencoding
except ImportError:
def getencoding():
if hasattr(sys, 'getandroidapilevel'):
# On Android langinfo.h and CODESET are missing, and UTF-8 is
# always used in mbstowcs() and wcstombs().
def getpreferredencoding(do_setlocale = True):
return 'UTF-8'
else:
# Fall back to parsing environment variables :-(
def getpreferredencoding(do_setlocale = True):
"""Return the charset that the user is likely using,
by looking at environment variables."""
if sys.flags.utf8_mode:
return 'UTF-8'
res = getdefaultlocale()[1]
if res is None:
# LANG not set, default conservatively to ASCII
res = 'ascii'
return res
else:
def getpreferredencoding(do_setlocale = True):
"""Return the charset that the user is likely using,
according to the system configuration."""
if sys.flags.utf8_mode:
return 'UTF-8'
import _bootlocale
if do_setlocale:
oldloc = setlocale(LC_CTYPE)
try:
setlocale(LC_CTYPE, "")
except Error:
pass
result = _bootlocale.getpreferredencoding(False)
if do_setlocale:
setlocale(LC_CTYPE, oldloc)
return result
return 'utf-8'
encoding = getdefaultlocale()[1]
if encoding is None:
# LANG not set, default to UTF-8
encoding = 'utf-8'
return encoding
try:
CODESET
except NameError:
def getpreferredencoding(do_setlocale=True):
"""Return the charset that the user is likely using."""
if sys.flags.warn_default_encoding:
import warnings
warnings.warn(
"UTF-8 Mode affects locale.getpreferredencoding(). Consider locale.getencoding() instead.",
EncodingWarning, 2)
if sys.flags.utf8_mode:
return 'utf-8'
return getencoding()
else:
# On Unix, if CODESET is available, use that.
def getpreferredencoding(do_setlocale=True):
"""Return the charset that the user is likely using,
according to the system configuration."""
if sys.flags.warn_default_encoding:
import warnings
warnings.warn(
"UTF-8 Mode affects locale.getpreferredencoding(). Consider locale.getencoding() instead.",
EncodingWarning, 2)
if sys.flags.utf8_mode:
return 'utf-8'
if not do_setlocale:
return getencoding()
old_loc = setlocale(LC_CTYPE)
try:
try:
setlocale(LC_CTYPE, "")
except Error:
pass
return getencoding()
finally:
setlocale(LC_CTYPE, old_loc)
### Database
@@ -734,6 +773,7 @@ locale_encoding_alias = {
for k, v in sorted(locale_encoding_alias.items()):
k = k.replace('_', '')
locale_encoding_alias.setdefault(k, v)
del k, v
#
# The locale_alias table maps lowercase alias names to C locale names

View File

@@ -5,6 +5,7 @@ if __name__ != 'test.support':
import contextlib
import functools
import getpass
import os
import re
import stat
@@ -39,18 +40,21 @@ __all__ = [
"requires_gzip", "requires_bz2", "requires_lzma",
"bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
"requires_IEEE_754", "requires_zlib",
"has_fork_support", "requires_fork",
"has_subprocess_support", "requires_subprocess",
"has_socket_support", "requires_working_socket",
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
# sys
"is_jython", "is_android", "check_impl_detail", "unix_shell",
"setswitchinterval",
"is_jython", "is_android", "is_emscripten", "is_wasi",
"check_impl_detail", "unix_shell", "setswitchinterval",
# network
"open_urlresource",
# processes
"reap_children",
# miscellaneous
"run_with_locale", "swap_item", "findfile",
"run_with_locale", "swap_item", "findfile", "infinite_recursion",
"swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
"run_with_tz", "PGO", "missing_compiler_executable",
"ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
@@ -99,6 +103,13 @@ SHORT_TIMEOUT = 30.0
# option.
LONG_TIMEOUT = 5 * 60.0
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
REPO_ROOT = os.path.dirname(STDLIB_DIR)
class Error(Exception):
"""Base class for regression test exceptions."""
@@ -148,9 +159,7 @@ def load_package_tests(pkg_dir, loader, standard_tests, pattern):
"""
if pattern is None:
pattern = "test*"
top_dir = os.path.dirname( # Lib
os.path.dirname( # test
os.path.dirname(__file__))) # support
top_dir = STDLIB_DIR
package_tests = loader.discover(start_dir=pkg_dir,
top_level_dir=top_dir,
pattern=pattern)
@@ -190,6 +199,11 @@ def get_original_stdout():
def _force_run(path, func, *args):
try:
return func(*args)
except FileNotFoundError as err:
# chmod() won't fix a missing file.
if verbose >= 2:
print('%s: %s' % (err.__class__.__name__, err))
raise
except OSError as err:
if verbose >= 2:
print('%s: %s' % (err.__class__.__name__, err))
@@ -290,6 +304,8 @@ def requires(resource, msg=None):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
if resource in {"network", "urlfetch"} and not has_socket_support:
raise ResourceDenied("No socket support")
if resource == 'gui' and not _is_gui_available():
raise ResourceDenied(_is_gui_available.reason)
@@ -367,6 +383,17 @@ def requires_mac_ver(*min_version):
return decorator
def skip_if_buildbot(reason=None):
"""Decorator raising SkipTest if running on a buildbot."""
if not reason:
reason = 'not suitable for buildbots'
try:
isbuildbot = getpass.getuser().lower() == 'buildbot'
except (KeyError, EnvironmentError) as err:
warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
isbuildbot = False
return unittest.skipIf(isbuildbot, reason)
def check_sanitizer(*, address=False, memory=False, ub=False):
"""Returns True if Python is compiled with sanitizer support"""
if not (address or memory or ub):
@@ -462,6 +489,17 @@ def requires_lzma(reason='requires lzma'):
lzma = None
return unittest.skipUnless(lzma, reason)
def has_no_debug_ranges():
try:
import _testinternalcapi
except ImportError:
raise unittest.SkipTest("_testinternalcapi required")
config = _testinternalcapi.get_config()
return not bool(config['code_debug_ranges'])
def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
return unittest.skipIf(has_no_debug_ranges(), reason)
requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
'requires legacy Unicode C API')
@@ -474,6 +512,46 @@ if sys.platform not in ('win32', 'vxworks'):
else:
unix_shell = None
# wasm32-emscripten and -wasi are POSIX-like but do not
# have subprocess or fork support.
is_emscripten = sys.platform == "emscripten"
is_wasi = sys.platform == "wasi"
has_fork_support = hasattr(os, "fork") and not is_emscripten and not is_wasi
def requires_fork():
return unittest.skipUnless(has_fork_support, "requires working os.fork()")
has_subprocess_support = not is_emscripten and not is_wasi
def requires_subprocess():
"""Used for subprocess, os.spawn calls, fd inheritance"""
return unittest.skipUnless(has_subprocess_support, "requires subprocess support")
# Emscripten's socket emulation and WASI sockets have limitations.
has_socket_support = not is_emscripten and not is_wasi
def requires_working_socket(*, module=False):
"""Skip tests or modules that require working sockets
Can be used as a function/class decorator or to skip an entire module.
"""
msg = "requires socket support"
if module:
if not has_socket_support:
raise unittest.SkipTest(msg)
else:
return unittest.skipUnless(has_socket_support, msg)
# Does strftime() support glibc extension like '%4Y'?
has_strftime_extensions = False
if sys.platform != "win32":
# bpo-47037: Windows debug builds crash with "Debug Assertion Failed"
try:
has_strftime_extensions = time.strftime("%4Y") != "%4Y"
except ValueError:
pass
# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"
@@ -486,11 +564,6 @@ PGO = False
# PGO task. If this is True, PGO is also True.
PGO_EXTENDED = False
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
@@ -712,7 +785,10 @@ _TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9
def check_sizeof(test, o, size):
import _testinternalcapi
try:
import _testinternalcapi
except ImportError:
raise unittest.SkipTest("_testinternalcapi required")
result = sys.getsizeof(o)
# add GC header size
if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
@@ -728,29 +804,29 @@ def check_sizeof(test, o, size):
@contextlib.contextmanager
def run_with_locale(catstr, *locales):
try:
import locale
category = getattr(locale, catstr)
orig_locale = locale.setlocale(category)
except AttributeError:
# if the test author gives us an invalid category string
raise
except:
# cannot retrieve original locale, so do nothing
locale = orig_locale = None
else:
for loc in locales:
try:
import locale
category = getattr(locale, catstr)
orig_locale = locale.setlocale(category)
except AttributeError:
# if the test author gives us an invalid category string
raise
locale.setlocale(category, loc)
break
except:
# cannot retrieve original locale, so do nothing
locale = orig_locale = None
else:
for loc in locales:
try:
locale.setlocale(category, loc)
break
except:
pass
pass
try:
yield
finally:
if locale and orig_locale:
locale.setlocale(category, orig_locale)
try:
yield
finally:
if locale and orig_locale:
locale.setlocale(category, orig_locale)
#=======================================================================
# Decorator for running a function in a specific timezone, correctly
@@ -1133,17 +1209,18 @@ def _compile_match_function(patterns):
def run_unittest(*classes):
"""Run tests from unittest.TestCase-derived classes."""
valid_types = (unittest.TestSuite, unittest.TestCase)
loader = unittest.TestLoader()
suite = unittest.TestSuite()
for cls in classes:
if isinstance(cls, str):
if cls in sys.modules:
suite.addTest(unittest.findTestCases(sys.modules[cls]))
suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
else:
raise ValueError("str arguments must be keys in sys.modules")
elif isinstance(cls, valid_types):
suite.addTest(cls)
else:
suite.addTest(unittest.makeSuite(cls))
suite.addTest(loader.loadTestsFromTestCase(cls))
_filter_suite(suite, match_test)
_run_suite(suite)
@@ -1197,11 +1274,24 @@ def run_doctest(module, verbosity=None, optionflags=0):
#=======================================================================
# Support for saving and restoring the imported modules.
def flush_std_streams():
if sys.stdout is not None:
sys.stdout.flush()
if sys.stderr is not None:
sys.stderr.flush()
def print_warning(msg):
# bpo-39983: Print into sys.__stderr__ to display the warning even
# when sys.stderr is captured temporarily by a test
# bpo-45410: Explicitly flush stdout to keep logs in order
flush_std_streams()
stream = print_warning.orig_stderr
for line in msg.splitlines():
print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
print(f"Warning -- {line}", file=stream)
stream.flush()
# bpo-39983: Store the original sys.stderr at Python startup to be able to
# log warnings even if sys.stderr is captured temporarily by a test.
print_warning.orig_stderr = sys.stderr
# Flag used by saved_test_environment of test.libregrtest.save_env,
@@ -1223,6 +1313,8 @@ def reap_children():
# Need os.waitpid(-1, os.WNOHANG): Windows is not supported
if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
return
elif not has_subprocess_support:
return
# Reap all our dead child processes so we don't leave zombies around.
# These hog resources and might be causing some of the buildbots to die.
@@ -1364,7 +1456,7 @@ def skip_if_buggy_ucrt_strfptime(test):
global _buggy_ucrt
if _buggy_ucrt is None:
if(sys.platform == 'win32' and
locale.getdefaultlocale()[1] == 'cp65001' and
locale.getencoding() == 'cp65001' and
time.localtime().tm_zone == ''):
_buggy_ucrt = True
else:
@@ -1410,8 +1502,8 @@ class PythonSymlink:
self._env = {k.upper(): os.getenv(k) for k in os.environ}
self._env["PYTHONHOME"] = os.path.dirname(self.real)
if sysconfig.is_python_build(True):
self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
if sysconfig.is_python_build():
self._env["PYTHONPATH"] = STDLIB_DIR
else:
def _platform_specific(self):
pass
@@ -1685,6 +1777,16 @@ def patch(test_instance, object_to_patch, attr_name, new_value):
setattr(object_to_patch, attr_name, new_value)
@contextlib.contextmanager
def patch_list(orig):
"""Like unittest.mock.patch.dict, but for lists."""
try:
saved = orig[:]
yield
finally:
orig[:] = saved
def run_in_subinterp(code):
"""
Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
@@ -1981,7 +2083,7 @@ def wait_process(pid, *, exitcode, timeout=None):
Raise an AssertionError if the process exit code is not equal to exitcode.
If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
If the process runs longer than timeout seconds (LONG_TIMEOUT by default),
kill the process (if signal.SIGKILL is available) and raise an
AssertionError. The timeout feature is not available on Windows.
"""
@@ -1989,7 +2091,7 @@ def wait_process(pid, *, exitcode, timeout=None):
import signal
if timeout is None:
timeout = SHORT_TIMEOUT
timeout = LONG_TIMEOUT
t0 = time.monotonic()
sleep = 0.001
max_sleep = 0.1
@@ -2000,7 +2102,7 @@ def wait_process(pid, *, exitcode, timeout=None):
# process is still running
dt = time.monotonic() - t0
if dt > SHORT_TIMEOUT:
if dt > timeout:
try:
os.kill(pid, signal.SIGKILL)
os.waitpid(pid, 0)
@@ -2051,16 +2153,6 @@ def skip_if_broken_multiprocessing_synchronize():
raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
@contextlib.contextmanager
def infinite_recursion(max_depth=75):
original_depth = sys.getrecursionlimit()
try:
sys.setrecursionlimit(max_depth)
yield
finally:
sys.setrecursionlimit(original_depth)
def check_disallow_instantiation(testcase, tp, *args, **kwds):
"""
Check that given type cannot be instantiated using *args and **kwds.
@@ -2076,6 +2168,20 @@ def check_disallow_instantiation(testcase, tp, *args, **kwds):
msg = f"cannot create '{re.escape(qualname)}' instances"
testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
@contextlib.contextmanager
def infinite_recursion(max_depth=75):
"""Set a lower limit for tests that interact with infinite recursions
(e.g test_ast.ASTHelpers_Test.test_recursion_direct) since on some
debug windows builds, due to not enough functions being inlined the
stack size might not handle the default recursion limit (1000). See
bpo-11105 for details."""
original_depth = sys.getrecursionlimit()
try:
sys.setrecursionlimit(max_depth)
yield
finally:
sys.setrecursionlimit(original_depth)
def ignore_deprecations_from(module: str, *, like: str) -> object:
token = object()
@@ -2087,7 +2193,6 @@ def ignore_deprecations_from(module: str, *, like: str) -> object:
)
return token
def clear_ignored_deprecations(*tokens: object) -> None:
if not tokens:
raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
@@ -2106,3 +2211,31 @@ def clear_ignored_deprecations(*tokens: object) -> None:
if warnings.filters != new_filters:
warnings.filters[:] = new_filters
warnings._filters_mutated()
# Skip a test if venv with pip is known to not work.
def requires_venv_with_pip():
# ensurepip requires zlib to open ZIP archives (.whl binary wheel packages)
try:
import zlib
except ImportError:
return unittest.skipIf(True, "venv: ensurepip requires zlib")
# bpo-26610: pip/pep425tags.py requires ctypes.
# gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1).
try:
import ctypes
except ImportError:
ctypes = None
return unittest.skipUnless(ctypes, 'venv: pip requires ctypes')
@contextlib.contextmanager
def adjust_int_max_str_digits(max_digits):
"""Temporarily change the integer string conversion length limit."""
current = sys.get_int_max_str_digits()
try:
sys.set_int_max_str_digits(max_digits)
yield
finally:
sys.set_int_max_str_digits(current)

View File

@@ -1,4 +1,5 @@
import contextlib
import _imp
import importlib
import importlib.util
import os
@@ -90,7 +91,24 @@ def _save_and_remove_modules(names):
return orig_modules
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
@contextlib.contextmanager
def frozen_modules(enabled=True):
"""Force frozen modules to be used (or not).
This only applies to modules that haven't been imported yet.
Also, some essential modules will always be imported frozen.
"""
_imp._override_frozen_modules_for_tests(1 if enabled else -1)
try:
yield
finally:
_imp._override_frozen_modules_for_tests(0)
def import_fresh_module(name, fresh=(), blocked=(), *,
deprecated=False,
usefrozen=False,
):
"""Import and return a module, deliberately bypassing sys.modules.
This function imports and returns a fresh copy of the named Python module
@@ -115,6 +133,9 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
This function will raise ImportError if the named module cannot be
imported.
If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""
# NOTE: test_heapq, test_json and test_warnings include extra sanity checks
# to make sure that this utility function is working as expected
@@ -129,13 +150,14 @@ def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
sys.modules[modname] = None
try:
# Return None when one of the "fresh" modules can not be imported.
try:
for modname in fresh:
__import__(modname)
except ImportError:
return None
return importlib.import_module(name)
with frozen_modules(usefrozen):
# Return None when one of the "fresh" modules can not be imported.
try:
for modname in fresh:
__import__(modname)
except ImportError:
return None
return importlib.import_module(name)
finally:
_save_and_remove_modules(names)
sys.modules.update(orig_modules)
@@ -151,9 +173,12 @@ class CleanImport(object):
with CleanImport("foo"):
importlib.import_module("foo") # new reference
If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""
def __init__(self, *module_names):
def __init__(self, *module_names, usefrozen=False):
self.original_modules = sys.modules.copy()
for module_name in module_names:
if module_name in sys.modules:
@@ -165,12 +190,15 @@ class CleanImport(object):
if module.__name__ != module_name:
del sys.modules[module.__name__]
del sys.modules[module_name]
self._frozen_modules = frozen_modules(usefrozen)
def __enter__(self):
self._frozen_modules.__enter__()
return self
def __exit__(self, *ignore_exc):
sys.modules.update(self.original_modules)
self._frozen_modules.__exit__(*ignore_exc)
class DirsOnSysPath(object):

View File

@@ -49,8 +49,8 @@ if os.name == 'nt':
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
@@ -171,9 +171,13 @@ def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
symlink_path = TESTFN + "can_symlink"
# WASI / wasmtime prevents symlinks with absolute paths, see man
# openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
# paths. Skip symlink tests on WASI for now.
src = os.path.abspath(TESTFN)
symlink_path = src + "can_symlink"
try:
os.symlink(TESTFN, symlink_path)
os.symlink(src, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
@@ -233,6 +237,84 @@ def skip_unless_xattr(test):
return test if ok else unittest.skip(msg)(test)
_can_chmod = None
def can_chmod():
global _can_chmod
if _can_chmod is not None:
return _can_chmod
if not hasattr(os, "chown"):
_can_chmod = False
return _can_chmod
try:
with open(TESTFN, "wb") as f:
try:
os.chmod(TESTFN, 0o777)
mode1 = os.stat(TESTFN).st_mode
os.chmod(TESTFN, 0o666)
mode2 = os.stat(TESTFN).st_mode
except OSError as e:
can = False
else:
can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
finally:
unlink(TESTFN)
_can_chmod = can
return can
def skip_unless_working_chmod(test):
"""Skip tests that require working os.chmod()
WASI SDK 15.0 cannot change file mode bits.
"""
ok = can_chmod()
msg = "requires working os.chmod()"
return test if ok else unittest.skip(msg)(test)
# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
_can_dac_override = None
def can_dac_override():
global _can_dac_override
if not can_chmod():
_can_dac_override = False
if _can_dac_override is not None:
return _can_dac_override
try:
with open(TESTFN, "wb") as f:
os.chmod(TESTFN, 0o400)
try:
with open(TESTFN, "wb"):
pass
except OSError:
_can_dac_override = False
else:
_can_dac_override = True
finally:
unlink(TESTFN)
return _can_dac_override
def skip_if_dac_override(test):
ok = not can_dac_override()
msg = "incompatible with CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def skip_unless_dac_override(test):
ok = can_dac_override()
msg = "requires CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def unlink(filename):
try:
_unlink(filename)
@@ -459,7 +541,10 @@ def create_empty_file(filename):
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
dir_fd = os.open(path, os.O_RDONLY)
flags = os.O_RDONLY
if hasattr(os, "O_DIRECTORY"):
flags |= os.O_DIRECTORY
dir_fd = os.open(path, flags)
try:
yield dir_fd
finally:
@@ -502,7 +587,7 @@ class FakePath:
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd')):
if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
@@ -568,6 +653,11 @@ if hasattr(os, "umask"):
yield
finally:
os.umask(oldmask)
else:
@contextlib.contextmanager
def temp_umask(umask):
"""no-op on platforms without umask()"""
yield
class EnvironmentVarGuard(collections.abc.MutableMapping):
@@ -610,6 +700,10 @@ class EnvironmentVarGuard(collections.abc.MutableMapping):
def unset(self, envvar):
del self[envvar]
def copy(self):
# We do what os.environ.copy() does.
return dict(self)
def __enter__(self):
return self

View File

@@ -42,6 +42,10 @@ def interpreter_requires_environment():
if 'PYTHONHOME' in os.environ:
__cached_interp_requires_environment = True
return True
# cannot run subprocess, assume we don't need it
if not support.has_subprocess_support:
__cached_interp_requires_environment = False
return False
# Try running an interpreter with -E to see if it works or not.
try:
@@ -87,6 +91,7 @@ class _PythonRunResult(collections.namedtuple("_PythonRunResult",
# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
env_required = interpreter_requires_environment()
cwd = env_vars.pop('__cwd', None)
@@ -139,6 +144,7 @@ def run_python_until_end(*args, **env_vars):
return _PythonRunResult(rc, out, err), cmd_line
@support.requires_subprocess()
def _assert_python(expected_success, /, *args, **env_vars):
res, cmd_line = run_python_until_end(*args, **env_vars)
if (res.rc and expected_success) or (not res.rc and not expected_success):
@@ -171,6 +177,7 @@ def assert_python_failure(*args, **env_vars):
return _assert_python(False, *args, **env_vars)
@support.requires_subprocess()
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
"""Run a Python subprocess with the given arguments.
@@ -273,6 +280,7 @@ def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
return zip_name, os.path.join(zip_name, script_name_in_zip)
@support.requires_subprocess()
def run_test_script(script):
# use -u to try to get the full output if the test hangs or crash
if support.verbose:

View File

@@ -5,12 +5,15 @@ import unittest
import sys
from .. import support
from . import warnings_helper
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
has_gethostname = not support.is_wasi
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
"""Returns an unused port that should be suitable for binding. This is
@@ -190,7 +193,7 @@ _NOT_SET = object()
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
"""Return a context manager that raises ResourceDenied when various issues
with the internet connection manifest themselves as exceptions."""
import nntplib
nntplib = warnings_helper.import_deprecated("nntplib")
import urllib.error
if timeout is _NOT_SET:
timeout = support.INTERNET_TIMEOUT
@@ -256,7 +259,7 @@ def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
err = a[0]
# The error can also be wrapped as args[1]:
# except socket.error as msg:
# raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
# raise OSError('socket error', msg) from msg
elif len(a) >= 2 and isinstance(a[1], OSError):
err = a[1]
else:

View File

@@ -173,7 +173,7 @@ if __name__ == '__main__':
raise RuntimeError('error message')
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestTests))
stream = io.StringIO()
runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
runner = runner_cls(sys.stdout)

View File

@@ -4,6 +4,7 @@ import functools
import sys
import threading
import time
import unittest
from test import support
@@ -207,3 +208,37 @@ class catch_threading_exception:
del self.exc_value
del self.exc_traceback
del self.thread
def _can_start_thread() -> bool:
"""Detect whether Python can start new threads.
Some WebAssembly platforms do not provide a working pthread
implementation. Thread support is stubbed and any attempt
to create a new thread fails.
- wasm32-wasi does not have threading.
- wasm32-emscripten can be compiled with or without pthread
support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
"""
if sys.platform == "emscripten":
return sys._emscripten_info.pthreads
elif sys.platform == "wasi":
return False
else:
# assume all other platforms have working thread support.
return True
can_start_thread = _can_start_thread()
def requires_working_threading(*, module=False):
"""Skip tests or modules that require working threading.
Can be used as a function/class decorator or to skip an entire module.
"""
msg = "requires threading support"
if module:
if not can_start_thread:
raise unittest.SkipTest(msg)
else:
return unittest.skipUnless(can_start_thread, msg)

View File

@@ -1,10 +1,18 @@
import contextlib
import functools
import importlib
import re
import sys
import warnings
def import_deprecated(name):
"""Import *name* while suppressing DeprecationWarning."""
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=DeprecationWarning)
return importlib.import_module(name)
def check_syntax_warning(testcase, statement, errtext='',
*, lineno=1, offset=None):
# Test also that a warning is emitted only once.

View File

@@ -1,4 +1,5 @@
from test.support import verbose, is_android
from decimal import Decimal
from test.support import verbose, is_android, is_emscripten, is_wasi
from test.support.warnings_helper import check_warnings
import unittest
import locale
@@ -335,8 +336,7 @@ class TestFrFRNumberFormatting(FrFRCookedTest, BaseFormattingTest):
euro = '\u20ac'
self._test_currency(50000, "50000,00 " + euro)
self._test_currency(50000, "50 000,00 " + euro, grouping=True)
# XXX is the trailing space a bug?
self._test_currency(50000, "50 000,00 EUR ",
self._test_currency(50000, "50 000,00 EUR",
grouping=True, international=True)
@@ -367,7 +367,7 @@ class TestEnUSCollation(BaseLocalizedTest, TestCollation):
locale_type = locale.LC_ALL
def setUp(self):
enc = codecs.lookup(locale.getpreferredencoding(False) or 'ascii').name
enc = codecs.lookup(locale.getencoding() or 'ascii').name
if enc not in ('utf-8', 'iso8859-1', 'cp1252'):
raise unittest.SkipTest('encoding not suitable')
if enc != 'iso8859-1' and (sys.platform == 'darwin' or is_android or
@@ -377,11 +377,19 @@ class TestEnUSCollation(BaseLocalizedTest, TestCollation):
@unittest.skipIf(sys.platform.startswith('aix'),
'bpo-29972: broken test on AIX')
@unittest.skipIf(
is_emscripten or is_wasi,
"musl libc issue on Emscripten/WASI, bpo-46390"
)
def test_strcoll_with_diacritic(self):
self.assertLess(locale.strcoll('à', 'b'), 0)
@unittest.skipIf(sys.platform.startswith('aix'),
'bpo-29972: broken test on AIX')
@unittest.skipIf(
is_emscripten or is_wasi,
"musl libc issue on Emscripten/WASI, bpo-46390"
)
def test_strxfrm_with_diacritic(self):
self.assertLess(locale.strxfrm('à'), locale.strxfrm('b'))
@@ -502,7 +510,7 @@ class TestMiscellaneous(unittest.TestCase):
@unittest.expectedFailure
def test_defaults_UTF8(self):
# Issue #18378: on (at least) macOS setting LC_CTYPE to "UTF-8" is
# valid. Futhermore LC_CTYPE=UTF is used by the UTF-8 locale coercing
# valid. Furthermore LC_CTYPE=UTF is used by the UTF-8 locale coercing
# during interpreter startup (on macOS).
import _locale
import os
@@ -524,7 +532,8 @@ class TestMiscellaneous(unittest.TestCase):
os.environ['LC_CTYPE'] = 'UTF-8'
self.assertEqual(locale.getdefaultlocale(), (None, 'UTF-8'))
with check_warnings(('', DeprecationWarning)):
self.assertEqual(locale.getdefaultlocale(), (None, 'UTF-8'))
finally:
for k in orig_env:
@@ -536,6 +545,14 @@ class TestMiscellaneous(unittest.TestCase):
if orig_getlocale is not None:
_locale._getdefaultlocale = orig_getlocale
def test_getencoding(self):
# Invoke getencoding to make sure it does not cause exceptions.
enc = locale.getencoding()
self.assertIsInstance(enc, str)
self.assertNotEqual(enc, "")
# make sure it is valid
codecs.lookup(enc)
def test_getpreferredencoding(self):
# Invoke getpreferredencoding to make sure it does not cause exceptions.
enc = locale.getpreferredencoding()
@@ -573,7 +590,13 @@ class TestMiscellaneous(unittest.TestCase):
loc = locale.getlocale(locale.LC_CTYPE)
if verbose:
print('testing with %a' % (loc,), end=' ', flush=True)
locale.setlocale(locale.LC_CTYPE, loc)
try:
locale.setlocale(locale.LC_CTYPE, loc)
except locale.Error as exc:
# bpo-37945: setlocale(LC_CTYPE) fails with getlocale(LC_CTYPE)
# and the tr_TR locale on Windows. getlocale() builds a locale
# which is not recognize by setlocale().
self.skipTest(f"setlocale(LC_CTYPE, {loc!r}) failed: {exc!r}")
self.assertEqual(loc, locale.getlocale(locale.LC_CTYPE))
def test_invalid_locale_format_in_localetuple(self):
@@ -639,5 +662,32 @@ class TestfrFRDelocalizeTest(FrFRCookedTest, BaseDelocalizeTest):
self._test_atoi('50 000', 50000)
class BaseLocalizeTest(BaseLocalizedTest):
def _test_localize(self, value, out, grouping=False):
self.assertEqual(locale.localize(value, grouping=grouping), out)
class TestEnUSLocalize(EnUSCookedTest, BaseLocalizeTest):
def test_localize(self):
self._test_localize('50000.00', '50000.00')
self._test_localize(
'{0:.16f}'.format(Decimal('1.15')), '1.1500000000000000')
class TestCLocalize(CCookedTest, BaseLocalizeTest):
def test_localize(self):
self._test_localize('50000.00', '50000.00')
class TestfrFRLocalize(FrFRCookedTest, BaseLocalizeTest):
def test_localize(self):
self._test_localize('50000.00', '50000,00')
self._test_localize('50000.00', '50 000,00', grouping=True)
if __name__ == '__main__':
unittest.main()

View File

@@ -123,15 +123,18 @@ class TestSupport(unittest.TestCase):
os_helper.unlink(mod_filename)
os_helper.rmtree('__pycache__')
@support.requires_working_socket()
def test_HOST(self):
s = socket.create_server((socket_helper.HOST, 0))
s.close()
@support.requires_working_socket()
def test_find_unused_port(self):
port = socket_helper.find_unused_port()
s = socket.create_server((socket_helper.HOST, port))
s.close()
@support.requires_working_socket()
def test_bind_port(self):
s = socket.socket()
socket_helper.bind_port(s)
@@ -198,7 +201,7 @@ class TestSupport(unittest.TestCase):
f'temporary directory {path!r}: '),
warn)
@unittest.skipUnless(hasattr(os, "fork"), "test requires os.fork")
@support.requires_fork()
def test_temp_dir__forked_child(self):
"""Test that a forked child process does not remove the directory."""
# See bpo-30028 for details.
@@ -429,9 +432,14 @@ class TestSupport(unittest.TestCase):
extra=extra,
not_exported=not_exported)
extra = {'TextTestResult', 'installHandler'}
extra = {
'TextTestResult',
'findTestCases',
'getTestCaseNames',
'installHandler',
'makeSuite',
}
not_exported = {'load_tests', "TestProgram", "BaseTestSuite"}
support.check__all__(self,
unittest,
("unittest.result", "unittest.case",
@@ -447,6 +455,7 @@ class TestSupport(unittest.TestCase):
@unittest.expectedFailure
@unittest.skipUnless(hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG'),
'need os.waitpid() and os.WNOHANG')
@support.requires_fork()
def test_reap_children(self):
# Make sure that there is no other pending child process
support.reap_children()
@@ -469,12 +478,8 @@ class TestSupport(unittest.TestCase):
if time.monotonic() > deadline:
self.fail("timeout")
old_stderr = sys.__stderr__
try:
sys.__stderr__ = stderr
with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.reap_children()
finally:
sys.__stderr__ = old_stderr
# Use environment_altered to check if reap_children() found
# the child process
@@ -494,6 +499,7 @@ class TestSupport(unittest.TestCase):
# pending child process
support.reap_children()
@support.requires_subprocess()
def check_options(self, args, func, expected=None):
code = f'from test.support import {func}; print(repr({func}()))'
cmd = [sys.executable, *args, '-c', code]
@@ -509,6 +515,8 @@ class TestSupport(unittest.TestCase):
self.assertEqual(proc.stdout.rstrip(), repr(expected))
self.assertEqual(proc.returncode, 0)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_args_from_interpreter_flags(self):
# Test test.support.args_from_interpreter_flags()
for opts in (
@@ -521,6 +529,7 @@ class TestSupport(unittest.TestCase):
['-E'],
['-v'],
['-b'],
['-P'],
['-q'],
['-I'],
# same option multiple times
@@ -540,7 +549,8 @@ class TestSupport(unittest.TestCase):
with self.subTest(opts=opts):
self.check_options(opts, 'args_from_interpreter_flags')
self.check_options(['-I', '-E', '-s'], 'args_from_interpreter_flags',
self.check_options(['-I', '-E', '-s', '-P'],
'args_from_interpreter_flags',
['-I'])
def test_optim_args_from_interpreter_flags(self):
@@ -661,10 +671,14 @@ class TestSupport(unittest.TestCase):
self.assertTrue(support.match_test(test_chdir))
@unittest.skipIf(sys.platform.startswith("win"), "TODO: RUSTPYTHON; os.dup on windows")
@unittest.skipIf(support.is_emscripten, "Unstable in Emscripten")
@unittest.skipIf(support.is_wasi, "Unavailable on WASI")
def test_fd_count(self):
# We cannot test the absolute value of fd_count(): on old Linux
# kernel or glibc versions, os.urandom() keeps a FD open on
# /dev/urandom device and Python has 4 FD opens instead of 3.
# Test is unstable on Emscripten. The platform starts and stops
# background threads that use pipes and epoll fds.
start = os_helper.fd_count()
fd = os.open(__file__, os.O_RDONLY)
try:
@@ -675,14 +689,8 @@ class TestSupport(unittest.TestCase):
def check_print_warning(self, msg, expected):
stderr = io.StringIO()
old_stderr = sys.__stderr__
try:
sys.__stderr__ = stderr
with support.swap_attr(support.print_warning, 'orig_stderr', stderr):
support.print_warning(msg)
finally:
sys.__stderr__ = old_stderr
self.assertEqual(stderr.getvalue(), expected)
def test_print_warning(self):
@@ -691,6 +699,17 @@ class TestSupport(unittest.TestCase):
self.check_print_warning("a\nb",
'Warning -- a\nWarning -- b\n')
def test_has_strftime_extensions(self):
if support.is_emscripten or sys.platform == "win32":
self.assertFalse(support.has_strftime_extensions)
else:
self.assertTrue(support.has_strftime_extensions)
# TODO: RUSTPYTHON
if not sys.platform.startswith("win"):
# TODO: RUSTPYTHON
test_has_strftime_extensions = unittest.expectedFailure(test_has_strftime_extensions)
# XXX -follows a list of untested API
# make_legacy_pyc
# is_resource_enabled

View File

@@ -3,11 +3,13 @@ Test the implementation of the PEP 540: the UTF-8 Mode.
"""
import locale
import subprocess
import sys
import textwrap
import unittest
from test import support
from test.support.script_helper import assert_python_ok, assert_python_failure
from test.support import os_helper
MS_WINDOWS = (sys.platform == 'win32')
@@ -159,8 +161,6 @@ class UTF8ModeTests(unittest.TestCase):
'stdout: utf-8/namereplace',
'stderr: utf-8/backslashreplace'])
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_io(self):
code = textwrap.dedent('''
import sys
@@ -171,7 +171,7 @@ class UTF8ModeTests(unittest.TestCase):
filename = __file__
out = self.get_output('-c', code, filename, PYTHONUTF8='1')
self.assertEqual(out, 'UTF-8/strict')
self.assertEqual(out.lower(), 'utf-8/strict')
def _check_io_encoding(self, module, encoding=None, errors=None):
filename = __file__
@@ -193,10 +193,10 @@ class UTF8ModeTests(unittest.TestCase):
PYTHONUTF8='1')
if not encoding:
encoding = 'UTF-8'
encoding = 'utf-8'
if not errors:
errors = 'strict'
self.assertEqual(out, f'{encoding}/{errors}')
self.assertEqual(out.lower(), f'{encoding}/{errors}')
def check_io_encoding(self, module):
self._check_io_encoding(module, encoding="latin1")
@@ -204,8 +204,6 @@ class UTF8ModeTests(unittest.TestCase):
self._check_io_encoding(module,
encoding="latin1", errors="namereplace")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_io_encoding(self):
self.check_io_encoding('io')
@@ -215,12 +213,12 @@ class UTF8ModeTests(unittest.TestCase):
def test_locale_getpreferredencoding(self):
code = 'import locale; print(locale.getpreferredencoding(False), locale.getpreferredencoding(True))'
out = self.get_output('-X', 'utf8', '-c', code)
self.assertEqual(out, 'UTF-8 UTF-8')
self.assertEqual(out, 'utf-8 utf-8')
for loc in POSIX_LOCALES:
with self.subTest(LC_ALL=loc):
out = self.get_output('-X', 'utf8', '-c', code, LC_ALL=loc)
self.assertEqual(out, 'UTF-8 UTF-8')
self.assertEqual(out, 'utf-8 utf-8')
@unittest.skipIf(MS_WINDOWS, 'test specific to Unix')
def test_cmd_line(self):
@@ -268,6 +266,34 @@ class UTF8ModeTests(unittest.TestCase):
out = self.get_output('-X', 'utf8', '-E', '-c', code)
self.assertEqual(out, '1')
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(MS_WINDOWS,
"os.device_encoding() doesn't implement "
"the UTF-8 Mode on Windows")
@support.requires_subprocess()
def test_device_encoding(self):
# Use stdout as TTY
if not sys.stdout.isatty():
self.skipTest("sys.stdout is not a TTY")
filename = 'out.txt'
self.addCleanup(os_helper.unlink, filename)
code = (f'import os, sys; fd = sys.stdout.fileno(); '
f'out = open({filename!r}, "w", encoding="utf-8"); '
f'print(os.isatty(fd), os.device_encoding(fd), file=out); '
f'out.close()')
cmd = [sys.executable, '-X', 'utf8', '-c', code]
# The stdout TTY is inherited to the child process
proc = subprocess.run(cmd, text=True)
self.assertEqual(proc.returncode, 0, proc)
# In UTF-8 Mode, device_encoding(fd) returns "UTF-8" if fd is a TTY
with open(filename, encoding="utf8") as fp:
out = fp.read().rstrip()
self.assertEqual(out, 'True utf-8')
if __name__ == "__main__":
unittest.main()