Update the concurrent library + Added tests (#6673)

* Updated concurrent library

* Added test_concurrent_futures from v3.13.11

* Annotated failing tests in test_concurrent_futures
This commit is contained in:
Terry Tianlin Luan
2026-01-09 01:28:12 -05:00
committed by GitHub
parent a3425b435e
commit b38cdaa30e
15 changed files with 2400 additions and 101 deletions

View File

@@ -23,6 +23,7 @@ __all__ = (
'ALL_COMPLETED',
'CancelledError',
'TimeoutError',
'InvalidStateError',
'BrokenExecutor',
'Future',
'Executor',
@@ -50,4 +51,4 @@ def __getattr__(name):
ThreadPoolExecutor = te
return te
raise AttributeError(f"module {__name__} has no attribute {name}")
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -50,9 +50,7 @@ class CancelledError(Error):
"""The Future was cancelled."""
pass
class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
TimeoutError = TimeoutError # make local alias for the standard exception
class InvalidStateError(Error):
"""The operation is not allowed in this state."""
@@ -284,7 +282,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (is finished or cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
futures. Duplicate futures given to *fs* are removed and will be
futures. Duplicate futures given to *fs* are removed and will be
returned only once.
"""
fs = set(fs)
@@ -312,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, fs - done)
def _result_or_cancel(fut, timeout=None):
try:
try:
return fut.result(timeout)
finally:
fut.cancel()
finally:
# Break a reference cycle with the exception in self._exception
del fut
class Future(object):
"""Represents the result of an asynchronous computation."""
@@ -386,7 +396,7 @@ class Future(object):
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
def __get_result(self):
if self._exception:
if self._exception is not None:
try:
raise self._exception
finally:
@@ -606,9 +616,9 @@ class Executor(object):
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
yield _result_or_cancel(fs.pop())
else:
yield fs.pop().result(end_time - time.monotonic())
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel()

View File

@@ -49,6 +49,8 @@ import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
# This import is required to load the multiprocessing.connection submodule
# so that it can be accessed later as `mp.connection`
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
@@ -56,7 +58,7 @@ import weakref
from functools import partial
import itertools
import sys
import traceback
from traceback import format_exception
_threads_wakeups = weakref.WeakKeyDictionary()
@@ -66,22 +68,31 @@ _global_shutdown = False
class _ThreadWakeup:
def __init__(self):
self._closed = False
self._lock = threading.Lock()
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
# Please note that we do not take the self._lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the lock. Otherwise we
# might try to read from the closed pipe.
with self._lock:
if not self._closed:
self._closed = True
self._writer.close()
self._reader.close()
def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
with self._lock:
if not self._closed:
self._writer.send_bytes(b"")
def clear(self):
if not self._closed:
while self._reader.poll():
self._reader.recv_bytes()
if self._closed:
raise RuntimeError('operation on closed _ThreadWakeup')
while self._reader.poll():
self._reader.recv_bytes()
def _python_exit():
@@ -123,8 +134,7 @@ class _RemoteTraceback(Exception):
class _ExceptionWithTraceback:
def __init__(self, exc, tb):
tb = traceback.format_exception(type(exc), exc, tb)
tb = ''.join(tb)
tb = ''.join(format_exception(type(exc), exc, tb))
self.exc = exc
# Traceback object needs to be garbage-collected as its frames
# contain references to all the objects in the exception scope
@@ -145,10 +155,11 @@ class _WorkItem(object):
self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id
self.exception = exception
self.result = result
self.exit_pid = exit_pid
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
@@ -160,20 +171,17 @@ class _CallItem(object):
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
tb = traceback.format_exception(type(e), e, e.__traceback__)
tb = format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
with self.shutdown_lock:
self.thread_wakeup.wakeup()
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
@@ -183,16 +191,6 @@ class _SafeQueue(Queue):
super()._on_queue_feeder_error(e, obj)
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
while True:
chunk = tuple(itertools.islice(it, chunksize))
if not chunk:
return
yield chunk
def _process_chunk(fn, chunk):
""" Processes a chunk of an iterable passed to map.
@@ -205,17 +203,19 @@ def _process_chunk(fn, chunk):
return [fn(*args) for args in chunk]
def _sendback_result(result_queue, work_id, result=None, exception=None):
def _sendback_result(result_queue, work_id, result=None, exception=None,
exit_pid=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
exception=exception, exit_pid=exit_pid))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc))
result_queue.put(_ResultItem(work_id, exception=exc,
exit_pid=exit_pid))
def _process_worker(call_queue, result_queue, initializer, initargs):
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
@@ -236,25 +236,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# The parent will notice that the process stopped and
# mark the pool broken
return
num_tasks = 0
exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return
if max_tasks is not None:
num_tasks += 1
if num_tasks >= max_tasks:
exit_pid = os.getpid()
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
_sendback_result(result_queue, call_item.work_id, exception=exc)
_sendback_result(result_queue, call_item.work_id, exception=exc,
exit_pid=exit_pid)
else:
_sendback_result(result_queue, call_item.work_id, result=r)
_sendback_result(result_queue, call_item.work_id, result=r,
exit_pid=exit_pid)
del r
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
if exit_pid is not None:
return
class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
@@ -284,11 +297,10 @@ class _ExecutorManagerThread(threading.Thread):
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock):
mp.util.debug('Executor collected: triggering callback for'
mp_util_debug=mp.util.debug):
mp_util_debug('Executor collected: triggering callback for'
' QueueManager wakeup')
with shutdown_lock:
thread_wakeup.wakeup()
thread_wakeup.wakeup()
self.executor_reference = weakref.ref(executor, weakref_cb)
@@ -305,6 +317,10 @@ class _ExecutorManagerThread(threading.Thread):
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids
# Maximum number of tasks a worker process can execute before
# exiting safely
self.max_tasks_per_child = executor._max_tasks_per_child
# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
@@ -315,7 +331,14 @@ class _ExecutorManagerThread(threading.Thread):
# Main loop for the executor manager thread.
while True:
self.add_call_item_to_queue()
# gh-109047: During Python finalization, self.call_queue.put()
# creation of a thread can fail with RuntimeError.
try:
self.add_call_item_to_queue()
except BaseException as exc:
cause = format_exception(exc)
self.terminate_broken(cause)
return
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
@@ -324,19 +347,32 @@ class _ExecutorManagerThread(threading.Thread):
return
if result_item is not None:
self.process_result_item(result_item)
process_exited = result_item.exit_pid is not None
if process_exited:
p = self.processes.pop(result_item.exit_pid)
p.join()
# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item
# attempt to increment idle process count
executor = self.executor_reference()
if executor is not None:
executor._idle_worker_semaphore.release()
del executor
if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._adjust_process_count()
else:
executor._idle_worker_semaphore.release()
del executor
if self.is_shutting_down():
self.flag_executor_shutting_down()
# When only canceled futures remain in pending_work_items, our
# next call to wait_result_broken_or_wakeup would hang forever.
# This makes sure we have some running futures or none at all.
self.add_call_item_to_queue()
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not self.pending_work_items:
@@ -386,14 +422,13 @@ class _ExecutorManagerThread(threading.Thread):
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = traceback.format_exception(type(e), e, e.__traceback__)
except BaseException as exc:
cause = format_exception(exc)
elif wakeup_reader in ready:
is_broken = False
with self.shutdown_lock:
self.thread_wakeup.clear()
self.thread_wakeup.clear()
return result_item, is_broken, cause
@@ -401,24 +436,14 @@ class _ExecutorManagerThread(threading.Thread):
# Process the received a result_item. This can be either the PID of a
# worker that exited gracefully or a _ResultItem
if isinstance(result_item, int):
# Clean shutdown of a worker using its PID
# (avoids marking the executor broken)
assert self.is_shutting_down()
p = self.processes.pop(result_item)
p.join()
if not self.processes:
self.join_executor_internals()
return
else:
# Received a _ResultItem so mark the future as completed.
work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Received a _ResultItem so mark the future as completed.
work_item = self.pending_work_items.pop(result_item.work_id, None)
# work_item can be None if another process terminated (see above)
if work_item is not None:
if result_item.exception is not None:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
def is_shutting_down(self):
# Check whether we should start shutting down the executor.
@@ -430,7 +455,7 @@ class _ExecutorManagerThread(threading.Thread):
return (_global_shutdown or executor is None
or executor._shutdown_thread)
def terminate_broken(self, cause):
def _terminate_broken(self, cause):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
@@ -455,7 +480,14 @@ class _ExecutorManagerThread(threading.Thread):
# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
work_item.future.set_exception(bpe)
try:
work_item.future.set_exception(bpe)
except _base.InvalidStateError:
# set_exception() fails if the future is cancelled: ignore it.
# Trying to check if the future is cancelled before calling
# set_exception() would leave a race condition if the future is
# cancelled between the check and set_exception().
pass
# Delete references to object. See issue16284
del work_item
self.pending_work_items.clear()
@@ -465,8 +497,14 @@ class _ExecutorManagerThread(threading.Thread):
for p in self.processes.values():
p.terminate()
self.call_queue._terminate_broken()
# clean up resources
self.join_executor_internals()
self._join_executor_internals(broken=True)
def terminate_broken(self, cause):
with self.shutdown_lock:
self._terminate_broken(cause)
def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
@@ -509,15 +547,24 @@ class _ExecutorManagerThread(threading.Thread):
break
def join_executor_internals(self):
self.shutdown_workers()
with self.shutdown_lock:
self._join_executor_internals()
def _join_executor_internals(self, broken=False):
# If broken, call_queue was closed and so can no longer be used.
if not broken:
self.shutdown_workers()
# Release the queue's resources as soon as possible.
self.call_queue.close()
self.call_queue.join_thread()
with self.shutdown_lock:
self.thread_wakeup.close()
self.thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in self.processes.values():
if broken:
p.terminate()
p.join()
def get_n_children_alive(self):
@@ -582,22 +629,29 @@ class BrokenProcessPool(_base.BrokenExecutor):
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
initializer=None, initargs=(), *, max_tasks_per_child=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
mp_context: A multiprocessing context to launch the workers created
using the multiprocessing.get_context('start method') API. This
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
max_tasks_per_child: The maximum number of tasks a worker process
can complete before it will exit and be replaced with a fresh
worker process. The default of None means worker process will
live as long as the executor. Requires a non-'fork' mp_context
start method. When given, we default to using 'spawn' if no
mp_context is supplied.
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
self._max_workers = os.process_cpu_count() or 1
if sys.platform == 'win32':
self._max_workers = min(_MAX_WINDOWS_WORKERS,
self._max_workers)
@@ -612,7 +666,10 @@ class ProcessPoolExecutor(_base.Executor):
self._max_workers = max_workers
if mp_context is None:
mp_context = mp.get_context()
if max_tasks_per_child is not None:
mp_context = mp.get_context("spawn")
else:
mp_context = mp.get_context()
self._mp_context = mp_context
# https://github.com/python/cpython/issues/90622
@@ -624,6 +681,18 @@ class ProcessPoolExecutor(_base.Executor):
self._initializer = initializer
self._initargs = initargs
if max_tasks_per_child is not None:
if not isinstance(max_tasks_per_child, int):
raise TypeError("max_tasks_per_child must be an integer")
elif max_tasks_per_child <= 0:
raise ValueError("max_tasks_per_child must be >= 1")
if self._mp_context.get_start_method(allow_none=False) == "fork":
# https://github.com/python/cpython/issues/90622
raise ValueError("max_tasks_per_child is incompatible with"
" the 'fork' multiprocessing start method;"
" supply a different mp_context.")
self._max_tasks_per_child = max_tasks_per_child
# Management thread
self._executor_manager_thread = None
@@ -646,7 +715,9 @@ class ProcessPoolExecutor(_base.Executor):
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.
# Care must be taken to only call clear and close from the
# executor_manager_thread, since _ThreadWakeup.clear() is not protected
# by a lock.
self._executor_manager_thread_wakeup = _ThreadWakeup()
# Create communication channels for the executor
@@ -657,7 +728,6 @@ class ProcessPoolExecutor(_base.Executor):
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
@@ -677,6 +747,11 @@ class ProcessPoolExecutor(_base.Executor):
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
if self._processes is None:
return
# if there's an idle process, we don't need to spawn a new one.
if self._idle_worker_semaphore.acquire(blocking=False):
return
@@ -705,7 +780,8 @@ class ProcessPoolExecutor(_base.Executor):
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs))
self._initargs,
self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p
@@ -759,7 +835,7 @@ class ProcessPoolExecutor(_base.Executor):
raise ValueError("chunksize must be >= 1.")
results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
itertools.batched(zip(*iterables), chunksize),
timeout=timeout)
return _chain_from_iterable_of_lists(results)

View File

@@ -37,14 +37,14 @@ def _python_exit():
threading._register_atexit(_python_exit)
# At fork, reinitialize the `_global_shutdown_lock` lock in the child process
# TODO RUSTPYTHON - _at_fork_reinit is not implemented yet
if hasattr(os, 'register_at_fork') and hasattr(_global_shutdown_lock, '_at_fork_reinit'):
if hasattr(os, 'register_at_fork'):
os.register_at_fork(before=_global_shutdown_lock.acquire,
after_in_child=_global_shutdown_lock._at_fork_reinit,
after_in_parent=_global_shutdown_lock.release)
os.register_at_fork(after_in_child=_threads_queues.clear)
class _WorkItem(object):
class _WorkItem:
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
@@ -79,17 +79,20 @@ def _worker(executor_reference, work_queue, initializer, initargs):
return
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
# attempt to increment idle count
try:
work_item = work_queue.get_nowait()
except queue.Empty:
# attempt to increment idle count if queue is empty
executor = executor_reference()
if executor is not None:
executor._idle_semaphore.release()
del executor
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See GH-60488
del work_item
continue
executor = executor_reference()
@@ -137,10 +140,10 @@ class ThreadPoolExecutor(_base.Executor):
# * CPU bound task which releases GIL
# * I/O bound task (which releases GIL, of course)
#
# We use cpu_count + 4 for both types of tasks.
# We use process_cpu_count + 4 for both types of tasks.
# But we limit it to 32 to avoid consuming surprisingly large resource
# on many core machine.
max_workers = min(32, (os.cpu_count() or 1) + 4)
max_workers = min(32, (os.process_cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

View File

@@ -0,0 +1,18 @@
import os.path
import unittest
from test import support
from test.support import threading_helper
# Adjust if we ever have a platform with processes but not threads.
threading_helper.requires_working_threading(module=True)
if support.check_sanitizer(address=True, memory=True):
# gh-90791: Skip the test because it is too slow when Python is built
# with ASAN/MSAN: between 5 and 20 minutes on GitHub Actions.
raise unittest.SkipTest("test too slow on ASAN/MSAN build")
def load_tests(*args):
return support.load_package_tests(os.path.dirname(__file__), *args)

View File

@@ -0,0 +1,162 @@
import threading
import time
import unittest
import weakref
from concurrent import futures
from test import support
from test.support import Py_GIL_DISABLED
def mul(x, y):
return x * y
def capture(*args, **kwargs):
return args, kwargs
class MyObject(object):
def my_method(self):
pass
def make_dummy_object(_):
return MyObject()
# Used in test_swallows_falsey_exceptions
def raiser(exception, msg='std'):
raise exception(msg)
class FalseyBoolException(Exception):
def __bool__(self):
return False
class FalseyLenException(Exception):
def __len__(self):
return 0
class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
future = self.executor.submit(capture, 1, self=2, fn=3)
self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
with self.assertRaises(TypeError):
self.executor.submit(fn=capture, arg=1)
with self.assertRaises(TypeError):
self.executor.submit(arg=1)
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
self.assertEqual(
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
try:
for i in self.executor.map(time.sleep,
[0, 0, 6],
timeout=5):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')
self.assertEqual([None, None], results)
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
# have exited).
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
# references.
my_object = MyObject()
my_object_collected = threading.Event()
def set_event():
if Py_GIL_DISABLED:
# gh-117688 Avoid deadlock by setting the event in a
# background thread. The current thread may be in the middle
# of the my_object_collected.wait() call, which holds locks
# needed by my_object_collected.set().
threading.Thread(target=my_object_collected.set).start()
else:
my_object_collected.set()
my_object_callback = weakref.ref(my_object, lambda obj: set_event())
# Deliberately discarding the future.
self.executor.submit(my_object.my_method)
del my_object
if Py_GIL_DISABLED:
# Due to biased reference counting, my_object might only be
# deallocated while the thread that created it runs -- if the
# thread is paused waiting on an event, it may not merge the
# refcount of the queued object. For that reason, we alternate
# between running the GC and waiting for the event.
wait_time = 0
collected = False
while not collected and wait_time <= support.SHORT_TIMEOUT:
support.gc_collect()
collected = my_object_collected.wait(timeout=1.0)
wait_time += 1.0
else:
collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(collected,
"Stale reference not collected within timeout.")
def test_max_workers_negative(self):
for number in (0, -1):
with self.assertRaisesRegex(ValueError,
"max_workers must be greater "
"than 0"):
self.executor_type(max_workers=number)
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
for obj in self.executor.map(make_dummy_object, range(10)):
wr = weakref.ref(obj)
del obj
support.gc_collect() # For PyPy or other GCs.
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
if wr() is None:
break
def test_swallows_falsey_exceptions(self):
# see gh-132063: Prevent exceptions that evaluate as falsey
# from being ignored.
# Recall: `x` is falsey if `len(x)` returns 0 or `bool(x)` returns False.
msg = 'boolbool'
with self.assertRaisesRegex(FalseyBoolException, msg):
self.executor.submit(raiser, FalseyBoolException, msg).result()
msg = 'lenlen'
with self.assertRaisesRegex(FalseyLenException, msg):
self.executor.submit(raiser, FalseyLenException, msg).result()

View File

@@ -0,0 +1,118 @@
import itertools
import time
import unittest
import weakref
from concurrent import futures
from concurrent.futures._base import (
CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
from .util import (
PENDING_FUTURE, RUNNING_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
create_future, create_executor_tests, setup_module)
def mul(x, y):
return x * y
class AsCompletedTests:
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
completed = set(futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]))
self.assertEqual(set(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]),
completed)
def test_future_times_out(self):
"""Test ``futures.as_completed`` timing out before
completing it's final future."""
already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE}
# Windows clock resolution is around 15.6 ms
short_timeout = 0.100
for timeout in (0, short_timeout):
with self.subTest(timeout):
completed_futures = set()
future = self.executor.submit(time.sleep, short_timeout * 10)
try:
for f in futures.as_completed(
already_completed | {future},
timeout
):
completed_futures.add(f)
except futures.TimeoutError:
pass
# Check that ``future`` wasn't completed.
self.assertEqual(completed_futures, already_completed)
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
# Issue #31641: accept arbitrary iterables.
future1 = self.executor.submit(time.sleep, 2)
completed = [
f for f in futures.as_completed(itertools.repeat(future1, 3))
]
self.assertEqual(len(completed), 1)
def test_free_reference_yielded_future(self):
# Issue #14406: Generator should not keep references
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
futures_list.append(create_future(state=FINISHED, result=42))
with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):
futures_list.remove(future)
wr = weakref.ref(future)
del future
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
futures_list[0].set_result("test")
for future in futures.as_completed(futures_list):
futures_list.remove(future)
wr = weakref.ref(future)
del future
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
if futures_list:
futures_list[0].set_result("test")
def test_correct_timeout_exception_msg(self):
futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
RUNNING_FUTURE, SUCCESSFUL_FUTURE]
with self.assertRaises(futures.TimeoutError) as cm:
list(futures.as_completed(futures_list, timeout=0))
self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
create_executor_tests(globals(), AsCompletedTests)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,332 @@
import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
from test import support
from .util import (
create_executor_tests, setup_module,
ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
def _crash(delay=None):
"""Induces a segfault."""
if delay:
time.sleep(delay)
import faulthandler
faulthandler.disable()
faulthandler._sigsegv()
def _crash_with_data(data):
"""Induces a segfault with dummy data in input."""
_crash()
def _exit():
"""Induces a sys exit with exitcode 1."""
sys.exit(1)
def _raise_error(Err):
"""Function that raises an Exception in process."""
raise Err()
def _raise_error_ignore_stderr(Err):
"""Function that raises an Exception in process and ignores stderr."""
import io
sys.stderr = io.StringIO()
raise Err()
def _return_instance(cls):
"""Function that returns a instance of cls."""
return cls()
class CrashAtPickle(object):
"""Bad object that triggers a segfault at pickling time."""
def __reduce__(self):
_crash()
class CrashAtUnpickle(object):
"""Bad object that triggers a segfault at unpickling time."""
def __reduce__(self):
return _crash, ()
class ExitAtPickle(object):
"""Bad object that triggers a process exit at pickling time."""
def __reduce__(self):
_exit()
class ExitAtUnpickle(object):
"""Bad object that triggers a process exit at unpickling time."""
def __reduce__(self):
return _exit, ()
class ErrorAtPickle(object):
"""Bad object that triggers an error at pickling time."""
def __reduce__(self):
from pickle import PicklingError
raise PicklingError("Error in pickle")
class ErrorAtUnpickle(object):
"""Bad object that triggers an error at unpickling time."""
def __reduce__(self):
from pickle import UnpicklingError
return _raise_error_ignore_stderr, (UnpicklingError, )
class ExecutorDeadlockTest:
TIMEOUT = support.LONG_TIMEOUT
def _fail_on_deadlock(self, executor):
# If we did not recover before TIMEOUT seconds, consider that the
# executor is in a deadlock state and forcefully clean all its
# composants.
import faulthandler
from tempfile import TemporaryFile
with TemporaryFile(mode="w+") as f:
faulthandler.dump_traceback(file=f)
f.seek(0)
tb = f.read()
for p in executor._processes.values():
p.terminate()
# This should be safe to call executor.shutdown here as all possible
# deadlocks should have been broken.
executor.shutdown(wait=True)
print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
self.fail(f"Executor deadlock:\n\n{tb}")
def _check_crash(self, error, func, *args, ignore_stderr=False):
# test for deadlock caused by crashes in a pool
self.executor.shutdown(wait=True)
executor = self.executor_type(
max_workers=2, mp_context=self.get_context())
res = executor.submit(func, *args)
if ignore_stderr:
cm = support.captured_stderr()
else:
cm = contextlib.nullcontext()
try:
with self.assertRaises(error):
with cm:
res.result(timeout=self.TIMEOUT)
except futures.TimeoutError:
# If we did not recover before TIMEOUT seconds,
# consider that the executor is in a deadlock state
self._fail_on_deadlock(executor)
executor.shutdown(wait=True)
def test_error_at_task_pickle(self):
# Check problem occurring while pickling a task in
# the task_handler thread
self._check_crash(PicklingError, id, ErrorAtPickle())
def test_exit_at_task_unpickle(self):
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())
def test_error_at_task_unpickle(self):
# gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
self.addCleanup(setattr, sys, 'stderr', sys.stderr)
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())
def test_crash_at_task_unpickle(self):
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())
def test_crash_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(BrokenProcessPool, _crash)
def test_exit_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(SystemExit, _exit)
def test_error_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(RuntimeError, _raise_error, RuntimeError)
def test_crash_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)
def test_exit_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(SystemExit, _return_instance, ExitAtPickle)
def test_error_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(PicklingError, _return_instance, ErrorAtPickle)
def test_error_during_result_unpickle_in_result_handler(self):
# gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
self.addCleanup(setattr, sys, 'stderr', sys.stderr)
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool,
_return_instance, ErrorAtUnpickle,
ignore_stderr=True)
def test_exit_during_result_unpickle_in_result_handler(self):
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)
def test_shutdown_deadlock(self):
# Test that the pool calling shutdown do not cause deadlock
# if a worker fails after the shutdown call.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
with self.assertRaises(BrokenProcessPool):
f.result()
def test_shutdown_deadlock_pickle(self):
# Test that the pool calling shutdown with wait=False does not cause
# a deadlock if a task fails at pickle after the shutdown call.
# Reported in bpo-39104.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
# Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
# asynchronously.
executor.submit(id, 42).result()
executor_manager = executor._executor_manager_thread
# Submit a task that fails at pickle and shutdown the executor
# without waiting
f = executor.submit(id, ErrorAtPickle())
executor.shutdown(wait=False)
with self.assertRaises(PicklingError):
f.result()
# Make sure the executor is eventually shutdown and do not leave
# dangling threads
executor_manager.join()
def test_crash_big_data(self):
# Test that there is a clean exception instad of a deadlock when a
# child process crashes while some data is being written into the
# queue.
# https://github.com/python/cpython/issues/94777
self.executor.shutdown(wait=True)
data = "a" * support.PIPE_MAX_SIZE
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))
executor.shutdown(wait=True)
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829
# Lots of cargo culting while writing this test, apologies if
# something is really stupid...
self.executor.shutdown(wait=True)
if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")
def timeout(_signum, _frame):
import faulthandler
faulthandler.dump_traceback()
raise RuntimeError("timed out while submitting jobs?")
thread_run = futures.process._ExecutorManagerThread.run
def mock_run(self):
# Delay thread startup so the wakeup pipe can fill up and block
time.sleep(3)
thread_run(self)
class MockWakeup(_ThreadWakeup):
"""Mock wakeup object to force the wakeup to block"""
def __init__(self):
super().__init__()
self._dummy_queue = queue.Queue(maxsize=1)
def wakeup(self):
self._dummy_queue.put(None, block=True)
super().wakeup()
def clear(self):
super().clear()
try:
while True:
self._dummy_queue.get_nowait()
except queue.Empty:
pass
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
'run', mock_run),
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
MockWakeup)):
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
job_num = 100
job_data = range(job_num)
# Need to use sigalarm for timeout detection because
# Executor.submit is not guarded by any timeout (both
# self._work_ids.put(self._queue_count) and
# self._executor_manager_thread_wakeup.wakeup() might
# timeout, maybe more?). In this specific case it was
# the wakeup call that deadlocked on a blocking pipe.
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(int(self.TIMEOUT))
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,291 @@
import threading
import time
import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
from .util import (
PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
BaseTestCase, create_future, setup_module)
class FutureTests(BaseTestCase):
def test_done_callback_with_result(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
self.assertEqual(5, callback_result)
def test_done_callback_with_exception(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
self.assertEqual(('test',), callback_exception.args)
def test_done_callback_with_cancel(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
self.assertTrue(was_cancelled)
def test_done_callback_raises(self):
with support.captured_stderr() as stderr:
raising_was_called = False
fn_was_called = False
def raising_fn(callback_future):
nonlocal raising_was_called
raising_was_called = True
raise Exception('doh!')
def fn(callback_future):
nonlocal fn_was_called
fn_was_called = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
self.assertTrue(raising_was_called)
self.assertTrue(fn_was_called)
self.assertIn('Exception: doh!', stderr.getvalue())
def test_done_callback_already_successful(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
self.assertEqual(5, callback_result)
def test_done_callback_already_failed(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
self.assertEqual(('test',), callback_exception.args)
def test_done_callback_already_cancelled(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
self.assertTrue(was_cancelled)
def test_done_callback_raises_already_succeeded(self):
with support.captured_stderr() as stderr:
def raising_fn(callback_future):
raise Exception('doh!')
f = Future()
# Set the result first to simulate a future that runs instantly,
# effectively allowing the callback to be run immediately.
f.set_result(5)
f.add_done_callback(raising_fn)
self.assertIn('exception calling callback for', stderr.getvalue())
self.assertIn('doh!', stderr.getvalue())
def test_repr(self):
self.assertRegex(repr(PENDING_FUTURE),
'<Future at 0x[0-9a-f]+ state=pending>')
self.assertRegex(repr(RUNNING_FUTURE),
'<Future at 0x[0-9a-f]+ state=running>')
self.assertRegex(repr(CANCELLED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegex(
repr(EXCEPTION_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished raised OSError>')
self.assertRegex(
repr(SUCCESSFUL_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
f2 = create_future(state=RUNNING)
f3 = create_future(state=CANCELLED)
f4 = create_future(state=CANCELLED_AND_NOTIFIED)
f5 = create_future(state=FINISHED, exception=OSError())
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
self.assertEqual(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
self.assertEqual(f2._state, RUNNING)
self.assertTrue(f3.cancel())
self.assertEqual(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
self.assertEqual(f5._state, FINISHED)
self.assertFalse(f6.cancel())
self.assertEqual(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
self.assertFalse(RUNNING_FUTURE.cancelled())
self.assertTrue(CANCELLED_FUTURE.cancelled())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
self.assertFalse(EXCEPTION_FUTURE.cancelled())
self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
def test_done(self):
self.assertFalse(PENDING_FUTURE.done())
self.assertFalse(RUNNING_FUTURE.done())
self.assertTrue(CANCELLED_FUTURE.done())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
self.assertTrue(EXCEPTION_FUTURE.done())
self.assertTrue(SUCCESSFUL_FUTURE.done())
def test_running(self):
self.assertFalse(PENDING_FUTURE.running())
self.assertTrue(RUNNING_FUTURE.running())
self.assertFalse(CANCELLED_FUTURE.running())
self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
self.assertFalse(EXCEPTION_FUTURE.running())
self.assertFalse(SUCCESSFUL_FUTURE.running())
def test_result_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.result, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
# TODO(brian@sweetapp.com): This test is timing dependent.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertEqual(f1.result(timeout=5), 42)
t.join()
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependent.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertRaises(futures.CancelledError,
f1.result, timeout=support.SHORT_TIMEOUT)
t.join()
def test_exception_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.exception, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
OSError))
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
def test_exception_with_success(self):
def notification():
# Wait until the main thread is waiting for the exception.
time.sleep(1)
with f1._condition:
f1._state = FINISHED
f1._exception = OSError()
f1._condition.notify_all()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError))
t.join()
def test_multiple_set_result(self):
f = create_future(state=PENDING)
f.set_result(1)
with self.assertRaisesRegex(
futures.InvalidStateError,
'FINISHED: <Future at 0x[0-9a-f]+ '
'state=finished returned int>'
):
f.set_result(2)
self.assertTrue(f.done())
self.assertEqual(f.result(), 1)
def test_multiple_set_exception(self):
f = create_future(state=PENDING)
e = ValueError()
f.set_exception(e)
with self.assertRaisesRegex(
futures.InvalidStateError,
'FINISHED: <Future at 0x[0-9a-f]+ '
'state=finished raised ValueError>'
):
f.set_exception(Exception())
self.assertEqual(f.exception(), e)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,152 @@
import contextlib
import logging
import queue
import time
import unittest
import sys
import io
from concurrent.futures._base import BrokenExecutor
from concurrent.futures.process import _check_system_limits
from logging.handlers import QueueHandler
from test import support
from .util import ExecutorMixin, create_executor_tests, setup_module
INITIALIZER_STATUS = 'uninitialized'
def init(x):
global INITIALIZER_STATUS
INITIALIZER_STATUS = x
def get_init_status():
return INITIALIZER_STATUS
def init_fail(log_queue=None):
if log_queue is not None:
logger = logging.getLogger('concurrent.futures')
logger.addHandler(QueueHandler(log_queue))
logger.setLevel('CRITICAL')
logger.propagate = False
time.sleep(0.1) # let some futures be scheduled
raise ValueError('error in initializer')
class InitializerMixin(ExecutorMixin):
worker_count = 2
def setUp(self):
global INITIALIZER_STATUS
INITIALIZER_STATUS = 'uninitialized'
self.executor_kwargs = dict(initializer=init,
initargs=('initialized',))
super().setUp()
def test_initializer(self):
futures = [self.executor.submit(get_init_status)
for _ in range(self.worker_count)]
for f in futures:
self.assertEqual(f.result(), 'initialized')
class FailingInitializerMixin(ExecutorMixin):
worker_count = 2
def setUp(self):
if hasattr(self, "ctx"):
# Pass a queue to redirect the child's logging output
self.mp_context = self.get_context()
self.log_queue = self.mp_context.Queue()
self.executor_kwargs = dict(initializer=init_fail,
initargs=(self.log_queue,))
else:
# In a thread pool, the child shares our logging setup
# (see _assert_logged())
self.mp_context = None
self.log_queue = None
self.executor_kwargs = dict(initializer=init_fail)
super().setUp()
def test_initializer(self):
with self._assert_logged('ValueError: error in initializer'):
try:
future = self.executor.submit(get_init_status)
except BrokenExecutor:
# Perhaps the executor is already broken
pass
else:
with self.assertRaises(BrokenExecutor):
future.result()
# At some point, the executor should break
for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
"executor not broken"):
if self.executor._broken:
break
# ... and from this point submit() is guaranteed to fail
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)
@contextlib.contextmanager
def _assert_logged(self, msg):
if self.log_queue is not None:
yield
output = []
try:
while True:
output.append(self.log_queue.get_nowait().getMessage())
except queue.Empty:
pass
else:
with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
yield
output = cm.output
self.assertTrue(any(msg in line for line in output),
output)
create_executor_tests(globals(), InitializerMixin)
create_executor_tests(globals(), FailingInitializerMixin)
@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
"""
Source: https://github.com/python/cpython/issues/104090
"""
def _test(self, test_class):
try:
_check_system_limits()
except NotImplementedError:
self.skipTest("ProcessPoolExecutor unavailable on this system")
runner = unittest.TextTestRunner(stream=io.StringIO())
runner.run(test_class('test_initializer'))
# GH-104090:
# Stop resource tracker manually now, so we can verify there are not leaked resources by checking
# the process exit code
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker._stop()
self.assertEqual(_resource_tracker._exitcode, 0)
def test_spawn(self):
self._test(ProcessPoolSpawnFailingInitializerTest)
@support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
def test_forkserver(self):
self._test(ProcessPoolForkserverFailingInitializerTest)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,234 @@
import os
import sys
import threading
import time
import unittest
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
from test import support
from test.support import hashlib_helper
from .executor import ExecutorTest, mul
from .util import (
ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
create_executor_tests, setup_module)
class EventfulGCObj():
def __init__(self, mgr):
self.event = mgr.Event()
def __del__(self):
self.event.set()
class ProcessPoolExecutorTest(ExecutorTest):
@unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
def test_max_workers_too_large(self):
with self.assertRaisesRegex(ValueError,
"max_workers must be <= 61"):
futures.ProcessPoolExecutor(max_workers=62)
def test_killed_child(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken".
futures = [self.executor.submit(time.sleep, 3)]
# Get one of the processes, and terminate (kill) it
p = next(iter(self.executor._processes.values()))
p.terminate()
for fut in futures:
self.assertRaises(BrokenProcessPool, fut.result)
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)
def test_map_chunksize(self):
def bad_map():
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
ref = list(map(pow, range(40), range(40)))
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=6)),
ref)
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=50)),
ref)
self.assertEqual(
list(self.executor.map(pow, range(40), range(40), chunksize=40)),
ref)
self.assertRaises(ValueError, bad_map)
@classmethod
def _test_traceback(cls):
raise RuntimeError(123) # some comment
def test_traceback(self):
# We want ensure that the traceback from the child process is
# contained in the traceback raised in the main process.
future = self.executor.submit(self._test_traceback)
with self.assertRaises(Exception) as cm:
future.result()
exc = cm.exception
self.assertIs(type(exc), RuntimeError)
self.assertEqual(exc.args, (123,))
cause = exc.__cause__
self.assertIs(type(cause), futures.process._RemoteTraceback)
self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
with support.captured_stderr() as f1:
try:
raise exc
except RuntimeError:
sys.excepthook(*sys.exc_info())
self.assertIn('raise RuntimeError(123) # some comment',
f1.getvalue())
@hashlib_helper.requires_hashdigest('md5')
def test_ressources_gced_in_workers(self):
# Ensure that argument for a job are correctly gc-ed after the job
# is finished
mgr = self.get_context().Manager()
obj = EventfulGCObj(mgr)
future = self.executor.submit(id, obj)
future.result()
self.assertTrue(obj.event.wait(timeout=1))
# explicitly destroy the object to ensure that EventfulGCObj.__del__()
# is called while manager is still running.
support.gc_collect()
obj = None
support.gc_collect()
mgr.shutdown()
mgr.join()
def test_saturation(self):
executor = self.executor
mp_context = self.get_context()
sem = mp_context.Semaphore(0)
job_count = 15 * executor._max_workers
for _ in range(job_count):
executor.submit(sem.acquire)
self.assertEqual(len(executor._processes), executor._max_workers)
for _ in range(job_count):
sem.release()
@support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
def test_idle_process_reuse_one(self):
executor = self.executor
assert executor._max_workers >= 4
if self.get_context().get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._processes), 1)
def test_idle_process_reuse_multiple(self):
executor = self.executor
assert executor._max_workers <= 5
if self.get_context().get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
executor.submit(mul, 18, 29)
executor.submit(mul, 1, 2).result()
executor.submit(mul, 0, 9)
self.assertLessEqual(len(executor._processes), 3)
executor.shutdown()
def test_max_tasks_per_child(self):
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
with self.assertRaises(ValueError):
self.executor_type(1, mp_context=context, max_tasks_per_child=3)
return
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
1, mp_context=context, max_tasks_per_child=3)
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid remains the same as the worker could be reused
f2 = executor.submit(os.getpid)
self.assertEqual(f2.result(), original_pid)
self.assertEqual(len(executor._processes), 1)
f3 = executor.submit(os.getpid)
self.assertEqual(f3.result(), original_pid)
# A new worker is spawned, with a statistically different pid,
# while the previous was reaped.
f4 = executor.submit(os.getpid)
new_pid = f4.result()
self.assertNotEqual(original_pid, new_pid)
self.assertEqual(len(executor._processes), 1)
executor.shutdown()
def test_max_tasks_per_child_defaults_to_spawn_context(self):
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(1, max_tasks_per_child=3)
self.assertEqual(executor._mp_context.get_start_method(), "spawn")
def test_max_tasks_early_shutdown(self):
context = self.get_context()
if context.get_start_method(allow_none=False) == "fork":
raise unittest.SkipTest("Incompatible with the fork start method.")
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
3, mp_context=context, max_tasks_per_child=1)
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
executor.shutdown()
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))
@unittest.expectedFailure # TODO: RUSTPYTHON AttributeError: module 'threading' has no attribute '_start_joinable_thread'. Did you mean: '_start_new_thread'?
def test_python_finalization_error(self):
# gh-109047: Catch RuntimeError on thread creation
# during Python finalization.
context = self.get_context()
# gh-109047: Mock the threading.start_joinable_thread() function to inject
# RuntimeError: simulate the error raised during Python finalization.
# Block the second creation: create _ExecutorManagerThread, but block
# QueueFeederThread.
orig_start_new_thread = threading._start_joinable_thread
nthread = 0
def mock_start_new_thread(func, *args, **kwargs):
nonlocal nthread
if nthread >= 1:
raise RuntimeError("can't create new thread at "
"interpreter shutdown")
nthread += 1
return orig_start_new_thread(func, *args, **kwargs)
with support.swap_attr(threading, '_start_joinable_thread',
mock_start_new_thread):
executor = self.executor_type(max_workers=2, mp_context=context)
with executor:
with self.assertRaises(BrokenProcessPool):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()
create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,406 @@
import signal
import sys
import threading
import time
import unittest
from concurrent import futures
from test import support
from test.support.script_helper import assert_python_ok
from .util import (
BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
create_executor_tests, setup_module)
def sleep_and_print(t, msg):
time.sleep(t)
print(msg)
sys.stdout.flush()
class ExecutorShutdownTest:
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
@unittest.skip('TODO: RUSTPYTHON; hangs')
def test_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
from concurrent.futures import {executor_type}
from time import sleep
from test.test_concurrent_futures.test_shutdown import sleep_and_print
if __name__ == "__main__":
context = '{context}'
if context == "":
t = {executor_type}(5)
else:
from multiprocessing import get_context
context = get_context(context)
t = {executor_type}(5, mp_context=context)
t.submit(sleep_and_print, 1.0, "apple")
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
@unittest.skip('TODO: RUSTPYTHON; Hangs')
def test_submit_after_interpreter_shutdown(self):
# Test the atexit hook for shutdown of worker threads and processes
rc, out, err = assert_python_ok('-c', """if 1:
import atexit
@atexit.register
def run_last():
try:
t.submit(id, None)
except RuntimeError:
print("runtime-error")
raise
from concurrent.futures import {executor_type}
if __name__ == "__main__":
context = '{context}'
if not context:
t = {executor_type}(5)
else:
from multiprocessing import get_context
context = get_context(context)
t = {executor_type}(5, mp_context=context)
t.submit(id, 42).result()
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, "ctx", "")))
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
self.assertEqual(out.strip(), b"runtime-error")
def test_hang_issue12364(self):
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
self.executor.shutdown()
for f in fs:
f.result()
def test_cancel_futures(self):
assert self.worker_count <= 5, "test needs few workers"
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
self.executor.shutdown(cancel_futures=True)
# We can't guarantee the exact number of cancellations, but we can
# guarantee that *some* were cancelled. With few workers, many of
# the submitted futures should have been cancelled.
cancelled = [fut for fut in fs if fut.cancelled()]
self.assertGreater(len(cancelled), 20)
# Ensure the other futures were able to finish.
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
# that may have been left in a pending state.
others = [fut for fut in fs if not fut.cancelled()]
for fut in others:
self.assertTrue(fut.done(), msg=f"{fut._state=}")
self.assertIsNone(fut.exception())
# Similar to the number of cancelled futures, we can't guarantee the
# exact number that completed. But, we can guarantee that at least
# one finished.
self.assertGreater(len(others), 0)
@unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: b'' != b'apple'
def test_hang_gh83386(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.
See https://github.com/python/cpython/issues/83386.
"""
if self.executor_type == futures.ProcessPoolExecutor:
raise unittest.SkipTest(
"Hangs, see https://github.com/python/cpython/issues/83386")
rc, out, err = assert_python_ok('-c', """if True:
from concurrent.futures import {executor_type}
from test.test_concurrent_futures.test_shutdown import sleep_and_print
if __name__ == "__main__":
if {context!r}: multiprocessing.set_start_method({context!r})
t = {executor_type}(max_workers=3)
t.submit(sleep_and_print, 1.0, "apple")
t.shutdown(wait=False)
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, 'ctx', None)))
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")
def test_hang_gh94440(self):
"""shutdown(wait=True) doesn't hang when a future was submitted and
quickly canceled right before shutdown.
See https://github.com/python/cpython/issues/94440.
"""
if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")
def timeout(_signum, _frame):
raise RuntimeError("timed out waiting for shutdown")
kwargs = {}
if getattr(self, 'ctx', None):
kwargs['mp_context'] = self.get_context()
executor = self.executor_type(max_workers=1, **kwargs)
executor.submit(int).result()
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(5)
executor.submit(int).cancel()
executor.shutdown(wait=True)
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
def test_threads_terminate(self):
def acquire_lock(lock):
lock.acquire()
sem = threading.Semaphore(0)
for i in range(3):
self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._threads), 3)
for i in range(3):
sem.release()
self.executor.shutdown()
for t in self.executor._threads:
t.join()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for t in executor._threads:
t.join()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
# Make sure the results were all computed before the
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the threads when calling
# shutdown with wait=False
executor = futures.ThreadPoolExecutor(max_workers=5)
res = executor.map(abs, range(-5, 5))
threads = executor._threads
executor.shutdown(wait=False)
for t in threads:
t.join()
# Make sure the results were all computed before the
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
support.gc_collect() # For PyPy or other GCs.
for t in threads:
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
t.join()
def test_thread_names_default(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
support.gc_collect() # For PyPy or other GCs.
for t in threads:
# Ensure that our default name is reasonably sane and unique when
# no thread_name_prefix was supplied.
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()
def test_cancel_futures_wait_false(self):
# Can only be reliably tested for TPE, since PPE often hangs with
# `wait=False` (even without *cancel_futures*).
rc, out, err = assert_python_ok('-c', """if True:
from concurrent.futures import ThreadPoolExecutor
from test.test_concurrent_futures.test_shutdown import sleep_and_print
if __name__ == "__main__":
t = ThreadPoolExecutor()
t.submit(sleep_and_print, .1, "apple")
t.shutdown(wait=False, cancel_futures=True)
""")
# Errors in atexit hooks don't change the process exit code, check
# stderr manually.
self.assertFalse(err)
# gh-116682: stdout may be empty if shutdown happens before task
# starts executing.
self.assertIn(out.strip(), [b"apple", b""])
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()
mp_context = self.get_context()
if mp_context.get_start_method(allow_none=False) == "fork":
# fork pre-spawns, not on demand.
expected_num_processes = self.worker_count
else:
expected_num_processes = 3
sem = mp_context.Semaphore(0)
for _ in range(3):
self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._processes), expected_num_processes)
for _ in range(3):
sem.release()
processes = self.executor._processes
self.executor.shutdown()
for p in processes.values():
p.join()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context()) as e:
processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in processes.values():
p.join()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
executor_manager_thread = executor._executor_manager_thread
processes = executor._processes
call_queue = executor._call_queue
executor_manager_thread = executor._executor_manager_thread
del executor
support.gc_collect() # For PyPy or other GCs.
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
# Make sure the results were all computed before the
# executor got shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the processes when calling
# shutdown with wait=False
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
processes = executor._processes
call_queue = executor._call_queue
executor_manager_thread = executor._executor_manager_thread
executor.shutdown(wait=False)
# Make sure that all the executor resources were properly cleaned by
# the shutdown process
executor_manager_thread.join()
for p in processes.values():
p.join()
call_queue.join_thread()
# Make sure the results were all computed before the executor got
# shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
@classmethod
def _failing_task_gh_132969(cls, n):
raise ValueError("failing task")
@classmethod
def _good_task_gh_132969(cls, n):
time.sleep(0.1 * n)
return n
def _run_test_issue_gh_132969(self, max_workers):
# max_workers=2 will repro exception
# max_workers=4 will repro exception and then hang
# Repro conditions
# max_tasks_per_child=1
# a task ends abnormally
# shutdown(wait=False) is called
start_method = self.get_context().get_start_method()
if (start_method == "fork" or
(start_method == "forkserver" and sys.platform.startswith("win"))):
self.skipTest(f"Skipping test for {start_method = }")
executor = futures.ProcessPoolExecutor(
max_workers=max_workers,
max_tasks_per_child=1,
mp_context=self.get_context())
f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2)
f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
result = 0
try:
result += f1.result()
result += f2.result()
result += f3.result()
except ValueError:
# stop processing results upon first exception
pass
# Ensure that the executor cleans up after called
# shutdown with wait=False
executor_manager_thread = executor._executor_manager_thread
executor.shutdown(wait=False)
time.sleep(0.2)
executor_manager_thread.join()
return result
def test_shutdown_gh_132969_case_1(self):
# gh-132969: test that exception "object of type 'NoneType' has no len()"
# is not raised when shutdown(wait=False) is called.
result = self._run_test_issue_gh_132969(2)
self.assertEqual(result, 1)
def test_shutdown_gh_132969_case_2(self):
# gh-132969: test that process does not hang and
# exception "object of type 'NoneType' has no len()" is not raised
# when shutdown(wait=False) is called.
result = self._run_test_issue_gh_132969(4)
self.assertEqual(result, 1)
create_executor_tests(globals(), ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,122 @@
import contextlib
import multiprocessing as mp
import multiprocessing.process
import multiprocessing.util
import os
import threading
import unittest
from concurrent import futures
from test import support
from .executor import ExecutorTest, mul
from .util import BaseTestCase, ThreadPoolMixin, setup_module
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
def test_map_submits_without_iteration(self):
"""Tests verifying issue 11777."""
finished = []
def record_finished(n):
finished.append(n)
self.executor.map(record_finished, range(10))
self.executor.shutdown(wait=True)
self.assertCountEqual(finished, range(10))
def test_default_workers(self):
executor = self.executor_type()
expected = min(32, (os.process_cpu_count() or 1) + 4)
self.assertEqual(executor._max_workers, expected)
def test_saturation(self):
executor = self.executor_type(4)
def acquire_lock(lock):
lock.acquire()
sem = threading.Semaphore(0)
for i in range(15 * executor._max_workers):
executor.submit(acquire_lock, sem)
self.assertEqual(len(executor._threads), executor._max_workers)
for i in range(15 * executor._max_workers):
sem.release()
executor.shutdown(wait=True)
@support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
def test_idle_thread_reuse(self):
executor = self.executor_type()
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._threads), 1)
executor.shutdown(wait=True)
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
@support.requires_resource('cpu')
def test_hang_global_shutdown_lock(self):
# bpo-45021: _global_shutdown_lock should be reinitialized in the child
# process, otherwise it will never exit
def submit(pool):
pool.submit(submit, pool)
with futures.ThreadPoolExecutor(1) as pool:
pool.submit(submit, pool)
for _ in range(50):
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)
@support.requires_fork()
@unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
@unittest.expectedFailure # TODO: RUSTPYTHON AssertionError: DeprecationWarning not triggered
def test_process_fork_from_a_threadpool(self):
# bpo-43944: clear concurrent.futures.thread._threads_queues after fork,
# otherwise child process will try to join parent thread
def fork_process_and_return_exitcode():
# Ignore the warning about fork with threads.
with self.assertWarnsRegex(DeprecationWarning,
r"use of fork\(\) may lead to deadlocks in the child"):
p = mp.get_context('fork').Process(target=lambda: 1)
p.start()
p.join()
return p.exitcode
with futures.ThreadPoolExecutor(1) as pool:
process_exitcode = pool.submit(fork_process_and_return_exitcode).result()
self.assertEqual(process_exitcode, 0)
def test_executor_map_current_future_cancel(self):
stop_event = threading.Event()
log = []
def log_n_wait(ident):
log.append(f"{ident=} started")
try:
stop_event.wait()
finally:
log.append(f"{ident=} stopped")
with self.executor_type(max_workers=1) as pool:
# submit work to saturate the pool
fut = pool.submit(log_n_wait, ident="first")
try:
with contextlib.closing(
pool.map(log_n_wait, ["second", "third"], timeout=0)
) as gen:
with self.assertRaises(TimeoutError):
next(gen)
finally:
stop_event.set()
fut.result()
# ident='second' is cancelled as a result of raising a TimeoutError
# ident='third' is cancelled because it remained in the collection of futures
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,205 @@
import sys
import threading
import unittest
from concurrent import futures
from test import support
from test.support import threading_helper
from .util import (
CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
create_executor_tests, setup_module,
BaseTestCase, ThreadPoolMixin,
ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
def mul(x, y):
return x * y
def wait_and_raise(e):
e.wait()
raise Exception('this is an exception')
class WaitTests:
def test_20369(self):
# See https://bugs.python.org/issue20369
future = self.executor.submit(mul, 1, 2)
done, not_done = futures.wait([future, future],
return_when=futures.ALL_COMPLETED)
self.assertEqual({future}, done)
self.assertEqual(set(), not_done)
def test_first_completed(self):
event = self.create_event()
future1 = self.executor.submit(mul, 21, 2)
future2 = self.executor.submit(event.wait)
try:
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(set([future1]), done)
self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_completed_some_already_completed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
try:
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEqual(
set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
def test_first_exception(self):
event1 = self.create_event()
event2 = self.create_event()
try:
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(wait_and_raise, event1)
future3 = self.executor.submit(event2.wait)
# Ensure that future1 is completed before future2 finishes
def wait_for_future1():
future1.result()
event1.set()
t = threading.Thread(target=wait_for_future1)
t.start()
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([future1, future2]), finished)
self.assertEqual(set([future3]), pending)
threading_helper.join_thread(t)
finally:
event1.set()
event2.set()
future3.result() # wait for job to finish
def test_first_exception_some_already_complete(self):
event = self.create_event()
future1 = self.executor.submit(divmod, 21, 0)
future2 = self.executor.submit(event.wait)
try:
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
finally:
event.set()
future2.result() # wait for job to finish
def test_first_exception_one_already_failed(self):
event = self.create_event()
future1 = self.executor.submit(event.wait)
try:
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEqual(set([EXCEPTION_FUTURE]), finished)
self.assertEqual(set([future1]), pending)
finally:
event.set()
future1.result() # wait for job to finish
def test_all_completed(self):
future1 = self.executor.submit(divmod, 2, 0)
future2 = self.executor.submit(mul, 2, 21)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2],
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
future1,
future2]), finished)
self.assertEqual(set(), pending)
def test_timeout(self):
short_timeout = 0.050
event = self.create_event()
future = self.executor.submit(event.wait)
try:
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future],
timeout=short_timeout,
return_when=futures.ALL_COMPLETED)
self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
finished)
self.assertEqual(set([future]), pending)
finally:
event.set()
future.result() # wait for job to finish
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
def test_pending_calls_race(self):
# Issue #14406: multi-threaded race condition when waiting on all
# futures.
event = threading.Event()
def future_func():
event.wait()
oldswitchinterval = sys.getswitchinterval()
support.setswitchinterval(1e-6)
try:
fs = {self.executor.submit(future_func) for i in range(100)}
event.set()
futures.wait(fs, return_when=futures.ALL_COMPLETED)
finally:
sys.setswitchinterval(oldswitchinterval)
create_executor_tests(globals(), WaitTests,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

169
Lib/test/test_concurrent_futures/util.py vendored Normal file
View File

@@ -0,0 +1,169 @@
import multiprocessing
import sys
import threading
import time
import unittest
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
)
from concurrent.futures.process import _check_system_limits
from test import support
from test.support import threading_helper
def create_future(state=PENDING, exception=None, result=None):
f = Future()
f._state = state
f._exception = exception
f._result = result
return f
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
class BaseTestCase(unittest.TestCase):
def setUp(self):
self._thread_key = threading_helper.threading_setup()
def tearDown(self):
support.reap_children()
threading_helper.threading_cleanup(*self._thread_key)
class ExecutorMixin:
worker_count = 5
executor_kwargs = {}
def setUp(self):
super().setUp()
self.t1 = time.monotonic()
if hasattr(self, "ctx"):
self.executor = self.executor_type(
max_workers=self.worker_count,
mp_context=self.get_context(),
**self.executor_kwargs)
self.manager = self.get_context().Manager()
else:
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
self.manager = None
def tearDown(self):
self.executor.shutdown(wait=True)
self.executor = None
if self.manager is not None:
self.manager.shutdown()
self.manager = None
dt = time.monotonic() - self.t1
if support.verbose:
print("%.2fs" % dt, end=' ')
self.assertLess(dt, 300, "synchronization issue: test lasted too long")
super().tearDown()
def get_context(self):
return multiprocessing.get_context(self.ctx)
class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor
def create_event(self):
return threading.Event()
class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "fork"
def get_context(self):
try:
_check_system_limits()
except NotImplementedError:
self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
if support.check_sanitizer(thread=True):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolSpawnMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "spawn"
def get_context(self):
try:
_check_system_limits()
except NotImplementedError:
self.skipTest("ProcessPoolExecutor unavailable on this system")
return super().get_context()
def create_event(self):
return self.manager.Event()
class ProcessPoolForkserverMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor
ctx = "forkserver"
def get_context(self):
try:
_check_system_limits()
except NotImplementedError:
self.skipTest("ProcessPoolExecutor unavailable on this system")
if sys.platform == "win32":
self.skipTest("require unix system")
if support.check_sanitizer(thread=True):
self.skipTest("TSAN doesn't support threads after fork")
return super().get_context()
def create_event(self):
return self.manager.Event()
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin,
ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin)):
def strip_mixin(name):
if name.endswith(('Mixin', 'Tests')):
return name[:-5]
elif name.endswith('Test'):
return name[:-4]
else:
return name
module = remote_globals['__name__']
for exe in executor_mixins:
name = ("%s%sTest"
% (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
cls = type(name, (mixin,) + (exe,) + bases, {'__module__': module})
remote_globals[name] = cls
def setup_module():
try:
_check_system_limits()
except NotImplementedError:
pass
else:
unittest.addModuleCleanup(multiprocessing.util._cleanup_tests)
thread_info = threading_helper.threading_setup()
unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)