Update thread/threading from CPython 3.10.5

This commit is contained in:
CPython developers
2022-07-18 22:49:38 +09:00
committed by Jeong YunWon
parent c7ad41ec4d
commit a1548e20f5
5 changed files with 669 additions and 139 deletions

View File

@@ -133,6 +133,7 @@ class ThreadRunningTests(BasicThreadTest):
del task
while not done:
time.sleep(POLL_SLEEP)
support.gc_collect() # For PyPy or other GCs.
self.assertEqual(thread._count(), orig)
def test_unraisable_exception(self):
@@ -227,30 +228,31 @@ class TestForkInThread(unittest.TestCase):
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
@threading_helper.reap_threads
def test_forkinthread(self):
status = "not set"
pid = None
def thread1():
nonlocal status
def fork_thread(read_fd, write_fd):
nonlocal pid
# fork in a thread
pid = os.fork()
if pid == 0:
# child
try:
os.close(self.read_fd)
os.write(self.write_fd, b"OK")
finally:
os._exit(0)
else:
# parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
if pid:
# parent process
return
# child process
try:
os.close(read_fd)
os.write(write_fd, b"OK")
finally:
os._exit(0)
with threading_helper.wait_threads_exit():
thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
self.assertEqual(status, 0)
thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
self.assertEqual(os.read(self.read_fd, 2), b"OK")
os.close(self.write_fd)
self.assertIsNotNone(pid)
support.wait_process(pid, exitcode=0)
def tearDown(self):
try:

View File

@@ -19,7 +19,6 @@ from test.support import threading_helper
import unittest
import io
import threading
import sys
from traceback import print_exc

View File

@@ -18,7 +18,10 @@ import weakref
import os
import subprocess
import signal
import textwrap
import traceback
from unittest import mock
from test import lock_tests
from test import support
@@ -29,6 +32,14 @@ from test import support
# on platforms known to behave badly.
platforms_to_skip = ('netbsd5', 'hp-ux11')
# Is Python built with Py_DEBUG macro defined?
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
def restore_default_excepthook(testcase):
testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
threading.excepthook = threading.__excepthook__
# A trivial mutable counter.
class Counter(object):
@@ -85,6 +96,39 @@ class BaseTestCase(unittest.TestCase):
class ThreadTests(BaseTestCase):
@cpython_only
def test_name(self):
def func(): pass
thread = threading.Thread(name="myname1")
self.assertEqual(thread.name, "myname1")
# Convert int name to str
thread = threading.Thread(name=123)
self.assertEqual(thread.name, "123")
# target name is ignored if name is specified
thread = threading.Thread(target=func, name="myname2")
self.assertEqual(thread.name, "myname2")
with mock.patch.object(threading, '_counter', return_value=2):
thread = threading.Thread(name="")
self.assertEqual(thread.name, "Thread-2")
with mock.patch.object(threading, '_counter', return_value=3):
thread = threading.Thread()
self.assertEqual(thread.name, "Thread-3")
with mock.patch.object(threading, '_counter', return_value=5):
thread = threading.Thread(target=func)
self.assertEqual(thread.name, "Thread-5 (func)")
@cpython_only
def test_disallow_instantiation(self):
# Ensure that the type disallows instantiation (bpo-43916)
lock = threading.Lock()
test.support.check_disallow_instantiation(self, type(lock))
# Create a bunch of threads, let each do some work, wait until all are
# done.
def test_various_ops(self):
@@ -125,9 +169,9 @@ class ThreadTests(BaseTestCase):
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertIsNotNone(threading.currentThread().ident)
self.assertIsNotNone(threading.current_thread().ident)
def f():
ident.append(threading.currentThread().ident)
ident.append(threading.current_thread().ident)
done.set()
done = threading.Event()
ident = []
@@ -265,7 +309,7 @@ class ThreadTests(BaseTestCase):
self.assertEqual(result, 1) # one thread state modified
if verbose:
print(" waiting for worker to say it caught the exception")
worker_saw_exception.wait(timeout=10)
worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(t.finished)
if verbose:
print(" all OK -- joining worker")
@@ -288,7 +332,7 @@ class ThreadTests(BaseTestCase):
finally:
threading._start_new_thread = _start_new_thread
def test_finalize_runnning_thread(self):
def test_finalize_running_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
@@ -402,6 +446,8 @@ class ThreadTests(BaseTestCase):
if self.should_raise:
raise SystemExit
restore_default_excepthook(self)
cyclic_object = RunSelfFunction(should_raise=False)
weak_cyclic_object = weakref.ref(cyclic_object)
cyclic_object.thread.join()
@@ -422,15 +468,32 @@ class ThreadTests(BaseTestCase):
# Just a quick sanity check to make sure the old method names are
# still present
t = threading.Thread()
t.isDaemon()
t.setDaemon(True)
t.getName()
t.setName("name")
with self.assertWarnsRegex(DeprecationWarning, 'use is_alive()'):
t.isAlive()
with self.assertWarnsRegex(DeprecationWarning,
r'get the daemon attribute'):
t.isDaemon()
with self.assertWarnsRegex(DeprecationWarning,
r'set the daemon attribute'):
t.setDaemon(True)
with self.assertWarnsRegex(DeprecationWarning,
r'get the name attribute'):
t.getName()
with self.assertWarnsRegex(DeprecationWarning,
r'set the name attribute'):
t.setName("name")
e = threading.Event()
e.isSet()
threading.activeCount()
with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
e.isSet()
cond = threading.Condition()
cond.acquire()
with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
cond.notifyAll()
with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
threading.activeCount()
with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
threading.currentThread()
def test_repr_daemon(self):
t = threading.Thread()
@@ -446,6 +509,34 @@ class ThreadTests(BaseTestCase):
t = threading.Thread(daemon=True)
self.assertTrue(t.daemon)
@unittest.skipUnless(hasattr(os, 'fork'), 'needs os.fork()')
def test_fork_at_exit(self):
# bpo-42350: Calling os.fork() after threading._shutdown() must
# not log an error.
code = textwrap.dedent("""
import atexit
import os
import sys
from test.support import wait_process
# Import the threading module to register its "at fork" callback
import threading
def exit_handler():
pid = os.fork()
if not pid:
print("child process ok", file=sys.stderr, flush=True)
# child process
else:
wait_process(pid, exitcode=0)
# exit_handler() will be called after threading._shutdown()
atexit.register(exit_handler)
""")
_, out, err = assert_python_ok("-c", code)
self.assertEqual(out, b'')
self.assertEqual(err.rstrip(), b'child process ok')
@unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
def test_dummy_thread_after_fork(self):
# Issue #14308: a dummy thread in the active list doesn't mess up
@@ -492,9 +583,7 @@ class ThreadTests(BaseTestCase):
else:
t.join()
pid, status = os.waitpid(pid, 0)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(10, os.WEXITSTATUS(status))
support.wait_process(pid, exitcode=10)
def test_main_thread(self):
main = threading.main_thread()
@@ -514,6 +603,7 @@ class ThreadTests(BaseTestCase):
def test_main_thread_after_fork(self):
code = """if 1:
import os, threading
from test import support
pid = os.fork()
if pid == 0:
@@ -522,7 +612,7 @@ class ThreadTests(BaseTestCase):
print(main.ident == threading.current_thread().ident)
print(main.ident == threading.get_ident())
else:
os.waitpid(pid, 0)
support.wait_process(pid, exitcode=0)
"""
_, out, err = assert_python_ok("-c", code)
data = out.decode().replace('\r', '')
@@ -535,8 +625,9 @@ class ThreadTests(BaseTestCase):
def test_main_thread_after_fork_from_nonmain_thread(self):
code = """if 1:
import os, threading, sys
from test import support
def f():
def func():
pid = os.fork()
if pid == 0:
main = threading.main_thread()
@@ -547,16 +638,16 @@ class ThreadTests(BaseTestCase):
# we have to flush before exit.
sys.stdout.flush()
else:
os.waitpid(pid, 0)
support.wait_process(pid, exitcode=0)
th = threading.Thread(target=f)
th = threading.Thread(target=func)
th.start()
th.join()
"""
_, out, err = assert_python_ok("-c", code)
data = out.decode().replace('\r', '')
self.assertEqual(err, b"")
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
# TODO: RUSTPYTHON
@unittest.expectedFailure
@@ -647,7 +738,7 @@ class ThreadTests(BaseTestCase):
finish.release()
# When the thread ends, the state_lock can be successfully
# acquired.
self.assertTrue(tstate_lock.acquire(timeout=5), False)
self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
# But is_alive() is still True: we hold _tstate_lock now, which
# prevents is_alive() from knowing the thread's end-of-life C code
# is done.
@@ -744,6 +835,27 @@ class ThreadTests(BaseTestCase):
finally:
sys.settrace(old_trace)
def test_gettrace(self):
def noop_trace(frame, event, arg):
# no operation
return noop_trace
old_trace = threading.gettrace()
try:
threading.settrace(noop_trace)
trace_func = threading.gettrace()
self.assertEqual(noop_trace,trace_func)
finally:
threading.settrace(old_trace)
def test_getprofile(self):
def fn(*args): pass
old_profile = threading.getprofile()
try:
threading.setprofile(fn)
self.assertEqual(fn, threading.getprofile())
finally:
threading.setprofile(old_profile)
@cpython_only
def test_shutdown_locks(self):
for daemon in (False, True):
@@ -768,6 +880,93 @@ class ThreadTests(BaseTestCase):
# Daemon threads must never add it to _shutdown_locks.
self.assertNotIn(tstate_lock, threading._shutdown_locks)
def test_locals_at_exit(self):
# bpo-19466: thread locals must not be deleted before destructors
# are called
rc, out, err = assert_python_ok("-c", """if 1:
import threading
class Atexit:
def __del__(self):
print("thread_dict.atexit = %r" % thread_dict.atexit)
thread_dict = threading.local()
thread_dict.atexit = "value"
atexit = Atexit()
""")
self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
def test_boolean_target(self):
# bpo-41149: A thread that had a boolean value of False would not
# run, regardless of whether it was callable. The correct behaviour
# is for a thread to do nothing if its target is None, and to call
# the target otherwise.
class BooleanTarget(object):
def __init__(self):
self.ran = False
def __bool__(self):
return False
def __call__(self):
self.ran = True
target = BooleanTarget()
thread = threading.Thread(target=target)
thread.start()
thread.join()
self.assertTrue(target.ran)
def test_leak_without_join(self):
# bpo-37788: Test that a thread which is not joined explicitly
# does not leak. Test written for reference leak checks.
def noop(): pass
with threading_helper.wait_threads_exit():
threading.Thread(target=noop).start()
# Thread.join() is not called
@unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
def test_debug_deprecation(self):
# bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
rc, out, err = assert_python_ok("-Wdefault", "-c", "pass",
PYTHONTHREADDEBUG="1")
msg = (b'DeprecationWarning: The threading debug '
b'(PYTHONTHREADDEBUG environment variable) '
b'is deprecated and will be removed in Python 3.12')
self.assertIn(msg, err)
def test_import_from_another_thread(self):
# bpo-1596321: If the threading module is first import from a thread
# different than the main thread, threading._shutdown() must handle
# this case without logging an error at Python exit.
code = textwrap.dedent('''
import _thread
import sys
event = _thread.allocate_lock()
event.acquire()
def import_threading():
import threading
event.release()
if 'threading' in sys.modules:
raise Exception('threading is already imported')
_thread.start_new_thread(import_threading, ())
# wait until the threading module is imported
event.acquire()
event.release()
if 'threading' not in sys.modules:
raise Exception('threading is not imported')
# don't wait until the thread completes
''')
rc, out, err = assert_python_ok("-c", code)
self.assertEqual(out, b'')
self.assertEqual(err, b'')
class ThreadJoinOnShutdown(BaseTestCase):
@@ -807,11 +1006,15 @@ class ThreadJoinOnShutdown(BaseTestCase):
def test_2_join_in_forked_process(self):
# Like the test above, but from a forked interpreter
script = """if 1:
from test import support
childpid = os.fork()
if childpid != 0:
os.waitpid(childpid, 0)
# parent process
support.wait_process(childpid, exitcode=0)
sys.exit(0)
# child process
t = threading.Thread(target=joiningfunc,
args=(threading.current_thread(),))
t.start()
@@ -826,13 +1029,17 @@ class ThreadJoinOnShutdown(BaseTestCase):
# In the forked process, the main Thread object must be marked as stopped.
script = """if 1:
from test import support
main_thread = threading.current_thread()
def worker():
childpid = os.fork()
if childpid != 0:
os.waitpid(childpid, 0)
# parent process
support.wait_process(childpid, exitcode=0)
sys.exit(0)
# child process
t = threading.Thread(target=joiningfunc,
args=(main_thread,))
print('end of main')
@@ -895,9 +1102,9 @@ class ThreadJoinOnShutdown(BaseTestCase):
# just fork a child process and wait it
pid = os.fork()
if pid > 0:
os.waitpid(pid, 0)
support.wait_process(pid, exitcode=50)
else:
os._exit(0)
os._exit(50)
# start a bunch of threads that will fork() child processes
threads = []
@@ -924,28 +1131,32 @@ class ThreadJoinOnShutdown(BaseTestCase):
if pid == 0:
# check that threads states have been cleared
if len(sys._current_frames()) == 1:
os._exit(0)
os._exit(51)
else:
os._exit(1)
os._exit(52)
else:
_, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
support.wait_process(pid, exitcode=51)
for t in threads:
t.join()
class SubinterpThreadingTests(BaseTestCase):
def pipe(self):
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
if hasattr(os, 'set_blocking'):
os.set_blocking(r, False)
return (r, w)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_threads_join(self):
# Non-daemon threads should be joined at subinterpreter shutdown
# (issue #18808)
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
code = r"""if 1:
r, w = self.pipe()
code = textwrap.dedent(r"""
import os
import random
import threading
@@ -963,7 +1174,7 @@ class SubinterpThreadingTests(BaseTestCase):
threading.Thread(target=f).start()
random_sleep()
""" % (w,)
""" % (w,))
ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
@@ -976,10 +1187,8 @@ class SubinterpThreadingTests(BaseTestCase):
# Python code returned but before the thread state is deleted.
# To achieve this, we register a thread-local object which sleeps
# a bit when deallocated.
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
code = r"""if 1:
r, w = self.pipe()
code = textwrap.dedent(r"""
import os
import random
import threading
@@ -1004,7 +1213,7 @@ class SubinterpThreadingTests(BaseTestCase):
threading.Thread(target=f).start()
random_sleep()
""" % (w,)
""" % (w,))
ret = test.support.run_in_subinterp(code)
self.assertEqual(ret, 0)
# The thread was joined properly.
@@ -1012,7 +1221,7 @@ class SubinterpThreadingTests(BaseTestCase):
@cpython_only
def test_daemon_threads_fatal_error(self):
subinterp_code = r"""if 1:
subinterp_code = f"""if 1:
import os
import threading
import time
@@ -1020,7 +1229,7 @@ class SubinterpThreadingTests(BaseTestCase):
def f():
# Make sure the daemon thread is still running when
# Py_EndInterpreter is called.
time.sleep(10)
time.sleep({test.support.SHORT_TIMEOUT})
threading.Thread(target=f, daemon=True).start()
"""
script = r"""if 1:
@@ -1202,6 +1411,22 @@ class ThreadingExceptionTests(BaseTestCase):
# explicitly break the reference cycle to not leak a dangling thread
thread.exc = None
def test_multithread_modify_file_noerror(self):
# See issue25872
def modify_file():
with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
fp.write(' ')
traceback.format_stack()
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
threads = [
threading.Thread(target=modify_file)
for i in range(100)
]
for t in threads:
t.start()
t.join()
class ThreadRunFail(threading.Thread):
def run(self):
@@ -1209,6 +1434,10 @@ class ThreadRunFail(threading.Thread):
class ExceptHookTests(BaseTestCase):
def setUp(self):
restore_default_excepthook(self)
super().setUp()
def test_excepthook(self):
with support.captured_output("stderr") as stderr:
thread = ThreadRunFail(name="excepthook thread")
@@ -1297,6 +1526,27 @@ class ExceptHookTests(BaseTestCase):
'Exception in threading.excepthook:\n')
self.assertEqual(err_str, 'threading_hook failed')
def test_original_excepthook(self):
def run_thread():
with support.captured_output("stderr") as output:
thread = ThreadRunFail(name="excepthook thread")
thread.start()
thread.join()
return output.getvalue()
def threading_hook(args):
print("Running a thread failed", file=sys.stderr)
default_output = run_thread()
with support.swap_attr(threading, 'excepthook', threading_hook):
custom_hook_output = run_thread()
threading.excepthook = threading.__excepthook__
recovered_output = run_thread()
self.assertEqual(default_output, recovered_output)
self.assertNotEqual(default_output, custom_hook_output)
self.assertEqual(custom_hook_output, "Running a thread failed\n")
class TimerTests(BaseTestCase):
@@ -1375,6 +1625,8 @@ class MiscTestCase(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test__all__(self):
restore_default_excepthook(self)
extra = {"ThreadError"}
not_exported = {'currentThread', 'activeCount'}
support.check__all__(self, threading, ('threading', '_thread'),
@@ -1382,6 +1634,29 @@ class MiscTestCase(unittest.TestCase):
class InterruptMainTests(unittest.TestCase):
def check_interrupt_main_with_signal_handler(self, signum):
def handler(signum, frame):
1/0
old_handler = signal.signal(signum, handler)
self.addCleanup(signal.signal, signum, old_handler)
with self.assertRaises(ZeroDivisionError):
_thread.interrupt_main()
def check_interrupt_main_noerror(self, signum):
handler = signal.getsignal(signum)
try:
# No exception should arise.
signal.signal(signum, signal.SIG_IGN)
_thread.interrupt_main(signum)
signal.signal(signum, signal.SIG_DFL)
_thread.interrupt_main(signum)
finally:
# Restore original handler
signal.signal(signum, handler)
def test_interrupt_main_subthread(self):
# Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
@@ -1399,18 +1674,93 @@ class InterruptMainTests(unittest.TestCase):
with self.assertRaises(KeyboardInterrupt):
_thread.interrupt_main()
def test_interrupt_main_noerror(self):
handler = signal.getsignal(signal.SIGINT)
try:
# No exception should arise.
signal.signal(signal.SIGINT, signal.SIG_IGN)
_thread.interrupt_main()
def test_interrupt_main_with_signal_handler(self):
self.check_interrupt_main_with_signal_handler(signal.SIGINT)
self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
signal.signal(signal.SIGINT, signal.SIG_DFL)
_thread.interrupt_main()
finally:
# Restore original handler
signal.signal(signal.SIGINT, handler)
def test_interrupt_main_noerror(self):
self.check_interrupt_main_noerror(signal.SIGINT)
self.check_interrupt_main_noerror(signal.SIGTERM)
def test_interrupt_main_invalid_signal(self):
self.assertRaises(ValueError, _thread.interrupt_main, -1)
self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
@threading_helper.reap_threads
def test_can_interrupt_tight_loops(self):
cont = [True]
started = [False]
interrupted = [False]
def worker(started, cont, interrupted):
iterations = 100_000_000
started[0] = True
while cont[0]:
if iterations:
iterations -= 1
else:
return
pass
interrupted[0] = True
t = threading.Thread(target=worker,args=(started, cont, interrupted))
t.start()
while not started[0]:
pass
cont[0] = False
t.join()
self.assertTrue(interrupted[0])
class AtexitTests(unittest.TestCase):
def test_atexit_output(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading
def run_last():
print('parrot')
threading._register_atexit(run_last)
""")
self.assertFalse(err)
self.assertEqual(out.strip(), b'parrot')
def test_atexit_called_once(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading
from unittest.mock import Mock
mock = Mock()
threading._register_atexit(mock)
mock.assert_not_called()
# force early shutdown to ensure it was called once
threading._shutdown()
mock.assert_called_once()
""")
self.assertFalse(err)
def test_atexit_after_shutdown(self):
# The only way to do this is by registering an atexit within
# an atexit, which is intended to raise an exception.
rc, out, err = assert_python_ok("-c", """if True:
import threading
def func():
pass
def run_last():
threading._register_atexit(func)
threading._register_atexit(run_last)
""")
self.assertTrue(err)
self.assertIn("RuntimeError: can't register atexit after shutdown",
err.decode())
if __name__ == "__main__":

View File

@@ -4,7 +4,7 @@ from doctest import DocTestSuite
from test import support
from test.support import threading_helper
import weakref
# import gc
import gc
# Modules under test
import _thread
@@ -38,7 +38,7 @@ class BaseLocalTest:
t.join()
del t
# gc.collect()
support.gc_collect() # For PyPy or other GCs.
self.assertEqual(len(weaklist), n)
# XXX _threading_local keeps the local of the last stopped thread alive.
@@ -47,7 +47,7 @@ class BaseLocalTest:
# Assignment to the same thread local frees it sometimes (!)
local.someothervar = None
# gc.collect()
support.gc_collect() # For PyPy or other GCs.
deadlist = [weak for weak in weaklist if weak() is None]
self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))
@@ -90,7 +90,7 @@ class BaseLocalTest:
# 2) GC the cycle (triggers threadmodule.c::local_clear
# before local_dealloc)
del cycle
# gc.collect()
support.gc_collect() # For PyPy or other GCs.
e1.set()
e2.wait()
@@ -195,7 +195,7 @@ class BaseLocalTest:
x.local.x = x
wr = weakref.ref(x)
del x
# gc.collect()
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
@@ -210,7 +210,7 @@ def test_main():
suite = unittest.TestSuite()
suite.addTest(DocTestSuite('_threading_local'))
suite.addTest(unittest.makeSuite(ThreadLocalTest))
# suite.addTest(unittest.makeSuite(PyThreadingLocalTest))
suite.addTest(unittest.makeSuite(PyThreadingLocalTest))
local_orig = _threading_local.local
def setUp(test):

303
Lib/threading.py vendored
View File

@@ -3,6 +3,7 @@
import os as _os
import sys as _sys
import _thread
import functools
from time import monotonic as _time
from _weakrefset import WeakSet
@@ -27,7 +28,7 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
'setprofile', 'settrace', 'local', 'stack_size',
'excepthook', 'ExceptHookArgs']
'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile']
# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
@@ -64,6 +65,10 @@ def setprofile(func):
global _profile_hook
_profile_hook = func
def getprofile():
"""Get the profiler function as set by threading.setprofile()."""
return _profile_hook
def settrace(func):
"""Set a trace function for all threads started from the threading module.
@@ -74,6 +79,10 @@ def settrace(func):
global _trace_hook
_trace_hook = func
def gettrace():
"""Get the trace function as set by threading.settrace()."""
return _trace_hook
# Synchronization classes
Lock = _allocate_lock
@@ -121,6 +130,11 @@ class _RLock:
hex(id(self))
)
def _at_fork_reinit(self):
self._block._at_fork_reinit()
self._owner = None
self._count = 0
def acquire(self, blocking=True, timeout=-1):
"""Acquire a lock, blocking or non-blocking.
@@ -243,6 +257,10 @@ class Condition:
pass
self._waiters = _deque()
def _at_fork_reinit(self):
self._lock._at_fork_reinit()
self._waiters.clear()
def __enter__(self):
return self._lock.__enter__()
@@ -261,7 +279,7 @@ class Condition:
def _is_owned(self):
# Return True if lock is owned by current_thread.
# This method is called only if _lock doesn't have _is_owned().
if self._lock.acquire(0):
if self._lock.acquire(False):
self._lock.release()
return False
else:
@@ -350,14 +368,21 @@ class Condition:
"""
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
waiters = self._waiters
while waiters and n > 0:
waiter = waiters[0]
try:
all_waiters.remove(waiter)
waiter.release()
except RuntimeError:
# gh-92530: The previous call of notify() released the lock,
# but was interrupted before removing it from the queue.
# It can happen if a signal handler raises an exception,
# like CTRL+C which raises KeyboardInterrupt.
pass
else:
n -= 1
try:
waiters.remove(waiter)
except ValueError:
pass
@@ -370,7 +395,16 @@ class Condition:
"""
self.notify(len(self._waiters))
notifyAll = notify_all
def notifyAll(self):
"""Wake up all threads waiting on this condition.
This method is deprecated, use notify_all() instead.
"""
import warnings
warnings.warn('notifyAll() is deprecated, use notify_all() instead',
DeprecationWarning, stacklevel=2)
self.notify_all()
class Semaphore:
@@ -438,16 +472,19 @@ class Semaphore:
__enter__ = acquire
def release(self):
"""Release a semaphore, incrementing the internal counter by one.
def release(self, n=1):
"""Release a semaphore, incrementing the internal counter by one or more.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
"""
if n < 1:
raise ValueError('n must be one or more')
with self._cond:
self._value += 1
self._cond.notify()
self._value += n
for i in range(n):
self._cond.notify()
def __exit__(self, t, v, tb):
self.release()
@@ -474,8 +511,8 @@ class BoundedSemaphore(Semaphore):
Semaphore.__init__(self, value)
self._initial_value = value
def release(self):
"""Release a semaphore, incrementing the internal counter by one.
def release(self, n=1):
"""Release a semaphore, incrementing the internal counter by one or more.
When the counter is zero on entry and another thread is waiting for it
to become larger than zero again, wake up that thread.
@@ -484,11 +521,14 @@ class BoundedSemaphore(Semaphore):
raise a ValueError.
"""
if n < 1:
raise ValueError('n must be one or more')
with self._cond:
if self._value >= self._initial_value:
if self._value + n > self._initial_value:
raise ValueError("Semaphore released too many times")
self._value += 1
self._cond.notify()
self._value += n
for i in range(n):
self._cond.notify()
class Event:
@@ -506,15 +546,24 @@ class Event:
self._cond = Condition(Lock())
self._flag = False
def _reset_internal_locks(self):
# private! called by Thread._reset_internal_locks by _after_fork()
self._cond.__init__(Lock())
def _at_fork_reinit(self):
# Private method called by Thread._reset_internal_locks()
self._cond._at_fork_reinit()
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
isSet = is_set
def isSet(self):
"""Return true if and only if the internal flag is true.
This method is deprecated, use notify_all() instead.
"""
import warnings
warnings.warn('isSet() is deprecated, use is_set() instead',
DeprecationWarning, stacklevel=2)
return self.is_set()
def set(self):
"""Set the internal flag to true.
@@ -592,7 +641,7 @@ class Barrier:
self._action = action
self._timeout = timeout
self._parties = parties
self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken
self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken
self._count = 0
def wait(self, timeout=None):
@@ -729,22 +778,39 @@ class BrokenBarrierError(RuntimeError):
# Helper to generate new thread names
_counter = _count().__next__
_counter() # Consume 0 so first non-main thread has id 1.
def _newname(template="Thread-%d"):
return template % _counter()
_counter = _count(1).__next__
def _newname(name_template):
return name_template % _counter()
# Active thread administration
_active_limbo_lock = _allocate_lock()
# Active thread administration.
#
# bpo-44422: Use a reentrant lock to allow reentrant calls to functions like
# threading.enumerate().
_active_limbo_lock = RLock()
_active = {} # maps thread id to Thread object
_limbo = {}
_dangling = WeakSet()
# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
# to wait until all Python thread states get deleted:
# see Thread._set_tstate_lock().
_shutdown_locks_lock = _allocate_lock()
_shutdown_locks = set()
def _maintain_shutdown_locks():
"""
Drop any shutdown locks that don't correspond to running threads anymore.
Calling this from time to time avoids an ever-growing _shutdown_locks
set when Thread objects are not joined explicitly. See bpo-37788.
This must be called with _shutdown_locks_lock acquired.
"""
# If a lock was released, the corresponding thread has exited
to_remove = [lock for lock in _shutdown_locks if not lock.locked()]
_shutdown_locks.difference_update(to_remove)
# Main class for threads
class Thread:
@@ -784,8 +850,19 @@ class Thread:
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
if name:
name = str(name)
else:
name = _newname("Thread-%d")
if target is not None:
try:
target_name = target.__name__
name += f" ({target_name})"
except AttributeError:
pass
self._target = target
self._name = str(name or _newname())
self._name = name
self._args = args
self._kwargs = kwargs
if daemon is not None:
@@ -808,9 +885,14 @@ class Thread:
def _reset_internal_locks(self, is_alive):
# private! Called by _after_fork() to reset our internal locks as
# they may be in an invalid state leading to a deadlock or crash.
self._started._reset_internal_locks()
self._started._at_fork_reinit()
if is_alive:
self._set_tstate_lock()
# bpo-42350: If the fork happens when the thread is already stopped
# (ex: after threading._shutdown() has been called), _tstate_lock
# is None. Do nothing in this case.
if self._tstate_lock is not None:
self._tstate_lock._at_fork_reinit()
self._tstate_lock.acquire()
else:
# The thread isn't alive after fork: it doesn't have a tstate
# anymore.
@@ -846,6 +928,7 @@ class Thread:
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
@@ -866,7 +949,7 @@ class Thread:
"""
try:
if self._target:
if self._target is not None:
self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
@@ -910,6 +993,7 @@ class Thread:
if not self.daemon:
with _shutdown_locks_lock:
_maintain_shutdown_locks()
_shutdown_locks.add(self._tstate_lock)
def _bootstrap_inner(self):
@@ -965,7 +1049,8 @@ class Thread:
self._tstate_lock = None
if not self.daemon:
with _shutdown_locks_lock:
_shutdown_locks.discard(lock)
# Remove our lock and other released locks from _shutdown_locks
_maintain_shutdown_locks()
def _delete(self):
"Remove current thread from the dict of currently running threads."
@@ -1022,11 +1107,24 @@ class Thread:
# If the lock is acquired, the C code is done, and self._stop() is
# called. That sets ._is_stopped to True, and ._tstate_lock to None.
lock = self._tstate_lock
if lock is None: # already determined that the C code is done
if lock is None:
# already determined that the C code is done
assert self._is_stopped
elif lock.acquire(block, timeout):
lock.release()
self._stop()
return
try:
if lock.acquire(block, timeout):
lock.release()
self._stop()
except:
if lock.locked():
# bpo-45274: lock.acquire() acquired the lock, but the function
# was interrupted with an exception before reaching the
# lock.release(). It can happen if a signal handler raises an
# exception, like CTRL+C which raises KeyboardInterrupt.
lock.release()
self._stop()
raise
@property
def name(self):
@@ -1072,8 +1170,8 @@ class Thread:
"""Return whether the thread is alive.
This method returns True just before the run() method starts until just
after the run() method terminates. The module function enumerate()
returns a list of all alive threads.
after the run() method terminates. See also the module function
enumerate().
"""
assert self._initialized, "Thread.__init__() not called"
@@ -1082,16 +1180,6 @@ class Thread:
self._wait_for_tstate_lock(False)
return not self._is_stopped
def isAlive(self):
"""Return whether the thread is alive.
This method is deprecated, use is_alive() instead.
"""
import warnings
warnings.warn('isAlive() is deprecated, use is_alive() instead',
DeprecationWarning, stacklevel=2)
return self.is_alive()
@property
def daemon(self):
"""A boolean value indicating whether this thread is a daemon thread.
@@ -1116,15 +1204,47 @@ class Thread:
self._daemonic = daemonic
def isDaemon(self):
"""Return whether this thread is a daemon.
This method is deprecated, use the daemon attribute instead.
"""
import warnings
warnings.warn('isDaemon() is deprecated, get the daemon attribute instead',
DeprecationWarning, stacklevel=2)
return self.daemon
def setDaemon(self, daemonic):
"""Set whether this thread is a daemon.
This method is deprecated, use the .daemon property instead.
"""
import warnings
warnings.warn('setDaemon() is deprecated, set the daemon attribute instead',
DeprecationWarning, stacklevel=2)
self.daemon = daemonic
def getName(self):
"""Return a string used for identification purposes only.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('getName() is deprecated, get the name attribute instead',
DeprecationWarning, stacklevel=2)
return self.name
def setName(self, name):
"""Set the name string for this thread.
This method is deprecated, use the name attribute instead.
"""
import warnings
warnings.warn('setName() is deprecated, set the name attribute instead',
DeprecationWarning, stacklevel=2)
self.name = name
@@ -1174,6 +1294,10 @@ except ImportError:
stderr.flush()
# Original value of threading.excepthook
__excepthook__ = excepthook
def _make_invoke_excepthook():
# Create a local namespace to ensure that variables remain alive
# when _invoke_excepthook() is called, even if it is called late during
@@ -1315,7 +1439,16 @@ def current_thread():
except KeyError:
return _DummyThread()
currentThread = current_thread
def currentThread():
"""Return the current Thread object, corresponding to the caller's thread of control.
This function is deprecated, use current_thread() instead.
"""
import warnings
warnings.warn('currentThread() is deprecated, use current_thread() instead',
DeprecationWarning, stacklevel=2)
return current_thread()
def active_count():
"""Return the number of Thread objects currently alive.
@@ -1327,7 +1460,16 @@ def active_count():
with _active_limbo_lock:
return len(_active) + len(_limbo)
activeCount = active_count
def activeCount():
"""Return the number of Thread objects currently alive.
This function is deprecated, use active_count() instead.
"""
import warnings
warnings.warn('activeCount() is deprecated, use active_count() instead',
DeprecationWarning, stacklevel=2)
return active_count()
def _enumerate():
# Same as enumerate(), but without the lock. Internal use only.
@@ -1344,6 +1486,27 @@ def enumerate():
with _active_limbo_lock:
return list(_active.values()) + list(_limbo.values())
_threading_atexits = []
_SHUTTING_DOWN = False
def _register_atexit(func, *arg, **kwargs):
"""CPython internal: register *func* to be called before joining threads.
The registered *func* is called with its arguments just before all
non-daemon threads are joined in `_shutdown()`. It provides a similar
purpose to `atexit.register()`, but its functions are called prior to
threading shutdown instead of interpreter shutdown.
For similarity to atexit, the registered functions are called in reverse.
"""
if _SHUTTING_DOWN:
raise RuntimeError("can't register atexit after shutdown")
call = functools.partial(func, *arg, **kwargs)
_threading_atexits.append(call)
from _thread import stack_size
# Create the main thread object,
@@ -1365,14 +1528,30 @@ def _shutdown():
# _shutdown() was already called
return
global _SHUTTING_DOWN
_SHUTTING_DOWN = True
# Call registered threading atexit functions before threads are joined.
# Order is reversed, similar to atexit.
for atexit_call in reversed(_threading_atexits):
atexit_call()
# Main thread
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't have
# been released.
assert tlock is not None
assert tlock.locked()
tlock.release()
_main_thread._stop()
if _main_thread.ident == get_ident():
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't
# have been released.
assert tlock is not None
assert tlock.locked()
tlock.release()
_main_thread._stop()
else:
# bpo-1596321: _shutdown() must be called in the main thread.
# If the threading module was not imported by the main thread,
# _main_thread is the thread which imported the threading module.
# In this case, ignore _main_thread, similar behavior than for threads
# spawned by C libraries or using _thread.start_new_thread().
pass
# Join all non-deamon threads
while True:
@@ -1384,7 +1563,7 @@ def _shutdown():
break
for lock in locks:
# mimick Thread.join()
# mimic Thread.join()
lock.acquire()
lock.release()
@@ -1417,7 +1596,7 @@ def _after_fork():
# by another (non-forked) thread. http://bugs.python.org/issue874900
global _active_limbo_lock, _main_thread
global _shutdown_locks_lock, _shutdown_locks
_active_limbo_lock = _allocate_lock()
_active_limbo_lock = RLock()
# fork() only copied the current thread; clear references to others.
new_active = {}