Update signal from CPython 3.12.3

This commit is contained in:
Jeong YunWon
2024-04-25 23:27:18 +09:00
committed by Jeong, YunWon
parent ed51d8dcf6
commit 0273d78de9
2 changed files with 126 additions and 20 deletions

6
Lib/signal.py vendored
View File

@@ -22,9 +22,11 @@ if 'pthread_sigmask' in _globals:
def _int_to_enum(value, enum_klass):
"""Convert a numeric value to an IntEnum member.
If it's not a known member, return the numeric value itself.
"""Convert a possible numeric value to an IntEnum member.
If it's not a known member, return the value itself.
"""
if not isinstance(value, int):
return value
try:
return enum_klass(value)
except ValueError:

View File

@@ -1,4 +1,6 @@
import enum
import errno
import functools
import inspect
import os
import random
@@ -13,6 +15,7 @@ import unittest
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok, spawn_python
from test.support import threading_helper
try:
import _testcapi
except ImportError:
@@ -21,6 +24,8 @@ except ImportError:
class GenericTests(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_enums(self):
for name in dir(signal):
sig = getattr(signal, name)
@@ -34,6 +39,32 @@ class GenericTests(unittest.TestCase):
self.assertIsInstance(sig, signal.Signals)
self.assertEqual(sys.platform, "win32")
CheckedSignals = enum._old_convert_(
enum.IntEnum, 'Signals', 'signal',
lambda name:
name.isupper()
and (name.startswith('SIG') and not name.startswith('SIG_'))
or name.startswith('CTRL_'),
source=signal,
)
enum._test_simple_enum(CheckedSignals, signal.Signals)
CheckedHandlers = enum._old_convert_(
enum.IntEnum, 'Handlers', 'signal',
lambda name: name in ('SIG_DFL', 'SIG_IGN'),
source=signal,
)
enum._test_simple_enum(CheckedHandlers, signal.Handlers)
Sigmasks = getattr(signal, 'Sigmasks', None)
if Sigmasks is not None:
CheckedSigmasks = enum._old_convert_(
enum.IntEnum, 'Sigmasks', 'signal',
lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
source=signal,
)
enum._test_simple_enum(CheckedSigmasks, Sigmasks)
def test_functions_module_attr(self):
# Issue #27718: If __all__ is not defined all non-builtin functions
# should have correct __module__ to be displayed by pydoc.
@@ -48,6 +79,9 @@ class PosixTests(unittest.TestCase):
def trivial_signal_handler(self, *args):
pass
def create_handler_with_partial(self, argument):
return functools.partial(self.trivial_signal_handler, argument)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_out_of_range_signal_number_raises_error(self):
@@ -70,6 +104,28 @@ class PosixTests(unittest.TestCase):
signal.signal(signal.SIGHUP, hup)
self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
def test_no_repr_is_called_on_signal_handler(self):
# See https://github.com/python/cpython/issues/112559.
class MyArgument:
def __init__(self):
self.repr_count = 0
def __repr__(self):
self.repr_count += 1
return super().__repr__()
argument = MyArgument()
self.assertEqual(0, argument.repr_count)
handler = self.create_handler_with_partial(argument)
hup = signal.signal(signal.SIGHUP, handler)
self.assertIsInstance(hup, signal.Handlers)
self.assertEqual(signal.getsignal(signal.SIGHUP), handler)
signal.signal(signal.SIGHUP, hup)
self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
self.assertEqual(0, argument.repr_count)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_strsignal(self):
@@ -87,6 +143,10 @@ class PosixTests(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(
hasattr(signal, "valid_signals"),
"requires signal.valid_signals"
)
def test_valid_signals(self):
s = signal.valid_signals()
self.assertIsInstance(s, set)
@@ -96,7 +156,21 @@ class PosixTests(unittest.TestCase):
self.assertNotIn(signal.NSIG, s)
self.assertLess(len(s), signal.NSIG)
# gh-91145: Make sure that all SIGxxx constants exposed by the Python
# signal module have a number in the [0; signal.NSIG-1] range.
for name in dir(signal):
if not name.startswith("SIG"):
continue
if name in {"SIG_IGN", "SIG_DFL"}:
# SIG_IGN and SIG_DFL are pointers
continue
with self.subTest(name=name):
signum = getattr(signal, name)
self.assertGreaterEqual(signum, 0)
self.assertLess(signum, signal.NSIG)
@unittest.skipUnless(sys.executable, "sys.executable required.")
@support.requires_subprocess()
def test_keyboard_interrupt_exit_code(self):
"""KeyboardInterrupt triggers exit via SIGINT."""
process = subprocess.run(
@@ -153,6 +227,7 @@ class WindowsSignalTests(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(sys.executable, "sys.executable required.")
@support.requires_subprocess()
def test_keyboard_interrupt_exit_code(self):
"""KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
# We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
@@ -185,6 +260,7 @@ class WakeupFDTests(unittest.TestCase):
signal.set_wakeup_fd, fd)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skipUnless(support.has_socket_support, "needs working sockets.")
def test_invalid_socket(self):
sock = socket.socket()
fd = sock.fileno()
@@ -193,6 +269,10 @@ class WakeupFDTests(unittest.TestCase):
signal.set_wakeup_fd, fd)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
# Emscripten does not support fstat on pipes yet.
# https://github.com/emscripten-core/emscripten/issues/16414
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_set_wakeup_fd_result(self):
r1, w1 = os.pipe()
self.addCleanup(os.close, r1)
@@ -211,6 +291,8 @@ class WakeupFDTests(unittest.TestCase):
self.assertEqual(signal.set_wakeup_fd(-1), -1)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(support.has_socket_support, "needs working sockets.")
def test_set_wakeup_fd_socket_result(self):
sock1 = socket.socket()
self.addCleanup(sock1.close)
@@ -230,6 +312,8 @@ class WakeupFDTests(unittest.TestCase):
# On Windows, files are always blocking and Windows does not provide a
# function to test if a socket is in non-blocking mode.
@unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
@unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_set_wakeup_fd_blocking(self):
rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
@@ -290,6 +374,7 @@ class WakeupSignalTests(unittest.TestCase):
assert_python_ok('-c', code)
@unittest.skipIf(_testcapi is None, 'need _testcapi')
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
def test_wakeup_write_error(self):
# Issue #16105: write() errors in the C signal handler should not
# pass silently.
@@ -628,6 +713,8 @@ class WakeupSocketSignalTests(unittest.TestCase):
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
@support.requires_subprocess()
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
class SiginterruptTest(unittest.TestCase):
def readpipe_interrupted(self, interrupt):
@@ -708,6 +795,7 @@ class SiginterruptTest(unittest.TestCase):
interrupted = self.readpipe_interrupted(True)
self.assertTrue(interrupted)
@support.requires_resource('walltime')
def test_siginterrupt_off(self):
# If a signal handler is installed and siginterrupt is called with
# a false value for the second argument, when that signal arrives, it
@@ -781,15 +869,12 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
signal.setitimer(self.itimer, 0.3, 0.2)
start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(support.LONG_TIMEOUT):
# use up some virtual time by doing real work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_vtalrm handler stopped this itimer
else: # Issue 8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
# sig_vtalrm handler stopped this itimer
break
# virtual itimer should be (0.0, 0.0) now
self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
@@ -803,15 +888,12 @@ class ItimerTest(unittest.TestCase):
signal.signal(signal.SIGPROF, self.sig_prof)
signal.setitimer(self.itimer, 0.2, 0.2)
start_time = time.monotonic()
while time.monotonic() - start_time < 60.0:
for _ in support.busy_retry(support.LONG_TIMEOUT):
# do some work
_ = pow(12345, 67890, 10000019)
if signal.getitimer(self.itimer) == (0.0, 0.0):
break # sig_prof handler stopped this itimer
else: # Issue 8424
self.skipTest("timeout: likely cause: machine too slow or load too "
"high")
# sig_prof handler stopped this itimer
break
# profiling itimer should be (0.0, 0.0) now
self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
@@ -873,6 +955,7 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
@threading_helper.requires_working_threading()
def test_pthread_kill(self):
code = """if 1:
import signal
@@ -1009,6 +1092,7 @@ class PendingSignalsTests(unittest.TestCase):
'need signal.sigwait()')
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_sigwait_thread(self):
# Check that calling sigwait() from a thread doesn't suspend the whole
# process. A new interpreter is spawned to avoid problems when mixing
@@ -1064,6 +1148,7 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
@threading_helper.requires_working_threading()
def test_pthread_sigmask(self):
code = """if 1:
import signal
@@ -1141,6 +1226,7 @@ class PendingSignalsTests(unittest.TestCase):
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
@threading_helper.requires_working_threading()
def test_pthread_kill_main_thread(self):
# Test that a signal can be sent to the main thread with pthread_kill()
# before any other thread has been created (see issue #12392).
@@ -1276,8 +1362,6 @@ class StressTest(unittest.TestCase):
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL
expected_sigs = 0
deadline = time.monotonic() + support.SHORT_TIMEOUT
while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
@@ -1286,16 +1370,19 @@ class StressTest(unittest.TestCase):
expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.monotonic() < deadline:
time.sleep(1e-5)
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if len(sigs) >= expected_sigs:
break
# All ITIMER_REAL signals should have been delivered to the
# Python handler
self.assertEqual(len(sigs), N, "Some signals were lost")
@unittest.skip("TODO: RUSTPYTHON; hang")
@unittest.skipIf(sys.platform == "darwin", "crashes due to system bug (FB13453490)")
@unittest.skipUnless(hasattr(signal, "SIGUSR1"),
"test needs SIGUSR1")
@threading_helper.requires_working_threading()
def test_stress_modifying_handlers(self):
# bpo-43406: race condition between trip_signal() and signal.signal
signum = signal.SIGUSR1
@@ -1314,7 +1401,7 @@ class StressTest(unittest.TestCase):
num_sent_signals += 1
def cycle_handlers():
while num_sent_signals < 100:
while num_sent_signals < 100 or num_received_signals < 1:
for i in range(20000):
# Cycle between a Python-defined and a non-Python handler
for handler in [custom_handler, signal.SIG_IGN]:
@@ -1347,7 +1434,7 @@ class StressTest(unittest.TestCase):
if not ignored:
# Sanity check that some signals were received, but not all
self.assertGreater(num_received_signals, 0)
self.assertLess(num_received_signals, num_sent_signals)
self.assertLessEqual(num_received_signals, num_sent_signals)
finally:
do_stop = True
t.join()
@@ -1388,6 +1475,23 @@ class RaiseSignalTest(unittest.TestCase):
signal.raise_signal(signal.SIGINT)
self.assertTrue(is_ok)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test__thread_interrupt_main(self):
# See https://github.com/python/cpython/issues/102397
code = """if 1:
import _thread
class Foo():
def __del__(self):
_thread.interrupt_main()
x = Foo()
"""
rc, out, err = assert_python_ok('-c', code)
self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
class PidfdSignalTest(unittest.TestCase):