Merge pull request #2485 from RustPython/misc-aiohttp

Misc fixes to get aiohttp working
This commit is contained in:
Noah
2021-02-27 15:06:27 -06:00
committed by GitHub
28 changed files with 1289 additions and 319 deletions

3
Cargo.lock generated
View File

@@ -1101,8 +1101,7 @@ checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
[[package]]
name = "lock_api"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312"
source = "git+https://github.com/coolreader18/parking_lot?branch=remutex-is_owned#391cf555e94c05450cea2e88b42c95c6631ab163"
dependencies = [
"scopeguard",
]

View File

@@ -69,4 +69,4 @@ opt-level = 3
[patch.crates-io]
# REDOX START, Uncommment when you want to compile/check with redoxer
# REDOX END
lock_api = { git = "https://github.com/coolreader18/parking_lot", branch = "remutex-is_owned" }

View File

@@ -10,9 +10,44 @@ from concurrent.futures._base import (FIRST_COMPLETED,
ALL_COMPLETED,
CancelledError,
TimeoutError,
InvalidStateError,
BrokenExecutor,
Future,
Executor,
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
__all__ = (
'FIRST_COMPLETED',
'FIRST_EXCEPTION',
'ALL_COMPLETED',
'CancelledError',
'TimeoutError',
'BrokenExecutor',
'Future',
'Executor',
'wait',
'as_completed',
'ProcessPoolExecutor',
'ThreadPoolExecutor',
)
def __dir__():
return __all__ + ('__author__', '__doc__')
def __getattr__(name):
global ProcessPoolExecutor, ThreadPoolExecutor
if name == 'ProcessPoolExecutor':
from .process import ProcessPoolExecutor as pe
ProcessPoolExecutor = pe
return pe
if name == 'ThreadPoolExecutor':
from .thread import ThreadPoolExecutor as te
ThreadPoolExecutor = te
return te
raise AttributeError(f"module {__name__} has no attribute {name}")

View File

@@ -53,6 +53,10 @@ class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
class InvalidStateError(Error):
"""The operation is not allowed in this state."""
pass
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
@@ -170,6 +174,29 @@ def _create_and_install_waiters(fs, return_when):
return waiter
def _yield_finished_futures(fs, waiter, ref_collect):
"""
Iterate on the list *fs*, yielding finished futures one by one in
reverse order.
Before yielding a future, *waiter* is removed from its waiters
and the future is removed from each set in the collection of sets
*ref_collect*.
The aim of this function is to avoid keeping stale references after
the future is yielded and before the iterator resumes.
"""
while fs:
f = fs[-1]
for futures_set in ref_collect:
futures_set.remove(f)
with f._condition:
f._waiters.remove(waiter)
del f
# Careful not to keep a reference to the popped value
yield fs.pop()
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@@ -189,28 +216,30 @@ def as_completed(fs, timeout=None):
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
end_time = timeout + time.monotonic()
fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from finished
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs,))
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
wait_timeout = end_time - time.monotonic()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
len(pending), total_futures))
waiter.event.wait(wait_timeout)
@@ -219,11 +248,13 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
for future in finished:
yield future
pending.remove(future)
# reverse to keep finishing order
finished.reverse()
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending))
finally:
# Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
@@ -373,7 +404,10 @@ class Future(object):
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
try:
fn(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
def result(self, timeout=None):
"""Return the result of the call that the future represents.
@@ -486,6 +520,8 @@ class Future(object):
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = FINISHED
for waiter in self._waiters:
@@ -499,6 +535,8 @@ class Future(object):
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
@@ -509,7 +547,7 @@ class Future(object):
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(self, fn, *args, **kwargs):
def submit(*args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
@@ -518,7 +556,21 @@ class Executor(object):
Returns:
A Future representing the given call.
"""
if len(args) >= 2:
pass
elif not args:
raise TypeError("descriptor 'submit' of 'Executor' object "
"needs an argument")
elif 'fn' in kwargs:
import warnings
warnings.warn("Passing 'fn' as keyword argument is deprecated",
DeprecationWarning, stacklevel=2)
else:
raise TypeError('submit expected at least 1 positional argument, '
'got %d' % (len(args)-1))
raise NotImplementedError()
submit.__text_signature__ = '($self, fn, /, *args, **kwargs)'
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
@@ -543,7 +595,7 @@ class Executor(object):
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
@@ -551,11 +603,14 @@ class Executor(object):
# before the first iterator value is required.
def result_iterator():
try:
for future in fs:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield future.result()
yield fs.pop().result()
else:
yield future.result(end_time - time.time())
yield fs.pop().result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
@@ -580,3 +635,9 @@ class Executor(object):
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
class BrokenExecutor(RuntimeError):
"""
Raised when a executor has become non-functional after a severe failure.
"""

View File

@@ -3,15 +3,15 @@
"""Implements ProcessPoolExecutor.
The follow diagram and text describe the data-flow through the system:
The following diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | => | Work Ids | | | | Call Q | | Process |
| | +----------+ | | +-----------+ | Pool |
| | | ... | | | | ... | +---------+
| | | 6 | => | | => | 5, call() | => | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
@@ -50,13 +50,14 @@ import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing
from multiprocessing import SimpleQueue
from multiprocessing.connection import wait
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
import traceback
# Workers are created as daemon threads and processes. This is done to allow the
@@ -73,16 +74,33 @@ import traceback
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
class _ThreadWakeup:
def __init__(self):
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
self._writer.close()
self._reader.close()
def wakeup(self):
self._writer.send_bytes(b"")
def clear(self):
while self._reader.poll():
self._reader.recv_bytes()
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
global _global_shutdown
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
thread_wakeup.wakeup()
for t, _ in items:
t.join()
# Controls how many more calls than processes will be queued in the call queue.
@@ -91,6 +109,13 @@ def _python_exit():
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2
# Hack to embed stringification of remote traceback in local traceback
class _RemoteTraceback(Exception):
@@ -132,6 +157,26 @@ class _CallItem(object):
self.args = args
self.kwargs = kwargs
class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items):
self.pending_work_items = pending_work_items
super().__init__(max_size, ctx=ctx)
def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
if work_item is not None:
work_item.future.set_exception(e)
else:
super()._on_queue_feeder_error(e, obj)
def _get_chunks(*iterables, chunksize):
""" Iterates over zip()ed iterables in chunks. """
it = zip(*iterables)
@@ -152,19 +197,38 @@ def _process_chunk(fn, chunk):
"""
return [fn(*args) for args in chunk]
def _process_worker(call_queue, result_queue):
def _sendback_result(result_queue, work_id, result=None, exception=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc))
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
call_queue: A ctx.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
result_queue: A ctx.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
initializer: A callable initializer, or None
initargs: A tuple of args for the initializer
"""
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
# The parent will notice that the process stopped and
# mark the pool broken
return
while True:
call_item = call_queue.get(block=True)
if call_item is None:
@@ -175,10 +239,15 @@ def _process_worker(call_queue, result_queue):
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(call_item.work_id, exception=exc))
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
_sendback_result(result_queue, call_item.work_id, result=r)
del r
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
def _add_call_item_to_queue(pending_work_items,
work_ids,
@@ -217,12 +286,14 @@ def _add_call_item_to_queue(pending_work_items,
del pending_work_items[work_id]
continue
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
result_queue,
thread_wakeup):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
@@ -231,60 +302,96 @@ def _queue_management_worker(executor_reference,
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the multiprocessing.Process instances used as
process: A list of the ctx.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A multiprocessing.Queue that will be filled with _CallItems
call_queue: A ctx.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
process workers.
thread_wakeup: A _ThreadWakeup to allow waking up the
queue_manager_thread from the main Thread and avoid deadlocks
caused by permanently locked queues.
"""
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
return (_global_shutdown or executor is None
or executor._shutdown_thread)
def shutdown_worker():
# This is an upper bound
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# This is an upper bound on the number of children alive.
n_children_alive = sum(p.is_alive() for p in processes.values())
n_children_to_stop = n_children_alive
n_sentinels_sent = 0
# Send the right number of sentinels, to make sure all children are
# properly terminated.
while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
for i in range(n_children_to_stop - n_sentinels_sent):
try:
call_queue.put_nowait(None)
n_sentinels_sent += 1
except Full:
break
n_children_alive = sum(p.is_alive() for p in processes.values())
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X.
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
p.join()
reader = result_queue._reader
result_reader = result_queue._reader
wakeup_reader = thread_wakeup._reader
readers = [result_reader, wakeup_reader]
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# Wait for a result to be ready in the result_queue while checking
# that all worker processes are still running, or for a wake up
# signal send. The wake up signals come either from new tasks being
# submitted, from the executor being shutdown/gc-ed, or from the
# shutdown of the python interpreter.
worker_sentinels = [p.sentinel for p in processes.values()]
ready = mp.connection.wait(readers + worker_sentinels)
cause = None
is_broken = True
if result_reader in ready:
try:
result_item = result_reader.recv()
is_broken = False
except BaseException as e:
cause = traceback.format_exception(type(e), e, e.__traceback__)
elif wakeup_reader in ready:
is_broken = False
result_item = None
thread_wakeup.clear()
if is_broken:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._broken = ('A child process terminated '
'abruptly, the process pool is not '
'usable anymore')
executor._shutdown_thread = True
executor = None
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
if cause is not None:
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
# All futures in flight must be marked failed
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"A process in the process pool was "
"terminated abruptly while the future was "
"running or pending."
))
work_item.future.set_exception(bpe)
# Delete references to object. See issue16284
del work_item
pending_work_items.clear()
@@ -313,6 +420,9 @@ def _queue_management_worker(executor_reference,
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
# Delete reference to result_item
del result_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
@@ -321,6 +431,10 @@ def _queue_management_worker(executor_reference,
# - The executor that owns this worker has been shutdown.
if shutting_down():
try:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
@@ -332,8 +446,11 @@ def _queue_management_worker(executor_reference,
pass
executor = None
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
global _system_limits_checked, _system_limited
if _system_limits_checked:
@@ -353,11 +470,24 @@ def _check_system_limits():
# minimum number of semaphores available
# according to POSIX
return
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
_system_limited = ("system provides too few semaphores (%d"
" available, 256 necessary)" % nsems_max)
raise NotImplementedError(_system_limited)
class BrokenProcessPool(RuntimeError):
def _chain_from_iterable_of_lists(iterable):
"""
Specialized implementation of itertools.chain.from_iterable.
Each item in *iterable* should be a list. This function is
careful not to keep references to yielded objects.
"""
for element in iterable:
element.reverse()
while element:
yield element.pop()
class BrokenProcessPool(_base.BrokenExecutor):
"""
Raised when a process in a ProcessPoolExecutor terminated abruptly
while a future was in the running state.
@@ -365,36 +495,48 @@ class BrokenProcessPool(RuntimeError):
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
mp_context: A multiprocessing context to launch the workers. This
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
if sys.platform == 'win32':
self._max_workers = min(_MAX_WINDOWS_WORKERS,
self._max_workers)
else:
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
elif (sys.platform == 'win32' and
max_workers > _MAX_WINDOWS_WORKERS):
raise ValueError(
f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
if mp_context is None:
mp_context = mp.get_context()
self._mp_context = mp_context
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._initializer = initializer
self._initargs = initargs
# Management thread
self._queue_management_thread = None
# Map of pids to processes
self._processes = {}
@@ -405,42 +547,91 @@ class ProcessPoolExecutor(_base.Executor):
self._queue_count = 0
self._pending_work_items = {}
# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()
# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()
def _start_queue_management_thread(self):
# When the executor gets lost, the weakref callback will wake up
# the queue management thread.
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
# When the executor gets garbarge collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self._queue_management_thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
thread_wakeup.wakeup()
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
self._queue_management_thread_wakeup),
name="QueueManagerThread")
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
_threads_wakeups[self._queue_management_thread] = \
self._queue_management_thread_wakeup
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue))
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs))
p.start()
self._processes[p.pid] = p
def submit(self, fn, *args, **kwargs):
def submit(*args, **kwargs):
if len(args) >= 2:
self, fn, *args = args
elif not args:
raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
"needs an argument")
elif 'fn' in kwargs:
fn = kwargs.pop('fn')
self, *args = args
import warnings
warnings.warn("Passing 'fn' as keyword argument is deprecated",
DeprecationWarning, stacklevel=2)
else:
raise TypeError('submit expected at least 1 positional argument, '
'got %d' % (len(args)-1))
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('A child process terminated '
'abruptly, the process pool is not usable anymore')
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
if _global_shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
@@ -449,10 +640,11 @@ class ProcessPoolExecutor(_base.Executor):
self._work_ids.put(self._queue_count)
self._queue_count += 1
# Wake up queue management thread
self._result_queue.put(None)
self._queue_management_thread_wakeup.wakeup()
self._start_queue_management_thread()
return f
submit.__text_signature__ = _base.Executor.submit.__text_signature__
submit.__doc__ = _base.Executor.submit.__doc__
def map(self, fn, *iterables, timeout=None, chunksize=1):
@@ -482,22 +674,31 @@ class ProcessPoolExecutor(_base.Executor):
results = super().map(partial(_process_chunk, fn),
_get_chunks(*iterables, chunksize=chunksize),
timeout=timeout)
return itertools.chain.from_iterable(results)
return _chain_from_iterable_of_lists(results)
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
if self._queue_management_thread:
# Wake up queue management thread
self._result_queue.put(None)
self._queue_management_thread_wakeup.wakeup()
if wait:
self._queue_management_thread.join()
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
if self._call_queue is not None:
self._call_queue.close()
if wait:
self._call_queue.join_thread()
self._call_queue = None
self._result_queue = None
self._processes = None
if self._queue_management_thread_wakeup:
self._queue_management_thread_wakeup.close()
self._queue_management_thread_wakeup = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
atexit.register(_python_exit)

View File

@@ -7,6 +7,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
from concurrent.futures import _base
import itertools
import queue
import threading
import weakref
@@ -40,6 +41,7 @@ def _python_exit():
atexit.register(_python_exit)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
@@ -53,12 +55,24 @@ class _WorkItem(object):
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
except BaseException as exc:
self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc'
self = None
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
def _worker(executor_reference, work_queue, initializer, initargs):
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
executor = executor_reference()
if executor is not None:
executor._initializer_failed()
return
try:
while True:
work_item = work_queue.get(block=True)
@@ -66,13 +80,24 @@ def _worker(executor_reference, work_queue):
work_item.run()
# Delete references to object. See issue16284
del work_item
# attempt to increment idle count
executor = executor_reference()
if executor is not None:
executor._idle_semaphore.release()
del executor
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Flag the executor as shutting down as early as possible if it
# is not gc-ed yet.
if executor is not None:
executor._shutdown = True
# Notice other workers
work_queue.put(None)
return
@@ -80,33 +105,81 @@ def _worker(executor_reference, work_queue):
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class BrokenThreadPool(_base.BrokenExecutor):
"""
Raised when a worker thread in a ThreadPoolExecutor failed initializing.
"""
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, thread_name_prefix=''):
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
# used to overlap I/O instead of CPU work.
max_workers = (os.cpu_count() or 1) * 5
# ThreadPoolExecutor is often used to:
# * CPU bound task which releases GIL
# * I/O bound task (which releases GIL, of course)
#
# We use cpu_count + 4 for both types of tasks.
# But we limit it to 32 to avoid consuming surprisingly large resource
# on many core machine.
max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable")
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._work_queue = queue.SimpleQueue()
self._idle_semaphore = threading.Semaphore(0)
self._threads = set()
self._broken = False
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = thread_name_prefix
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
self._initializer = initializer
self._initargs = initargs
def submit(*args, **kwargs):
if len(args) >= 2:
self, fn, *args = args
elif not args:
raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
"needs an argument")
elif 'fn' in kwargs:
fn = kwargs.pop('fn')
self, *args = args
import warnings
warnings.warn("Passing 'fn' as keyword argument is deprecated",
DeprecationWarning, stacklevel=2)
else:
raise TypeError('submit expected at least 1 positional argument, '
'got %d' % (len(args)-1))
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenThreadPool(self._broken)
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
if _shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
@@ -114,27 +187,46 @@ class ThreadPoolExecutor(_base.Executor):
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__text_signature__ = _base.Executor.submit.__text_signature__
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# if idle threads are available, don't spin new threads
if self._idle_semaphore.acquire(timeout=0):
return
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
def _initializer_failed(self):
with self._shutdown_lock:
self._broken = ('A thread initializer failed, the thread pool '
'is not usable anymore')
# Drain work queue and mark pending futures failed
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True

View File

@@ -16,10 +16,7 @@ import pickle
import socket
import sys
# XXX RustPython TODO: figure out why this doesn't work
# from . import context
from .context import *
context = sys.modules[__name__]
from . import context
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']

139
Lib/netrc.py vendored Normal file
View File

@@ -0,0 +1,139 @@
"""An object-oriented interface to .netrc files."""
# Module and documentation by Eric S. Raymond, 21 Dec 1998
import os, shlex, stat
__all__ = ["netrc", "NetrcParseError"]
class NetrcParseError(Exception):
"""Exception raised on syntax errors in the .netrc file."""
def __init__(self, msg, filename=None, lineno=None):
self.filename = filename
self.lineno = lineno
self.msg = msg
Exception.__init__(self, msg)
def __str__(self):
return "%s (%s, line %s)" % (self.msg, self.filename, self.lineno)
class netrc:
def __init__(self, file=None):
default_netrc = file is None
if file is None:
file = os.path.join(os.path.expanduser("~"), ".netrc")
self.hosts = {}
self.macros = {}
with open(file) as fp:
self._parse(file, fp, default_netrc)
def _parse(self, file, fp, default_netrc):
lexer = shlex.shlex(fp)
lexer.wordchars += r"""!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~"""
lexer.commenters = lexer.commenters.replace('#', '')
while 1:
# Look for a machine, default, or macdef top-level keyword
saved_lineno = lexer.lineno
toplevel = tt = lexer.get_token()
if not tt:
break
elif tt[0] == '#':
if lexer.lineno == saved_lineno and len(tt) == 1:
lexer.instream.readline()
continue
elif tt == 'machine':
entryname = lexer.get_token()
elif tt == 'default':
entryname = 'default'
elif tt == 'macdef': # Just skip to end of macdefs
entryname = lexer.get_token()
self.macros[entryname] = []
lexer.whitespace = ' \t'
while 1:
line = lexer.instream.readline()
if not line or line == '\012':
lexer.whitespace = ' \t\r\n'
break
self.macros[entryname].append(line)
continue
else:
raise NetrcParseError(
"bad toplevel token %r" % tt, file, lexer.lineno)
# We're looking at start of an entry for a named machine or default.
login = ''
account = password = None
self.hosts[entryname] = {}
while 1:
tt = lexer.get_token()
if (tt.startswith('#') or
tt in {'', 'machine', 'default', 'macdef'}):
if password:
self.hosts[entryname] = (login, account, password)
lexer.push_token(tt)
break
else:
raise NetrcParseError(
"malformed %s entry %s terminated by %s"
% (toplevel, entryname, repr(tt)),
file, lexer.lineno)
elif tt == 'login' or tt == 'user':
login = lexer.get_token()
elif tt == 'account':
account = lexer.get_token()
elif tt == 'password':
if os.name == 'posix' and default_netrc:
prop = os.fstat(fp.fileno())
if prop.st_uid != os.getuid():
import pwd
try:
fowner = pwd.getpwuid(prop.st_uid)[0]
except KeyError:
fowner = 'uid %s' % prop.st_uid
try:
user = pwd.getpwuid(os.getuid())[0]
except KeyError:
user = 'uid %s' % os.getuid()
raise NetrcParseError(
("~/.netrc file owner (%s) does not match"
" current user (%s)") % (fowner, user),
file, lexer.lineno)
if (prop.st_mode & (stat.S_IRWXG | stat.S_IRWXO)):
raise NetrcParseError(
"~/.netrc access too permissive: access"
" permissions must restrict access to only"
" the owner", file, lexer.lineno)
password = lexer.get_token()
else:
raise NetrcParseError("bad follower token %r" % tt,
file, lexer.lineno)
def authenticators(self, host):
"""Return a (user, account, password) tuple for given host."""
if host in self.hosts:
return self.hosts[host]
elif 'default' in self.hosts:
return self.hosts['default']
else:
return None
def __repr__(self):
"""Dump the class data in the format of a .netrc file."""
rep = ""
for host in self.hosts.keys():
attrs = self.hosts[host]
rep += f"machine {host}\n\tlogin {attrs[0]}\n"
if attrs[1]:
rep += f"\taccount {attrs[1]}\n"
rep += f"\tpassword {attrs[2]}\n"
for macro in self.macros.keys():
rep += f"macdef {macro}\n"
for line in self.macros[macro]:
rep += line
rep += "\n"
return rep
if __name__ == '__main__':
print(netrc())

91
Lib/queue.py vendored
View File

@@ -1,23 +1,29 @@
'''A multi-producer, multi-consumer queue.'''
try:
import threading
except ImportError:
import dummy_threading as threading
import threading
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
from _queue import SimpleQueue
except ImportError:
SimpleQueue = None
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass
try:
from _queue import Empty
except ImportError:
class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass
class Full(Exception):
'Exception raised by Queue.put(block=0)/put_nowait().'
pass
class Queue:
'''Create a queue object with a given maximum size.
@@ -244,3 +250,72 @@ class LifoQueue(Queue):
def _get(self):
return self.queue.pop()
class _PySimpleQueue:
'''Simple, unbounded FIFO queue.
This pure Python implementation is not reentrant.
'''
# Note: while this pure Python version provides fairness
# (by using a threading.Semaphore which is itself fair, being based
# on threading.Condition), fairness is not part of the API contract.
# This allows the C version to use a different implementation.
def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)
def put(self, item, block=True, timeout=None):
'''Put the item on the queue.
The optional 'block' and 'timeout' arguments are ignored, as this method
never blocks. They are provided for compatibility with the Queue class.
'''
self._queue.append(item)
self._count.release()
def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
if not self._count.acquire(block, timeout):
raise Empty
return self._queue.popleft()
def put_nowait(self, item):
'''Put an item into the queue without blocking.
This is exactly equivalent to `put(item)` and is only provided
for compatibility with the Queue class.
'''
return self.put(item, block=False)
def get_nowait(self):
'''Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)
def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).'''
return len(self._queue) == 0
def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
return len(self._queue)
if SimpleQueue is None:
SimpleQueue = _PySimpleQueue

View File

@@ -2516,9 +2516,7 @@ class TestFlag(unittest.TestCase):
self.assertEqual(Color.ALL.value, 7)
self.assertEqual(str(Color.BLUE), 'blue')
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, universal newlines")
@unittest.skip("TODO: RUSTPYTHON, universal newlines")
@support.reap_threads
def test_unique_composite(self):
# override __eq__ to be identity only
@@ -2951,9 +2949,7 @@ class TestIntFlag(unittest.TestCase):
self.assertEqual(Color.ALL.value, 7)
self.assertEqual(str(Color.BLUE), 'blue')
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, universal newlines")
@unittest.skip("TODO: RUSTPYTHON, universal newlines")
@support.reap_threads
def test_unique_composite(self):
# override __eq__ to be identity only

View File

@@ -38,8 +38,6 @@ class TestUnicode:
self.assertEqual(self.loads('"' + u + '"'), u)
self.assertEqual(self.loads('"z\\ud834\\udd20x"'), u)
# just takes FOREVER (3min+), unskip when it doesn't
@unittest.skip("TODO: RUSTPYTHON time")
def test_unicode_decode(self):
for i in range(0, 0xd7ff):
u = chr(i)

265
Lib/threading.py vendored
View File

@@ -5,7 +5,6 @@ import sys as _sys
import _thread
from time import monotonic as _time
from traceback import format_exc as _format_exc
from _weakrefset import WeakSet
from itertools import islice as _islice, count as _count
try:
@@ -27,13 +26,20 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread',
'enumerate', 'main_thread', 'TIMEOUT_MAX',
'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
'setprofile', 'settrace', 'local', 'stack_size']
'setprofile', 'settrace', 'local', 'stack_size',
'excepthook', 'ExceptHookArgs']
# Rename some stuff so "from threading import *" is safe
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
try:
get_native_id = _thread.get_native_id
_HAVE_THREAD_NATIVE_ID = True
__all__.append('get_native_id')
except AttributeError:
_HAVE_THREAD_NATIVE_ID = False
ThreadError = _thread.error
try:
_CRLock = _thread.RLock
@@ -568,8 +574,8 @@ class Barrier:
"""Implements a Barrier.
Useful for synchronizing a fixed number of threads at known synchronization
points. Threads block on 'wait()' and are simultaneously once they have all
made that call.
points. Threads block on 'wait()' and are simultaneously awoken once they
have all made that call.
"""
@@ -578,7 +584,7 @@ class Barrier:
'action' is a callable which, when supplied, will be called by one of
the threads after they have all entered the barrier and just prior to
releasing them all. If a 'timeout' is provided, it is uses as the
releasing them all. If a 'timeout' is provided, it is used as the
default for all subsequent 'wait()' calls.
"""
@@ -733,6 +739,11 @@ _active_limbo_lock = _allocate_lock()
_active = {} # maps thread id to Thread object
_limbo = {}
_dangling = WeakSet()
# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown()
# to wait until all Python thread states get deleted:
# see Thread._set_tstate_lock().
_shutdown_locks_lock = _allocate_lock()
_shutdown_locks = set()
# Main class for threads
@@ -746,14 +757,6 @@ class Thread:
"""
_initialized = False
# Need to store a reference to sys.exc_info for printing
# out exceptions when a thread tries to use a global var. during interp.
# shutdown and thus raises an exception about trying to perform some
# operation on/with a NoneType
_exc_info = _sys.exc_info
# Keep sys.exc_clear too to clear the exception just before
# allowing .join() to return.
#XXX __exc_clear = _sys.exc_clear
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
@@ -790,13 +793,15 @@ class Thread:
else:
self._daemonic = current_thread().daemon
self._ident = None
if _HAVE_THREAD_NATIVE_ID:
self._native_id = None
self._tstate_lock = None
self._started = Event()
self._is_stopped = False
self._initialized = True
# sys.stderr is not stored in the class like
# sys.exc_info since it can be changed between instances
# Copy of sys.stderr used by self._invoke_excepthook()
self._stderr = _sys.stderr
self._invoke_excepthook = _make_invoke_excepthook()
# For debugging and _after_fork()
_dangling.add(self)
@@ -891,6 +896,10 @@ class Thread:
def _set_ident(self):
self._ident = get_ident()
if _HAVE_THREAD_NATIVE_ID:
def _set_native_id(self):
self._native_id = get_native_id()
def _set_tstate_lock(self):
"""
Set a lock object which will be released by the interpreter when
@@ -899,10 +908,16 @@ class Thread:
self._tstate_lock = _set_sentinel()
self._tstate_lock.acquire()
if not self.daemon:
with _shutdown_locks_lock:
_shutdown_locks.add(self._tstate_lock)
def _bootstrap_inner(self):
try:
self._set_ident()
self._set_tstate_lock()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
self._started.set()
with _active_limbo_lock:
_active[self._ident] = self
@@ -915,47 +930,8 @@ class Thread:
try:
self.run()
except SystemExit:
pass
except:
# If sys.stderr is no more (most likely from interpreter
# shutdown) use self._stderr. Otherwise still use sys (as in
# _sys) in case sys.stderr was redefined since the creation of
# self.
if _sys and _sys.stderr is not None:
print("Exception in thread %s:\n%s" %
(self.name, _format_exc()), file=_sys.stderr)
elif self._stderr is not None:
# Do the best job possible w/o a huge amt. of code to
# approximate a traceback (code ideas from
# Lib/traceback.py)
exc_type, exc_value, exc_tb = self._exc_info()
try:
print((
"Exception in thread " + self.name +
" (most likely raised during interpreter shutdown):"), file=self._stderr)
print((
"Traceback (most recent call last):"), file=self._stderr)
while exc_tb:
print((
' File "%s", line %s, in %s' %
(exc_tb.tb_frame.f_code.co_filename,
exc_tb.tb_lineno,
exc_tb.tb_frame.f_code.co_name)), file=self._stderr)
exc_tb = exc_tb.tb_next
print(("%s: %s" % (exc_type, exc_value)), file=self._stderr)
self._stderr.flush()
# Make sure that exc_tb gets deleted since it is a memory
# hog; deleting everything else is just for thoroughness
finally:
del exc_type, exc_value, exc_tb
finally:
# Prevent a race in
# test_threading.test_no_refcycle_through_target when
# the exception keeps the target alive past when we
# assert that it's dead.
#XXX self._exc_clear()
pass
self._invoke_excepthook(self)
finally:
with _active_limbo_lock:
try:
@@ -987,6 +963,9 @@ class Thread:
assert not lock.locked()
self._is_stopped = True
self._tstate_lock = None
if not self.daemon:
with _shutdown_locks_lock:
_shutdown_locks.discard(lock)
def _delete(self):
"Remove current thread from the dict of currently running threads."
@@ -1007,7 +986,7 @@ class Thread:
When the timeout argument is present and not None, it should be a
floating point number specifying a timeout for the operation in seconds
(or fractions thereof). As join() always returns None, you must call
isAlive() after join() to decide whether a timeout happened -- if the
is_alive() after join() to decide whether a timeout happened -- if the
thread is still alive, the join() call timed out.
When the timeout argument is not present or None, the operation will
@@ -1077,6 +1056,18 @@ class Thread:
assert self._initialized, "Thread.__init__() not called"
return self._ident
if _HAVE_THREAD_NATIVE_ID:
@property
def native_id(self):
"""Native integral thread ID of this thread, or None if it has not been started.
This is a non-negative integer. See the get_native_id() function.
This represents the Thread ID as reported by the kernel.
"""
assert self._initialized, "Thread.__init__() not called"
return self._native_id
def is_alive(self):
"""Return whether the thread is alive.
@@ -1091,7 +1082,15 @@ class Thread:
self._wait_for_tstate_lock(False)
return not self._is_stopped
isAlive = is_alive
def isAlive(self):
"""Return whether the thread is alive.
This method is deprecated, use is_alive() instead.
"""
import warnings
warnings.warn('isAlive() is deprecated, use is_alive() instead',
DeprecationWarning, stacklevel=2)
return self.is_alive()
@property
def daemon(self):
@@ -1102,8 +1101,7 @@ class Thread:
main thread is not a daemon thread and therefore all threads created in
the main thread default to daemon = False.
The entire Python program exits when no alive non-daemon threads are
left.
The entire Python program exits when only daemon threads are left.
"""
assert self._initialized, "Thread.__init__() not called"
@@ -1129,6 +1127,104 @@ class Thread:
def setName(self, name):
self.name = name
try:
from _thread import (_excepthook as excepthook,
_ExceptHookArgs as ExceptHookArgs)
except ImportError:
# Simple Python implementation if _thread._excepthook() is not available
from traceback import print_exception as _print_exception
from collections import namedtuple
_ExceptHookArgs = namedtuple(
'ExceptHookArgs',
'exc_type exc_value exc_traceback thread')
def ExceptHookArgs(args):
return _ExceptHookArgs(*args)
def excepthook(args, /):
"""
Handle uncaught Thread.run() exception.
"""
if args.exc_type == SystemExit:
# silently ignore SystemExit
return
if _sys is not None and _sys.stderr is not None:
stderr = _sys.stderr
elif args.thread is not None:
stderr = args.thread._stderr
if stderr is None:
# do nothing if sys.stderr is None and sys.stderr was None
# when the thread was created
return
else:
# do nothing if sys.stderr is None and args.thread is None
return
if args.thread is not None:
name = args.thread.name
else:
name = get_ident()
print(f"Exception in thread {name}:",
file=stderr, flush=True)
_print_exception(args.exc_type, args.exc_value, args.exc_traceback,
file=stderr)
stderr.flush()
def _make_invoke_excepthook():
# Create a local namespace to ensure that variables remain alive
# when _invoke_excepthook() is called, even if it is called late during
# Python shutdown. It is mostly needed for daemon threads.
old_excepthook = excepthook
old_sys_excepthook = _sys.excepthook
if old_excepthook is None:
raise RuntimeError("threading.excepthook is None")
if old_sys_excepthook is None:
raise RuntimeError("sys.excepthook is None")
sys_exc_info = _sys.exc_info
local_print = print
local_sys = _sys
def invoke_excepthook(thread):
global excepthook
try:
hook = excepthook
if hook is None:
hook = old_excepthook
args = ExceptHookArgs([*sys_exc_info(), thread])
hook(args)
except Exception as exc:
exc.__suppress_context__ = True
del exc
if local_sys is not None and local_sys.stderr is not None:
stderr = local_sys.stderr
else:
stderr = thread._stderr
local_print("Exception in threading.excepthook:",
file=stderr, flush=True)
if local_sys is not None and local_sys.excepthook is not None:
sys_excepthook = local_sys.excepthook
else:
sys_excepthook = old_sys_excepthook
sys_excepthook(*sys_exc_info())
finally:
# Break reference cycle (exception stored in a variable)
args = None
return invoke_excepthook
# The timer class was contributed by Itamar Shtull-Trauring
class Timer(Thread):
@@ -1168,6 +1264,8 @@ class _MainThread(Thread):
self._set_tstate_lock()
self._started.set()
self._set_ident()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
with _active_limbo_lock:
_active[self._ident] = self
@@ -1187,6 +1285,8 @@ class _DummyThread(Thread):
self._started.set()
self._set_ident()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
with _active_limbo_lock:
_active[self._ident] = self
@@ -1253,6 +1353,9 @@ from _thread import stack_size
_main_thread = _MainThread()
def _shutdown():
"""
Wait until the Python thread state of all non-daemon threads get deleted.
"""
# Obscure: other threads may be waiting to join _main_thread. That's
# dubious, but some code does it. We can't wait for C code to release
# the main thread's tstate_lock - that won't happen until the interpreter
@@ -1261,6 +1364,8 @@ def _shutdown():
if _main_thread._is_stopped:
# _shutdown() was already called
return
# Main thread
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't have
# been released.
@@ -1268,16 +1373,24 @@ def _shutdown():
assert tlock.locked()
tlock.release()
_main_thread._stop()
t = _pickSomeNonDaemonThread()
while t:
t.join()
t = _pickSomeNonDaemonThread()
def _pickSomeNonDaemonThread():
for t in enumerate():
if not t.daemon and t.is_alive():
return t
return None
# Join all non-deamon threads
while True:
with _shutdown_locks_lock:
locks = list(_shutdown_locks)
_shutdown_locks.clear()
if not locks:
break
for lock in locks:
# mimick Thread.join()
lock.acquire()
lock.release()
# new threads can be spawned while we were waiting for the other
# threads to complete
def main_thread():
"""Return the main thread object.
@@ -1303,12 +1416,26 @@ def _after_fork():
# Reset _active_limbo_lock, in case we forked while the lock was held
# by another (non-forked) thread. http://bugs.python.org/issue874900
global _active_limbo_lock, _main_thread
global _shutdown_locks_lock, _shutdown_locks
_active_limbo_lock = _allocate_lock()
# fork() only copied the current thread; clear references to others.
new_active = {}
current = current_thread()
try:
current = _active[get_ident()]
except KeyError:
# fork() was called in a thread which was not spawned
# by threading.Thread. For example, a thread spawned
# by thread.start_new_thread().
current = _MainThread()
_main_thread = current
# reset _shutdown() locks: threads re-register their _tstate_lock below
_shutdown_locks_lock = _allocate_lock()
_shutdown_locks = set()
with _active_limbo_lock:
# Dangling thread instances must still have their locks reset,
# because someone may join() them.

View File

@@ -903,7 +903,13 @@ impl Instruction {
Reverse { .. } => 0,
GetAwaitable => 0,
BeforeAsyncWith => 1,
SetupAsyncWith { .. } => 0,
SetupAsyncWith { .. } => {
if jump {
-1
} else {
0
}
}
GetAIter => 0,
GetANext => 1,
EndAsyncFor => -1,

View File

@@ -2563,6 +2563,23 @@ if True and False and False:
"\
if (True and False) or (False and True):
pass
"
));
}
#[test]
fn test_nested_double_async_with() {
assert_dis_snapshot!(compile_exec(
"\
for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):
with self.subTest(type=type(stop_exc)):
try:
async with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail(f'{stop_exc} was suppressed')
"
));
}

View File

@@ -0,0 +1,76 @@
---
source: compiler/src/compile.rs
expression: "compile_exec(\"\\\nfor stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):\n with self.subTest(type=type(stop_exc)):\n try:\n async with woohoo():\n raise stop_exc\n except Exception as ex:\n self.assertIs(ex, stop_exc)\n else:\n self.fail(f'{stop_exc} was suppressed')\n\")"
---
0 SetupLoop
1 LoadNameAny (0, StopIteration)
2 LoadConst ("spam")
3 CallFunctionPositional (1)
4 LoadNameAny (1, StopAsyncIteration)
5 LoadConst ("ham")
6 CallFunctionPositional (1)
7 BuildTuple (2, false)
8 GetIter
>> 9 ForIter (68)
10 StoreLocal (2, stop_exc)
11 LoadNameAny (3, self)
12 LoadAttr (subTest)
13 LoadNameAny (5, type)
14 LoadNameAny (2, stop_exc)
15 CallFunctionPositional (1)
16 LoadConst (("type"))
17 CallFunctionKeyword (1)
18 SetupWith (65)
19 Pop
20 SetupExcept (40)
21 LoadNameAny (6, woohoo)
22 CallFunctionPositional (0)
23 BeforeAsyncWith
24 GetAwaitable
25 LoadConst (None)
26 YieldFrom
27 SetupAsyncWith (33)
28 Pop
29 LoadNameAny (2, stop_exc)
30 Raise (Raise)
31 PopBlock
32 EnterFinally
>> 33 WithCleanupStart
34 GetAwaitable
35 LoadConst (None)
36 YieldFrom
37 WithCleanupFinish
38 PopBlock
39 Jump (54)
>> 40 Duplicate
41 LoadNameAny (7, Exception)
42 CompareOperation (ExceptionMatch)
43 JumpIfFalse (53)
44 StoreLocal (8, ex)
45 LoadNameAny (3, self)
46 LoadAttr (assertIs)
47 LoadNameAny (8, ex)
48 LoadNameAny (2, stop_exc)
49 CallFunctionPositional (2)
50 Pop
51 PopException
52 Jump (63)
>> 53 Raise (Reraise)
>> 54 LoadNameAny (3, self)
55 LoadAttr (fail)
56 LoadConst ("")
57 LoadNameAny (2, stop_exc)
58 FormatValue (None)
59 LoadConst (" was suppressed")
60 BuildString (2)
61 CallFunctionPositional (1)
62 Pop
>> 63 PopBlock
64 EnterFinally
>> 65 WithCleanupStart
66 WithCleanupFinish
67 Jump (9)
>> 68 PopBlock
69 LoadConst (None)
70 ReturnValue

View File

@@ -184,3 +184,5 @@ assert json_dump({'a': 'b'}) == json_dump(Dict({'a': 'b'}))
i = 7**500
assert json.dumps(i) == str(i)
assert json.decoder.scanstring('✨x"', 1) == ('x', 3)

View File

@@ -49,6 +49,11 @@ fn main() {
println!("cargo:rustc-cfg=ossl111");
}
}
if let Ok(v) = env::var("DEP_OPENSSL_CONF") {
for conf in v.split(',') {
println!("cargo:rustc-cfg=osslconf=\"{}\"", conf);
}
}
}
fn git_hash() -> String {

View File

@@ -122,7 +122,7 @@ impl PyValue for PyCoroutineWrapper {
}
}
#[pyimpl]
#[pyimpl(with(PyIter))]
impl PyCoroutineWrapper {
#[pymethod]
fn send(&self, val: PyObjectRef, vm: &VirtualMachine) -> PyResult {
@@ -141,6 +141,12 @@ impl PyCoroutineWrapper {
}
}
impl PyIter for PyCoroutineWrapper {
fn next(zelf: &PyRef<Self>, vm: &VirtualMachine) -> PyResult {
zelf.send(vm.ctx.none(), vm)
}
}
pub fn init(ctx: &PyContext) {
PyCoroutine::extend_class(ctx, &ctx.types.coroutine_type);
PyCoroutineWrapper::extend_class(ctx, &ctx.types.coroutine_wrapper_type);

View File

@@ -324,6 +324,11 @@ impl PyFunction {
self.globals.clone()
}
#[pyproperty(magic)]
fn closure(&self) -> Option<PyTupleTyped<PyCellRef>> {
self.closure.clone()
}
#[pyproperty(magic)]
fn name(&self) -> PyStrRef {
self.name.lock().clone()

View File

@@ -94,17 +94,24 @@ impl PyModule {
impl SlotGetattro for PyModule {
fn getattro(zelf: PyRef<Self>, name: PyStrRef, vm: &VirtualMachine) -> PyResult {
vm.generic_getattribute_opt(zelf.as_object().clone(), name.clone(), None)?
.ok_or_else(|| {
let module_name = if let Some(name) = Self::name(zelf, vm) {
format!(" '{}'", name)
} else {
"".to_owned()
};
vm.new_attribute_error(
format!("module{} has no attribute '{}'", module_name, name,),
)
})
if let Some(attr) =
vm.generic_getattribute_opt(zelf.as_object().clone(), name.clone(), None)?
{
return Ok(attr);
}
if let Some(getattr) = zelf
.as_object()
.dict()
.and_then(|d| d.get_item("__getattr__", vm).ok())
{
return vm.invoke(&getattr, (name,));
}
let module_name = if let Some(name) = Self::name(zelf, vm) {
format!(" '{}'", name)
} else {
"".to_owned()
};
Err(vm.new_attribute_error(format!("module{} has no attribute '{}'", module_name, name)))
}
}

View File

@@ -350,6 +350,15 @@ impl<T: TransmuteFromObject> TryFromObject for PyTupleTyped<T> {
}
}
impl<T: TransmuteFromObject> Clone for PyTupleTyped<T> {
fn clone(&self) -> Self {
Self {
tuple: self.tuple.clone(),
_marker: PhantomData,
}
}
}
impl<'a, T: TransmuteFromObject + 'a> BorrowValue<'a> for PyTupleTyped<T> {
type Borrowed = &'a [T];
#[inline]

View File

@@ -480,17 +480,21 @@ pub struct ExceptionZoo {
pub resource_warning: PyTypeRef,
}
pub fn exception_slots() -> crate::slots::PyTypeSlots {
let mut slots = PyBaseException::make_slots();
// make_slots produces it with a tp_name of BaseException, which is usually wrong
slots.name.get_mut().take();
slots
}
pub fn create_exception_type(name: &str, base: &PyTypeRef) -> PyTypeRef {
create_type_with_slots(name, PyType::static_type(), base, exception_slots())
}
impl ExceptionZoo {
pub(crate) fn init() -> Self {
let base_exception_type = PyBaseException::init_bare_type().clone();
let create_exception_type = |name: &str, base: &PyTypeRef| {
create_type_with_slots(
name,
PyType::static_type(),
base,
PyBaseException::make_slots(),
)
};
// Sorted By Hierarchy then alphabetized.
let system_exit = create_exception_type("SystemExit", &base_exception_type);
let keyboard_interrupt = create_exception_type("KeyboardInterrupt", &base_exception_type);

View File

@@ -518,7 +518,11 @@ impl ExecutingFrame<'_> {
}
bytecode::Instruction::ImportNameless => self.import(vm, None),
bytecode::Instruction::ImportStar => self.import_star(vm),
bytecode::Instruction::ImportFrom { idx } => self.import_from(vm, *idx),
bytecode::Instruction::ImportFrom { idx } => {
let obj = self.import_from(vm, *idx)?;
self.push_value(obj);
Ok(None)
}
bytecode::Instruction::LoadFast(idx) => {
let idx = *idx as usize;
let x = self.fastlocals.lock()[idx].clone().ok_or_else(|| {
@@ -803,7 +807,9 @@ impl ExecutingFrame<'_> {
Ok(None)
}
bytecode::Instruction::SetupAsyncWith { end } => {
let enter_res = self.pop_value();
self.push_block(BlockType::Finally { handler: *end });
self.push_value(enter_res);
Ok(None)
}
bytecode::Instruction::WithCleanupStart => {
@@ -1084,15 +1090,22 @@ impl ExecutingFrame<'_> {
}
#[cfg_attr(feature = "flame-it", flame("Frame"))]
fn import_from(&mut self, vm: &VirtualMachine, idx: bytecode::NameIdx) -> FrameResult {
fn import_from(&mut self, vm: &VirtualMachine, idx: bytecode::NameIdx) -> PyResult {
let module = self.last_value();
let name = &self.code.names[idx as usize];
let err = || vm.new_import_error(format!("cannot import name '{}'", name), name.clone());
// Load attribute, and transform any error into import error.
let obj = vm.get_attribute(module, name.clone()).map_err(|_| {
vm.new_import_error(format!("cannot import name '{}'", name), name.clone())
})?;
self.push_value(obj);
Ok(None)
if let Some(obj) = vm.get_attribute_opt(module.clone(), name.clone())? {
return Ok(obj);
}
// fallback to importing '{module.__name__}.{name}' from sys.modules
let mod_name = vm.get_attribute(module, "__name__").map_err(|_| err())?;
let mod_name = mod_name.downcast::<PyStr>().map_err(|_| err())?;
let full_mod_name = format!("{}.{}", mod_name, name);
let sys_modules = vm
.get_attribute(vm.sys_module.clone(), "modules")
.map_err(|_| err())?;
sys_modules.get_item(full_mod_name, vm).map_err(|_| err())
}
#[cfg_attr(feature = "flame-it", flame("Frame"))]

View File

@@ -136,12 +136,12 @@ pub fn scanstring<'a>(
chunks.push(chunk);
};
let unterminated_err = || DecodeError::new("Unterminated string starting at", end - 1);
let mut chunk_start = end;
let mut chars = s.char_indices().enumerate().skip(end).peekable();
while let Some((char_i, (i, c))) = chars.next() {
let (_, (mut chunk_start, _)) = chars.peek().ok_or_else(unterminated_err)?;
while let Some((char_i, (byte_i, c))) = chars.next() {
match c {
'"' => {
push_chunk(StrOrChar::Str(&s[chunk_start..i]));
push_chunk(StrOrChar::Str(&s[chunk_start..byte_i]));
let mut out = String::with_capacity(output_len);
for x in chunks {
match x {
@@ -152,7 +152,7 @@ pub fn scanstring<'a>(
return Ok((out, char_i + 1));
}
'\\' => {
push_chunk(StrOrChar::Str(&s[chunk_start..i]));
push_chunk(StrOrChar::Str(&s[chunk_start..byte_i]));
let (_, (_, c)) = chars.next().ok_or_else(unterminated_err)?;
let esc = match c {
'"' => "\"",
@@ -166,7 +166,7 @@ pub fn scanstring<'a>(
'u' => {
let surrogate_err = || DecodeError::new("unpaired surrogate", char_i);
let mut uni = decode_unicode(&mut chars, char_i)?;
chunk_start = char_i + 6;
chunk_start = byte_i + 6;
if (0xd800..=0xdbff).contains(&uni) {
// uni is a surrogate -- try to find its pair
if let Some(&(pos2, (_, '\\'))) = chars.peek() {
@@ -202,7 +202,7 @@ pub fn scanstring<'a>(
))
}
};
chunk_start = i + 2;
chunk_start = byte_i + 2;
push_chunk(StrOrChar::Str(esc));
}
'\x00'..='\x1f' if strict => {

View File

@@ -18,7 +18,7 @@ use crate::exceptions::{IntoPyException, PyBaseExceptionRef};
use crate::function::{FuncArgs, OptionalArg};
use crate::pyobject::{
BorrowValue, Either, IntoPyObject, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue,
StaticType, TryFromObject,
StaticType, TryFromObject, TypeProtocol,
};
use crate::VirtualMachine;
@@ -34,7 +34,13 @@ mod c {
#[cfg(target_os = "redox")]
pub const AI_PASSIVE: c_int = 0x01;
#[cfg(target_os = "redox")]
pub const AI_NUMERICHOST: c_int = 0x0004;
#[cfg(target_os = "redox")]
pub const AI_ALL: c_int = 0x10;
#[cfg(target_os = "redox")]
pub const AI_ADDRCONFIG: c_int = 0x0020;
#[cfg(target_os = "redox")]
pub const AI_NUMERICSERV: c_int = 0x0400;
// https://gitlab.redox-os.org/redox-os/relibc/-/blob/master/src/header/sys_socket/constants.rs
#[cfg(target_os = "redox")]
pub const SO_TYPE: c_int = 3;
@@ -185,9 +191,68 @@ impl PySocket {
}
}
fn extract_address(
&self,
addr: PyObjectRef,
caller: &str,
vm: &VirtualMachine,
) -> PyResult<socket2::SockAddr> {
let family = self.family.load();
match family {
c::AF_INET => {
let addr = Address::try_from_object(vm, addr)?;
let addr4 = get_addr(vm, addr, |sa| match sa {
SocketAddr::V4(v4) => Some(v4),
_ => None,
})?;
Ok(addr4.into())
}
c::AF_INET6 => {
let tuple: PyTupleRef = addr.downcast().map_err(|obj| {
vm.new_type_error(format!(
"{}(): AF_INET6 address must be tuple, not {}",
caller,
obj.class().name
))
})?;
let tuple = tuple.borrow_value();
match tuple.len() {
2 | 3 | 4 => {}
_ => {
return Err(vm.new_type_error(
"AF_INET6 address must be a tuple (host, port[, flowinfo[, scopeid]])"
.to_owned(),
))
}
}
let addr = Address::from_tuple(tuple, vm)?;
let flowinfo = tuple
.get(2)
.map(|obj| u32::try_from_object(vm, obj.clone()))
.transpose()?;
let scopeid = tuple
.get(3)
.map(|obj| u32::try_from_object(vm, obj.clone()))
.transpose()?;
let mut addr6 = get_addr(vm, addr, |sa| match sa {
SocketAddr::V6(v6) => Some(v6),
_ => None,
})?;
if let Some(fi) = flowinfo {
addr6.set_flowinfo(fi)
}
if let Some(si) = scopeid {
addr6.set_scope_id(si)
}
Ok(addr6.into())
}
_ => Err(vm.new_os_error(format!("{}(): bad family", caller))),
}
}
#[pymethod]
fn connect(&self, address: Address, vm: &VirtualMachine) -> PyResult<()> {
let sock_addr = get_addr(vm, address, Some(self.family.load()))?;
fn connect(&self, address: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
let sock_addr = self.extract_address(address, "connect", vm)?;
let err = match self.sock().connect(&sock_addr) {
Ok(()) => return Ok(()),
@@ -221,8 +286,8 @@ impl PySocket {
}
#[pymethod]
fn bind(&self, address: Address, vm: &VirtualMachine) -> PyResult<()> {
let sock_addr = get_addr(vm, address, Some(self.family.load()))?;
fn bind(&self, address: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
let sock_addr = self.extract_address(address, "bind", vm)?;
self.sock()
.bind(&sock_addr)
.map_err(|err| convert_sock_error(vm, err))
@@ -238,10 +303,10 @@ impl PySocket {
}
#[pymethod]
fn _accept(&self, vm: &VirtualMachine) -> PyResult<(RawSocket, AddrTuple)> {
fn _accept(&self, vm: &VirtualMachine) -> PyResult<(RawSocket, PyObjectRef)> {
let (sock, addr) = self.sock_op(vm, SelectKind::Read, || self.sock().accept())?;
let fd = into_sock_fileno(sock);
Ok((fd, get_addr_tuple(addr)))
Ok((fd, get_addr_tuple(addr, vm)))
}
#[pymethod]
@@ -281,14 +346,14 @@ impl PySocket {
bufsize: usize,
flags: OptionalArg<i32>,
vm: &VirtualMachine,
) -> PyResult<(Vec<u8>, AddrTuple)> {
) -> PyResult<(Vec<u8>, PyObjectRef)> {
let flags = flags.unwrap_or(0);
let mut buffer = vec![0u8; bufsize];
let (n, addr) = self.sock_op(vm, SelectKind::Read, || {
self.sock().recv_from_with_flags(&mut buffer, flags)
})?;
buffer.truncate(n);
Ok((buffer, get_addr_tuple(addr)))
Ok((buffer, get_addr_tuple(addr, vm)))
}
#[pymethod]
@@ -343,12 +408,12 @@ impl PySocket {
fn sendto(
&self,
bytes: PyBytesLike,
address: Address,
address: PyObjectRef,
flags: OptionalArg<i32>,
vm: &VirtualMachine,
) -> PyResult<usize> {
let flags = flags.unwrap_or(0);
let addr = get_addr(vm, address, Some(self.family.load()))?;
let addr = self.extract_address(address, "sendto", vm)?;
self.sock_op(vm, SelectKind::Write, || {
bytes.with_ref(|b| self.sock().send_to_with_flags(b, &addr, flags))
})
@@ -369,22 +434,22 @@ impl PySocket {
}
#[pymethod]
fn getsockname(&self, vm: &VirtualMachine) -> PyResult<AddrTuple> {
fn getsockname(&self, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
let addr = self
.sock()
.local_addr()
.map_err(|err| convert_sock_error(vm, err))?;
Ok(get_addr_tuple(addr))
Ok(get_addr_tuple(addr, vm))
}
#[pymethod]
fn getpeername(&self, vm: &VirtualMachine) -> PyResult<AddrTuple> {
fn getpeername(&self, vm: &VirtualMachine) -> PyResult<PyObjectRef> {
let addr = self
.sock()
.peer_addr()
.map_err(|err| convert_sock_error(vm, err))?;
Ok(get_addr_tuple(addr))
Ok(get_addr_tuple(addr, vm))
}
#[pymethod]
@@ -572,28 +637,37 @@ impl TryFromObject for Address {
if tuple.borrow_value().len() != 2 {
Err(vm.new_type_error("Address tuple should have only 2 values".to_owned()))
} else {
let host = PyStrRef::try_from_object(vm, tuple.borrow_value()[0].clone())?;
let host = if host.borrow_value().is_empty() {
PyStr::from("0.0.0.0").into_ref(vm)
} else {
host
};
let port = u16::try_from_object(vm, tuple.borrow_value()[1].clone())?;
Ok(Address { host, port })
Self::from_tuple(tuple.borrow_value(), vm)
}
}
}
type AddrTuple = (String, u16);
impl Address {
fn from_tuple(tuple: &[PyObjectRef], vm: &VirtualMachine) -> PyResult<Self> {
let host = PyStrRef::try_from_object(vm, tuple[0].clone())?;
let host = if host.borrow_value().is_empty() {
PyStr::from("0.0.0.0").into_ref(vm)
} else {
host
};
let port = u16::try_from_object(vm, tuple[1].clone())?;
Ok(Address { host, port })
}
}
fn get_addr_tuple<A: Into<socket2::SockAddr>>(addr: A) -> AddrTuple {
fn get_addr_tuple<A: Into<socket2::SockAddr>>(addr: A, vm: &VirtualMachine) -> PyObjectRef {
let addr = addr.into();
if let Some(addr) = addr.as_inet() {
(addr.ip().to_string(), addr.port())
} else if let Some(addr) = addr.as_inet6() {
(addr.ip().to_string(), addr.port())
} else {
(String::new(), 0)
match addr.as_std() {
Some(SocketAddr::V4(addr)) => (addr.ip().to_string(), addr.port()).into_pyobject(vm),
Some(SocketAddr::V6(addr)) => (
addr.ip().to_string(),
addr.port(),
addr.flowinfo(),
addr.scope_id(),
)
.into_pyobject(vm),
// TODO: support AF_UNIX et al
None => (String::new(), 0).into_pyobject(vm),
}
}
@@ -777,7 +851,7 @@ fn _socket_getaddrinfo(opts: GAIOptions, vm: &VirtualMachine) -> PyResult {
vm.ctx.new_int(ai.socktype),
vm.ctx.new_int(ai.protocol),
ai.canonname.into_pyobject(vm),
get_addr_tuple(ai.sockaddr).into_pyobject(vm),
get_addr_tuple(ai.sockaddr, vm),
])
})
})
@@ -880,11 +954,8 @@ fn _socket_getnameinfo(
flags: i32,
vm: &VirtualMachine,
) -> PyResult<(String, String)> {
let addr = get_addr(vm, address, None)?;
let nameinfo = addr
.as_std()
.and_then(|addr| dns_lookup::getnameinfo(&addr, flags).ok());
nameinfo.ok_or_else(|| {
let addr = get_addr(vm, address, Some)?;
dns_lookup::getnameinfo(&addr, flags).map_err(|_| {
let error_type = GAI_ERROR.get().unwrap().clone();
vm.new_exception_msg(
error_type,
@@ -893,39 +964,22 @@ fn _socket_getnameinfo(
})
}
fn get_addr(
fn get_addr<R>(
vm: &VirtualMachine,
addr: impl ToSocketAddrs,
domain: Option<i32>,
) -> PyResult<socket2::SockAddr> {
let sock_addr = match addr.to_socket_addrs() {
Ok(mut sock_addrs) => match domain {
None => sock_addrs.next(),
Some(dom) => {
if dom == i32::from(Domain::ipv4()) {
sock_addrs.find(|a| a.is_ipv4())
} else if dom == i32::from(Domain::ipv6()) {
sock_addrs.find(|a| a.is_ipv6())
} else {
sock_addrs.next()
}
}
},
Err(e) => {
let error_type = GAI_ERROR.get().unwrap().clone();
return Err(vm.new_exception_msg(error_type, e.to_string()));
}
};
match sock_addr {
Some(sock_addr) => Ok(sock_addr.into()),
None => {
let error_type = GAI_ERROR.get().unwrap().clone();
Err(vm.new_exception_msg(
error_type,
"nodename nor servname provided, or not known".to_owned(),
))
}
}
filter: impl FnMut(SocketAddr) -> Option<R>,
) -> PyResult<R> {
let mut sock_addrs = addr.to_socket_addrs().map_err(|e| {
let error_type = GAI_ERROR.get().unwrap().clone();
vm.new_exception_msg(error_type, e.to_string())
})?;
sock_addrs.find_map(filter).ok_or_else(|| {
let error_type = GAI_ERROR.get().unwrap().clone();
vm.new_exception_msg(
error_type,
"nodename nor servname provided, or not known".to_owned(),
)
})
}
fn sock_fileno(sock: &Socket) -> RawSocket {
@@ -1072,8 +1126,11 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
"SO_OOBINLINE" => ctx.new_int(c::SO_OOBINLINE),
"SO_ERROR" => ctx.new_int(c::SO_ERROR),
"TCP_NODELAY" => ctx.new_int(c::TCP_NODELAY),
"AI_ALL" => ctx.new_int(c::AI_ALL),
"AI_PASSIVE" => ctx.new_int(c::AI_PASSIVE),
"AI_NUMERICHOST" => ctx.new_int(c::AI_NUMERICHOST),
"AI_ALL" => ctx.new_int(c::AI_ALL),
"AI_ADDRCONFIG" => ctx.new_int(c::AI_ADDRCONFIG),
"AI_NUMERICSERV" => ctx.new_int(c::AI_NUMERICSERV),
"NI_NAMEREQD" => ctx.new_int(c::NI_NAMEREQD),
"NI_NOFQDN" => ctx.new_int(c::NI_NOFQDN),
"NI_NUMERICHOST" => ctx.new_int(c::NI_NUMERICHOST),

View File

@@ -3,13 +3,12 @@ use super::socket::PySocketRef;
use crate::builtins::{pytype, weakref::PyWeak, PyStrRef, PyTypeRef};
use crate::byteslike::{PyBytesLike, PyRwBytesLike};
use crate::common::lock::{PyRwLock, PyRwLockWriteGuard};
use crate::exceptions::{IntoPyException, PyBaseExceptionRef};
use crate::exceptions::{create_exception_type, IntoPyException, PyBaseExceptionRef};
use crate::function::OptionalArg;
use crate::pyobject::{
BorrowValue, Either, IntoPyObject, ItemProtocol, PyCallable, PyClassImpl, PyObjectRef, PyRef,
PyResult, PyValue, StaticType,
};
use crate::types::create_simple_type;
use crate::VirtualMachine;
use crossbeam_utils::atomic::AtomicCell;
@@ -43,6 +42,7 @@ mod sys {
pub fn X509_get_version(x: *const X509) -> c_long;
pub fn SSLv3_method() -> *const SSL_METHOD;
pub fn TLSv1_method() -> *const SSL_METHOD;
pub fn COMP_get_type(meth: *const COMP_METHOD) -> i32;
}
}
@@ -687,6 +687,37 @@ impl PySslSocket {
}
}
#[pymethod]
fn cipher(&self) -> Option<CipherTuple> {
self.stream
.read()
.ssl()
.current_cipher()
.map(cipher_to_tuple)
}
#[pymethod]
fn compression(&self) -> Option<&'static str> {
#[cfg(osslconf = "OPENSSL_NO_COMP")]
{
None
}
#[cfg(not(osslconf = "OPENSSL_NO_COMP"))]
{
let stream = self.stream.read();
let comp_method = unsafe { sys::SSL_get_current_compression(stream.ssl().as_ptr()) };
if comp_method.is_null() {
return None;
}
let typ = unsafe { sys::COMP_get_type(comp_method) };
let nid = Nid::from_raw(typ);
if nid == Nid::UNDEF {
return None;
}
nid.short_name().ok()
}
}
#[pymethod]
fn do_handshake(&self, vm: &VirtualMachine) -> PyResult<()> {
let mut stream = self.stream.write();
@@ -808,6 +839,12 @@ fn convert_ssl_error(
vm.new_exception_msg(cls, msg.to_owned())
}
type CipherTuple = (&'static str, &'static str, i32);
fn cipher_to_tuple(cipher: &ssl::SslCipherRef) -> CipherTuple {
(cipher.name(), cipher.version(), cipher.bits().secret)
}
fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult {
if binary {
cert.to_der()
@@ -914,7 +951,7 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
let ctx = &vm.ctx;
let ssl_error = create_simple_type("SSLError", &vm.ctx.exceptions.os_error);
let ssl_error = create_exception_type("SSLError", &vm.ctx.exceptions.os_error);
let ssl_cert_verification_error = pytype::new(
ctx.types.type_type.clone(),
@@ -922,14 +959,14 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
ssl_error.clone(),
vec![ssl_error.clone(), ctx.exceptions.value_error.clone()],
Default::default(),
Default::default(),
crate::exceptions::exception_slots(),
)
.unwrap();
let ssl_zero_return_error = create_simple_type("SSLZeroReturnError", &ssl_error);
let ssl_want_read_error = create_simple_type("SSLWantReadError", &ssl_error);
let ssl_want_write_error = create_simple_type("SSLWantWriteError", &ssl_error);
let ssl_syscall_error = create_simple_type("SSLSyscallError", &ssl_error);
let ssl_eof_error = create_simple_type("SSLEOFError", &ssl_error);
let ssl_zero_return_error = create_exception_type("SSLZeroReturnError", &ssl_error);
let ssl_want_read_error = create_exception_type("SSLWantReadError", &ssl_error);
let ssl_want_write_error = create_exception_type("SSLWantWriteError", &ssl_error);
let ssl_syscall_error = create_exception_type("SSLSyscallError", &ssl_error);
let ssl_eof_error = create_exception_type("SSLEOFError", &ssl_error);
let module = py_module!(vm, "_ssl", {
"_SSLContext" => PySslContext::make_class(ctx),

View File

@@ -191,6 +191,11 @@ impl PyRLock {
Ok(())
}
#[pymethod]
fn _is_owned(&self) -> bool {
self.mu.is_owned_by_current_thread()
}
#[pymethod(magic)]
fn exit(&self, _args: FuncArgs, vm: &VirtualMachine) -> PyResult<()> {
self.release(vm)

View File

@@ -273,6 +273,7 @@ mod decl {
Some(args.max_length)
};
let data = args.data.borrow_value();
let data = &*data;
let mut d = self.decompress.lock();
let orig_in = d.total_in();
@@ -339,7 +340,7 @@ mod decl {
#[derive(FromArgs)]
struct DecompressArgs {
#[pyarg(positional)]
data: PyBytesRef,
data: PyBytesLike,
#[pyarg(any, default = "0")]
max_length: usize,
}