From e6d9af43ca8f39feba15400e0618ce454a34f67e Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Tue, 16 Feb 2021 18:37:14 -0600 Subject: [PATCH 01/18] Miscellaneous fixes for aiohttp --- vm/src/builtins/function.rs | 5 +++++ vm/src/builtins/tuple.rs | 9 +++++++++ vm/src/exceptions.rs | 20 ++++++++++++-------- vm/src/stdlib/socket.rs | 11 ++++++++++- vm/src/stdlib/ssl.rs | 17 ++++++++--------- 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/vm/src/builtins/function.rs b/vm/src/builtins/function.rs index 0538e7449..2c2a7aaed 100644 --- a/vm/src/builtins/function.rs +++ b/vm/src/builtins/function.rs @@ -324,6 +324,11 @@ impl PyFunction { self.globals.clone() } + #[pyproperty(magic)] + fn closure(&self) -> Option> { + self.closure.clone() + } + #[pyproperty(magic)] fn name(&self) -> PyStrRef { self.name.lock().clone() diff --git a/vm/src/builtins/tuple.rs b/vm/src/builtins/tuple.rs index 4bcb73449..1199c1dad 100644 --- a/vm/src/builtins/tuple.rs +++ b/vm/src/builtins/tuple.rs @@ -324,6 +324,15 @@ impl TryFromObject for PyTupleTyped { } } +impl Clone for PyTupleTyped { + fn clone(&self) -> Self { + Self { + tuple: self.tuple.clone(), + _marker: PhantomData, + } + } +} + impl<'a, T: TransmuteFromObject + 'a> BorrowValue<'a> for PyTupleTyped { type Borrowed = &'a [T]; #[inline] diff --git a/vm/src/exceptions.rs b/vm/src/exceptions.rs index 4a323bffa..2799a7d26 100644 --- a/vm/src/exceptions.rs +++ b/vm/src/exceptions.rs @@ -480,17 +480,21 @@ pub struct ExceptionZoo { pub resource_warning: PyTypeRef, } +pub fn exception_slots() -> crate::slots::PyTypeSlots { + let mut slots = PyBaseException::make_slots(); + // make_slots produces it with a tp_name of BaseException, which is usually wrong + slots.name.get_mut().take(); + slots +} + +pub fn create_exception_type(name: &str, base: &PyTypeRef) -> PyTypeRef { + create_type_with_slots(name, PyType::static_type(), base, exception_slots()) +} + impl ExceptionZoo { pub(crate) fn init() -> Self { let base_exception_type = PyBaseException::init_bare_type().clone(); - let create_exception_type = |name: &str, base: &PyTypeRef| { - create_type_with_slots( - name, - PyType::static_type(), - base, - PyBaseException::make_slots(), - ) - }; + // Sorted By Hierarchy then alphabetized. let system_exit = create_exception_type("SystemExit", &base_exception_type); let keyboard_interrupt = create_exception_type("KeyboardInterrupt", &base_exception_type); diff --git a/vm/src/stdlib/socket.rs b/vm/src/stdlib/socket.rs index 8bb43fae7..e203c9b86 100644 --- a/vm/src/stdlib/socket.rs +++ b/vm/src/stdlib/socket.rs @@ -34,7 +34,13 @@ mod c { #[cfg(target_os = "redox")] pub const AI_PASSIVE: c_int = 0x01; #[cfg(target_os = "redox")] + pub const AI_NUMERICHOST: c_int = 0x0004; + #[cfg(target_os = "redox")] pub const AI_ALL: c_int = 0x10; + #[cfg(target_os = "redox")] + pub const AI_ADDRCONFIG: c_int = 0x0020; + #[cfg(target_os = "redox")] + pub const AI_NUMERICSERV: c_int = 0x0400; // https://gitlab.redox-os.org/redox-os/relibc/-/blob/master/src/header/sys_socket/constants.rs #[cfg(target_os = "redox")] pub const SO_TYPE: c_int = 3; @@ -1072,8 +1078,11 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { "SO_OOBINLINE" => ctx.new_int(c::SO_OOBINLINE), "SO_ERROR" => ctx.new_int(c::SO_ERROR), "TCP_NODELAY" => ctx.new_int(c::TCP_NODELAY), - "AI_ALL" => ctx.new_int(c::AI_ALL), "AI_PASSIVE" => ctx.new_int(c::AI_PASSIVE), + "AI_NUMERICHOST" => ctx.new_int(c::AI_NUMERICHOST), + "AI_ALL" => ctx.new_int(c::AI_ALL), + "AI_ADDRCONFIG" => ctx.new_int(c::AI_ADDRCONFIG), + "AI_NUMERICSERV" => ctx.new_int(c::AI_NUMERICSERV), "NI_NAMEREQD" => ctx.new_int(c::NI_NAMEREQD), "NI_NOFQDN" => ctx.new_int(c::NI_NOFQDN), "NI_NUMERICHOST" => ctx.new_int(c::NI_NUMERICHOST), diff --git a/vm/src/stdlib/ssl.rs b/vm/src/stdlib/ssl.rs index 209bcfb6a..d37e698e6 100644 --- a/vm/src/stdlib/ssl.rs +++ b/vm/src/stdlib/ssl.rs @@ -3,13 +3,12 @@ use super::socket::PySocketRef; use crate::builtins::{pytype, weakref::PyWeak, PyStrRef, PyTypeRef}; use crate::byteslike::{PyBytesLike, PyRwBytesLike}; use crate::common::lock::{PyRwLock, PyRwLockWriteGuard}; -use crate::exceptions::{IntoPyException, PyBaseExceptionRef}; +use crate::exceptions::{create_exception_type, IntoPyException, PyBaseExceptionRef}; use crate::function::OptionalArg; use crate::pyobject::{ BorrowValue, Either, IntoPyObject, ItemProtocol, PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue, StaticType, }; -use crate::types::create_simple_type; use crate::VirtualMachine; use crossbeam_utils::atomic::AtomicCell; @@ -914,7 +913,7 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { let ctx = &vm.ctx; - let ssl_error = create_simple_type("SSLError", &vm.ctx.exceptions.os_error); + let ssl_error = create_exception_type("SSLError", &vm.ctx.exceptions.os_error); let ssl_cert_verification_error = pytype::new( ctx.types.type_type.clone(), @@ -922,14 +921,14 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { ssl_error.clone(), vec![ssl_error.clone(), ctx.exceptions.value_error.clone()], Default::default(), - Default::default(), + crate::exceptions::exception_slots(), ) .unwrap(); - let ssl_zero_return_error = create_simple_type("SSLZeroReturnError", &ssl_error); - let ssl_want_read_error = create_simple_type("SSLWantReadError", &ssl_error); - let ssl_want_write_error = create_simple_type("SSLWantWriteError", &ssl_error); - let ssl_syscall_error = create_simple_type("SSLSyscallError", &ssl_error); - let ssl_eof_error = create_simple_type("SSLEOFError", &ssl_error); + let ssl_zero_return_error = create_exception_type("SSLZeroReturnError", &ssl_error); + let ssl_want_read_error = create_exception_type("SSLWantReadError", &ssl_error); + let ssl_want_write_error = create_exception_type("SSLWantWriteError", &ssl_error); + let ssl_syscall_error = create_exception_type("SSLSyscallError", &ssl_error); + let ssl_eof_error = create_exception_type("SSLEOFError", &ssl_error); let module = py_module!(vm, "_ssl", { "_SSLContext" => PySslContext::make_class(ctx), From 84a5a2c8d62339002964adc8ae2367a6115d36cb Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Tue, 16 Feb 2021 18:37:53 -0600 Subject: [PATCH 02/18] Add netrc.py from CPython 3.8 --- Lib/netrc.py | 139 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 Lib/netrc.py diff --git a/Lib/netrc.py b/Lib/netrc.py new file mode 100644 index 000000000..f0ae48cfe --- /dev/null +++ b/Lib/netrc.py @@ -0,0 +1,139 @@ +"""An object-oriented interface to .netrc files.""" + +# Module and documentation by Eric S. Raymond, 21 Dec 1998 + +import os, shlex, stat + +__all__ = ["netrc", "NetrcParseError"] + + +class NetrcParseError(Exception): + """Exception raised on syntax errors in the .netrc file.""" + def __init__(self, msg, filename=None, lineno=None): + self.filename = filename + self.lineno = lineno + self.msg = msg + Exception.__init__(self, msg) + + def __str__(self): + return "%s (%s, line %s)" % (self.msg, self.filename, self.lineno) + + +class netrc: + def __init__(self, file=None): + default_netrc = file is None + if file is None: + file = os.path.join(os.path.expanduser("~"), ".netrc") + self.hosts = {} + self.macros = {} + with open(file) as fp: + self._parse(file, fp, default_netrc) + + def _parse(self, file, fp, default_netrc): + lexer = shlex.shlex(fp) + lexer.wordchars += r"""!"#$%&'()*+,-./:;<=>?@[\]^_`{|}~""" + lexer.commenters = lexer.commenters.replace('#', '') + while 1: + # Look for a machine, default, or macdef top-level keyword + saved_lineno = lexer.lineno + toplevel = tt = lexer.get_token() + if not tt: + break + elif tt[0] == '#': + if lexer.lineno == saved_lineno and len(tt) == 1: + lexer.instream.readline() + continue + elif tt == 'machine': + entryname = lexer.get_token() + elif tt == 'default': + entryname = 'default' + elif tt == 'macdef': # Just skip to end of macdefs + entryname = lexer.get_token() + self.macros[entryname] = [] + lexer.whitespace = ' \t' + while 1: + line = lexer.instream.readline() + if not line or line == '\012': + lexer.whitespace = ' \t\r\n' + break + self.macros[entryname].append(line) + continue + else: + raise NetrcParseError( + "bad toplevel token %r" % tt, file, lexer.lineno) + + # We're looking at start of an entry for a named machine or default. + login = '' + account = password = None + self.hosts[entryname] = {} + while 1: + tt = lexer.get_token() + if (tt.startswith('#') or + tt in {'', 'machine', 'default', 'macdef'}): + if password: + self.hosts[entryname] = (login, account, password) + lexer.push_token(tt) + break + else: + raise NetrcParseError( + "malformed %s entry %s terminated by %s" + % (toplevel, entryname, repr(tt)), + file, lexer.lineno) + elif tt == 'login' or tt == 'user': + login = lexer.get_token() + elif tt == 'account': + account = lexer.get_token() + elif tt == 'password': + if os.name == 'posix' and default_netrc: + prop = os.fstat(fp.fileno()) + if prop.st_uid != os.getuid(): + import pwd + try: + fowner = pwd.getpwuid(prop.st_uid)[0] + except KeyError: + fowner = 'uid %s' % prop.st_uid + try: + user = pwd.getpwuid(os.getuid())[0] + except KeyError: + user = 'uid %s' % os.getuid() + raise NetrcParseError( + ("~/.netrc file owner (%s) does not match" + " current user (%s)") % (fowner, user), + file, lexer.lineno) + if (prop.st_mode & (stat.S_IRWXG | stat.S_IRWXO)): + raise NetrcParseError( + "~/.netrc access too permissive: access" + " permissions must restrict access to only" + " the owner", file, lexer.lineno) + password = lexer.get_token() + else: + raise NetrcParseError("bad follower token %r" % tt, + file, lexer.lineno) + + def authenticators(self, host): + """Return a (user, account, password) tuple for given host.""" + if host in self.hosts: + return self.hosts[host] + elif 'default' in self.hosts: + return self.hosts['default'] + else: + return None + + def __repr__(self): + """Dump the class data in the format of a .netrc file.""" + rep = "" + for host in self.hosts.keys(): + attrs = self.hosts[host] + rep += f"machine {host}\n\tlogin {attrs[0]}\n" + if attrs[1]: + rep += f"\taccount {attrs[1]}\n" + rep += f"\tpassword {attrs[2]}\n" + for macro in self.macros.keys(): + rep += f"macdef {macro}\n" + for line in self.macros[macro]: + rep += line + rep += "\n" + return rep + +if __name__ == '__main__': + print(netrc()) From 09320f14228e19845d07bfaa2a2d92f055836bb1 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Tue, 16 Feb 2021 20:55:15 -0600 Subject: [PATCH 03/18] Update threading.py to version from CPython 3.8 --- Lib/threading.py | 265 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 196 insertions(+), 69 deletions(-) diff --git a/Lib/threading.py b/Lib/threading.py index bb41456fb..813dae2aa 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -5,7 +5,6 @@ import sys as _sys import _thread from time import monotonic as _time -from traceback import format_exc as _format_exc from _weakrefset import WeakSet from itertools import islice as _islice, count as _count try: @@ -27,13 +26,20 @@ __all__ = ['get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate', 'main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', - 'setprofile', 'settrace', 'local', 'stack_size'] + 'setprofile', 'settrace', 'local', 'stack_size', + 'excepthook', 'ExceptHookArgs'] # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread _allocate_lock = _thread.allocate_lock _set_sentinel = _thread._set_sentinel get_ident = _thread.get_ident +try: + get_native_id = _thread.get_native_id + _HAVE_THREAD_NATIVE_ID = True + __all__.append('get_native_id') +except AttributeError: + _HAVE_THREAD_NATIVE_ID = False ThreadError = _thread.error try: _CRLock = _thread.RLock @@ -568,8 +574,8 @@ class Barrier: """Implements a Barrier. Useful for synchronizing a fixed number of threads at known synchronization - points. Threads block on 'wait()' and are simultaneously once they have all - made that call. + points. Threads block on 'wait()' and are simultaneously awoken once they + have all made that call. """ @@ -578,7 +584,7 @@ class Barrier: 'action' is a callable which, when supplied, will be called by one of the threads after they have all entered the barrier and just prior to - releasing them all. If a 'timeout' is provided, it is uses as the + releasing them all. If a 'timeout' is provided, it is used as the default for all subsequent 'wait()' calls. """ @@ -733,6 +739,11 @@ _active_limbo_lock = _allocate_lock() _active = {} # maps thread id to Thread object _limbo = {} _dangling = WeakSet() +# Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() +# to wait until all Python thread states get deleted: +# see Thread._set_tstate_lock(). +_shutdown_locks_lock = _allocate_lock() +_shutdown_locks = set() # Main class for threads @@ -746,14 +757,6 @@ class Thread: """ _initialized = False - # Need to store a reference to sys.exc_info for printing - # out exceptions when a thread tries to use a global var. during interp. - # shutdown and thus raises an exception about trying to perform some - # operation on/with a NoneType - _exc_info = _sys.exc_info - # Keep sys.exc_clear too to clear the exception just before - # allowing .join() to return. - #XXX __exc_clear = _sys.exc_clear def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): @@ -790,13 +793,15 @@ class Thread: else: self._daemonic = current_thread().daemon self._ident = None + if _HAVE_THREAD_NATIVE_ID: + self._native_id = None self._tstate_lock = None self._started = Event() self._is_stopped = False self._initialized = True - # sys.stderr is not stored in the class like - # sys.exc_info since it can be changed between instances + # Copy of sys.stderr used by self._invoke_excepthook() self._stderr = _sys.stderr + self._invoke_excepthook = _make_invoke_excepthook() # For debugging and _after_fork() _dangling.add(self) @@ -891,6 +896,10 @@ class Thread: def _set_ident(self): self._ident = get_ident() + if _HAVE_THREAD_NATIVE_ID: + def _set_native_id(self): + self._native_id = get_native_id() + def _set_tstate_lock(self): """ Set a lock object which will be released by the interpreter when @@ -899,10 +908,16 @@ class Thread: self._tstate_lock = _set_sentinel() self._tstate_lock.acquire() + if not self.daemon: + with _shutdown_locks_lock: + _shutdown_locks.add(self._tstate_lock) + def _bootstrap_inner(self): try: self._set_ident() self._set_tstate_lock() + if _HAVE_THREAD_NATIVE_ID: + self._set_native_id() self._started.set() with _active_limbo_lock: _active[self._ident] = self @@ -915,47 +930,8 @@ class Thread: try: self.run() - except SystemExit: - pass except: - # If sys.stderr is no more (most likely from interpreter - # shutdown) use self._stderr. Otherwise still use sys (as in - # _sys) in case sys.stderr was redefined since the creation of - # self. - if _sys and _sys.stderr is not None: - print("Exception in thread %s:\n%s" % - (self.name, _format_exc()), file=_sys.stderr) - elif self._stderr is not None: - # Do the best job possible w/o a huge amt. of code to - # approximate a traceback (code ideas from - # Lib/traceback.py) - exc_type, exc_value, exc_tb = self._exc_info() - try: - print(( - "Exception in thread " + self.name + - " (most likely raised during interpreter shutdown):"), file=self._stderr) - print(( - "Traceback (most recent call last):"), file=self._stderr) - while exc_tb: - print(( - ' File "%s", line %s, in %s' % - (exc_tb.tb_frame.f_code.co_filename, - exc_tb.tb_lineno, - exc_tb.tb_frame.f_code.co_name)), file=self._stderr) - exc_tb = exc_tb.tb_next - print(("%s: %s" % (exc_type, exc_value)), file=self._stderr) - self._stderr.flush() - # Make sure that exc_tb gets deleted since it is a memory - # hog; deleting everything else is just for thoroughness - finally: - del exc_type, exc_value, exc_tb - finally: - # Prevent a race in - # test_threading.test_no_refcycle_through_target when - # the exception keeps the target alive past when we - # assert that it's dead. - #XXX self._exc_clear() - pass + self._invoke_excepthook(self) finally: with _active_limbo_lock: try: @@ -987,6 +963,9 @@ class Thread: assert not lock.locked() self._is_stopped = True self._tstate_lock = None + if not self.daemon: + with _shutdown_locks_lock: + _shutdown_locks.discard(lock) def _delete(self): "Remove current thread from the dict of currently running threads." @@ -1007,7 +986,7 @@ class Thread: When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call - isAlive() after join() to decide whether a timeout happened -- if the + is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out. When the timeout argument is not present or None, the operation will @@ -1077,6 +1056,18 @@ class Thread: assert self._initialized, "Thread.__init__() not called" return self._ident + if _HAVE_THREAD_NATIVE_ID: + @property + def native_id(self): + """Native integral thread ID of this thread, or None if it has not been started. + + This is a non-negative integer. See the get_native_id() function. + This represents the Thread ID as reported by the kernel. + + """ + assert self._initialized, "Thread.__init__() not called" + return self._native_id + def is_alive(self): """Return whether the thread is alive. @@ -1091,7 +1082,15 @@ class Thread: self._wait_for_tstate_lock(False) return not self._is_stopped - isAlive = is_alive + def isAlive(self): + """Return whether the thread is alive. + + This method is deprecated, use is_alive() instead. + """ + import warnings + warnings.warn('isAlive() is deprecated, use is_alive() instead', + DeprecationWarning, stacklevel=2) + return self.is_alive() @property def daemon(self): @@ -1102,8 +1101,7 @@ class Thread: main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False. - The entire Python program exits when no alive non-daemon threads are - left. + The entire Python program exits when only daemon threads are left. """ assert self._initialized, "Thread.__init__() not called" @@ -1129,6 +1127,104 @@ class Thread: def setName(self, name): self.name = name + +try: + from _thread import (_excepthook as excepthook, + _ExceptHookArgs as ExceptHookArgs) +except ImportError: + # Simple Python implementation if _thread._excepthook() is not available + from traceback import print_exception as _print_exception + from collections import namedtuple + + _ExceptHookArgs = namedtuple( + 'ExceptHookArgs', + 'exc_type exc_value exc_traceback thread') + + def ExceptHookArgs(args): + return _ExceptHookArgs(*args) + + def excepthook(args, /): + """ + Handle uncaught Thread.run() exception. + """ + if args.exc_type == SystemExit: + # silently ignore SystemExit + return + + if _sys is not None and _sys.stderr is not None: + stderr = _sys.stderr + elif args.thread is not None: + stderr = args.thread._stderr + if stderr is None: + # do nothing if sys.stderr is None and sys.stderr was None + # when the thread was created + return + else: + # do nothing if sys.stderr is None and args.thread is None + return + + if args.thread is not None: + name = args.thread.name + else: + name = get_ident() + print(f"Exception in thread {name}:", + file=stderr, flush=True) + _print_exception(args.exc_type, args.exc_value, args.exc_traceback, + file=stderr) + stderr.flush() + + +def _make_invoke_excepthook(): + # Create a local namespace to ensure that variables remain alive + # when _invoke_excepthook() is called, even if it is called late during + # Python shutdown. It is mostly needed for daemon threads. + + old_excepthook = excepthook + old_sys_excepthook = _sys.excepthook + if old_excepthook is None: + raise RuntimeError("threading.excepthook is None") + if old_sys_excepthook is None: + raise RuntimeError("sys.excepthook is None") + + sys_exc_info = _sys.exc_info + local_print = print + local_sys = _sys + + def invoke_excepthook(thread): + global excepthook + try: + hook = excepthook + if hook is None: + hook = old_excepthook + + args = ExceptHookArgs([*sys_exc_info(), thread]) + + hook(args) + except Exception as exc: + exc.__suppress_context__ = True + del exc + + if local_sys is not None and local_sys.stderr is not None: + stderr = local_sys.stderr + else: + stderr = thread._stderr + + local_print("Exception in threading.excepthook:", + file=stderr, flush=True) + + if local_sys is not None and local_sys.excepthook is not None: + sys_excepthook = local_sys.excepthook + else: + sys_excepthook = old_sys_excepthook + + sys_excepthook(*sys_exc_info()) + finally: + # Break reference cycle (exception stored in a variable) + args = None + + return invoke_excepthook + + # The timer class was contributed by Itamar Shtull-Trauring class Timer(Thread): @@ -1168,6 +1264,8 @@ class _MainThread(Thread): self._set_tstate_lock() self._started.set() self._set_ident() + if _HAVE_THREAD_NATIVE_ID: + self._set_native_id() with _active_limbo_lock: _active[self._ident] = self @@ -1187,6 +1285,8 @@ class _DummyThread(Thread): self._started.set() self._set_ident() + if _HAVE_THREAD_NATIVE_ID: + self._set_native_id() with _active_limbo_lock: _active[self._ident] = self @@ -1253,6 +1353,9 @@ from _thread import stack_size _main_thread = _MainThread() def _shutdown(): + """ + Wait until the Python thread state of all non-daemon threads get deleted. + """ # Obscure: other threads may be waiting to join _main_thread. That's # dubious, but some code does it. We can't wait for C code to release # the main thread's tstate_lock - that won't happen until the interpreter @@ -1261,6 +1364,8 @@ def _shutdown(): if _main_thread._is_stopped: # _shutdown() was already called return + + # Main thread tlock = _main_thread._tstate_lock # The main thread isn't finished yet, so its thread state lock can't have # been released. @@ -1268,16 +1373,24 @@ def _shutdown(): assert tlock.locked() tlock.release() _main_thread._stop() - t = _pickSomeNonDaemonThread() - while t: - t.join() - t = _pickSomeNonDaemonThread() -def _pickSomeNonDaemonThread(): - for t in enumerate(): - if not t.daemon and t.is_alive(): - return t - return None + # Join all non-deamon threads + while True: + with _shutdown_locks_lock: + locks = list(_shutdown_locks) + _shutdown_locks.clear() + + if not locks: + break + + for lock in locks: + # mimick Thread.join() + lock.acquire() + lock.release() + + # new threads can be spawned while we were waiting for the other + # threads to complete + def main_thread(): """Return the main thread object. @@ -1303,12 +1416,26 @@ def _after_fork(): # Reset _active_limbo_lock, in case we forked while the lock was held # by another (non-forked) thread. http://bugs.python.org/issue874900 global _active_limbo_lock, _main_thread + global _shutdown_locks_lock, _shutdown_locks _active_limbo_lock = _allocate_lock() # fork() only copied the current thread; clear references to others. new_active = {} - current = current_thread() + + try: + current = _active[get_ident()] + except KeyError: + # fork() was called in a thread which was not spawned + # by threading.Thread. For example, a thread spawned + # by thread.start_new_thread(). + current = _MainThread() + _main_thread = current + + # reset _shutdown() locks: threads re-register their _tstate_lock below + _shutdown_locks_lock = _allocate_lock() + _shutdown_locks = set() + with _active_limbo_lock: # Dangling thread instances must still have their locks reset, # because someone may join() them. From ce2d81e8e10e81811527c1f86177060b52920337 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Tue, 16 Feb 2021 20:59:15 -0600 Subject: [PATCH 04/18] Update concurrent/ to version from CPython 3.8 --- Lib/concurrent/futures/__init__.py | 39 ++- Lib/concurrent/futures/_base.py | 89 +++++-- Lib/concurrent/futures/process.py | 371 ++++++++++++++++++++++------- Lib/concurrent/futures/thread.py | 118 ++++++++- 4 files changed, 503 insertions(+), 114 deletions(-) diff --git a/Lib/concurrent/futures/__init__.py b/Lib/concurrent/futures/__init__.py index b5231f8aa..d746aeac5 100644 --- a/Lib/concurrent/futures/__init__.py +++ b/Lib/concurrent/futures/__init__.py @@ -10,9 +10,44 @@ from concurrent.futures._base import (FIRST_COMPLETED, ALL_COMPLETED, CancelledError, TimeoutError, + InvalidStateError, + BrokenExecutor, Future, Executor, wait, as_completed) -from concurrent.futures.process import ProcessPoolExecutor -from concurrent.futures.thread import ThreadPoolExecutor + +__all__ = ( + 'FIRST_COMPLETED', + 'FIRST_EXCEPTION', + 'ALL_COMPLETED', + 'CancelledError', + 'TimeoutError', + 'BrokenExecutor', + 'Future', + 'Executor', + 'wait', + 'as_completed', + 'ProcessPoolExecutor', + 'ThreadPoolExecutor', +) + + +def __dir__(): + return __all__ + ('__author__', '__doc__') + + +def __getattr__(name): + global ProcessPoolExecutor, ThreadPoolExecutor + + if name == 'ProcessPoolExecutor': + from .process import ProcessPoolExecutor as pe + ProcessPoolExecutor = pe + return pe + + if name == 'ThreadPoolExecutor': + from .thread import ThreadPoolExecutor as te + ThreadPoolExecutor = te + return te + + raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index 295489c93..6001e3bdb 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -53,6 +53,10 @@ class TimeoutError(Error): """The operation exceeded the given deadline.""" pass +class InvalidStateError(Error): + """The operation is not allowed in this state.""" + pass + class _Waiter(object): """Provides the event that wait() and as_completed() block on.""" def __init__(self): @@ -170,6 +174,29 @@ def _create_and_install_waiters(fs, return_when): return waiter + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -189,28 +216,30 @@ def as_completed(fs, timeout=None): before the given timeout. """ if timeout is not None: - end_time = timeout + time.time() + end_time = timeout + time.monotonic() fs = set(fs) + total_futures = len(fs) with _AcquireFutures(fs): finished = set( f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = fs - finished waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - + finished = list(finished) try: - yield from finished + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs,)) while pending: if timeout is None: wait_timeout = None else: - wait_timeout = end_time - time.time() + wait_timeout = end_time - time.monotonic() if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) + len(pending), total_futures)) waiter.event.wait(wait_timeout) @@ -219,11 +248,13 @@ def as_completed(fs, timeout=None): waiter.finished_futures = [] waiter.event.clear() - for future in finished: - yield future - pending.remove(future) + # reverse to keep finishing order + finished.reverse() + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)) finally: + # Remove waiter from unfinished futures for f in fs: with f._condition: f._waiters.remove(waiter) @@ -373,7 +404,10 @@ class Future(object): if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: self._done_callbacks.append(fn) return - fn(self) + try: + fn(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) def result(self, timeout=None): """Return the result of the call that the future represents. @@ -486,6 +520,8 @@ class Future(object): Should only be used by Executor implementations and unit tests. """ with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._result = result self._state = FINISHED for waiter in self._waiters: @@ -499,6 +535,8 @@ class Future(object): Should only be used by Executor implementations and unit tests. """ with self._condition: + if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: + raise InvalidStateError('{}: {!r}'.format(self._state, self)) self._exception = exception self._state = FINISHED for waiter in self._waiters: @@ -509,7 +547,7 @@ class Future(object): class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" - def submit(self, fn, *args, **kwargs): + def submit(*args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns @@ -518,7 +556,21 @@ class Executor(object): Returns: A Future representing the given call. """ + if len(args) >= 2: + pass + elif not args: + raise TypeError("descriptor 'submit' of 'Executor' object " + "needs an argument") + elif 'fn' in kwargs: + import warnings + warnings.warn("Passing 'fn' as keyword argument is deprecated", + DeprecationWarning, stacklevel=2) + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + raise NotImplementedError() + submit.__text_signature__ = '($self, fn, /, *args, **kwargs)' def map(self, fn, *iterables, timeout=None, chunksize=1): """Returns an iterator equivalent to map(fn, iter). @@ -543,7 +595,7 @@ class Executor(object): Exception: If fn(*args) raises for any values. """ if timeout is not None: - end_time = timeout + time.time() + end_time = timeout + time.monotonic() fs = [self.submit(fn, *args) for args in zip(*iterables)] @@ -551,11 +603,14 @@ class Executor(object): # before the first iterator value is required. def result_iterator(): try: - for future in fs: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future if timeout is None: - yield future.result() + yield fs.pop().result() else: - yield future.result(end_time - time.time()) + yield fs.pop().result(end_time - time.monotonic()) finally: for future in fs: future.cancel() @@ -580,3 +635,9 @@ class Executor(object): def __exit__(self, exc_type, exc_val, exc_tb): self.shutdown(wait=True) return False + + +class BrokenExecutor(RuntimeError): + """ + Raised when a executor has become non-functional after a severe failure. + """ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8f1d71419..2b2b78eed 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -3,15 +3,15 @@ """Implements ProcessPoolExecutor. -The follow diagram and text describe the data-flow through the system: +The following diagram and text describe the data-flow through the system: |======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -50,13 +50,14 @@ import os from concurrent.futures import _base import queue from queue import Full -import multiprocessing -from multiprocessing import SimpleQueue -from multiprocessing.connection import wait +import multiprocessing as mp +import multiprocessing.connection +from multiprocessing.queues import Queue import threading import weakref from functools import partial import itertools +import sys import traceback # Workers are created as daemon threads and processes. This is done to allow the @@ -73,16 +74,33 @@ import traceback # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() -_shutdown = False +_threads_wakeups = weakref.WeakKeyDictionary() +_global_shutdown = False + + +class _ThreadWakeup: + def __init__(self): + self._reader, self._writer = mp.Pipe(duplex=False) + + def close(self): + self._writer.close() + self._reader.close() + + def wakeup(self): + self._writer.send_bytes(b"") + + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() + def _python_exit(): - global _shutdown - _shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: + global _global_shutdown + _global_shutdown = True + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + thread_wakeup.wakeup() + for t, _ in items: t.join() # Controls how many more calls than processes will be queued in the call queue. @@ -91,6 +109,13 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + +# On Windows, WaitForMultipleObjects is used to wait for processes to finish. +# It can wait on, at most, 63 objects. There is an overhead of two objects: +# - the result queue reader +# - the thread wakeup reader +_MAX_WINDOWS_WORKERS = 63 - 2 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -132,6 +157,26 @@ class _CallItem(object): self.args = args self.kwargs = kwargs + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -152,19 +197,38 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] -def _process_worker(call_queue, result_queue): + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + +def _process_worker(call_queue, result_queue, initializer, initargs): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and + call_queue: A ctx.Queue of _CallItems that will be read and evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written + result_queue: A ctx.Queue of _ResultItems that will written to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. + initializer: A callable initializer, or None + initargs: A tuple of args for the initializer """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + # The parent will notice that the process stopped and + # mark the pool broken + return while True: call_item = call_queue.get(block=True) if call_item is None: @@ -175,10 +239,15 @@ def _process_worker(call_queue, result_queue): r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + _sendback_result(result_queue, call_item.work_id, exception=exc) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + _sendback_result(result_queue, call_item.work_id, result=r) + del r + + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + def _add_call_item_to_queue(pending_work_items, work_ids, @@ -217,12 +286,14 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, processes, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + thread_wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -231,60 +302,96 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as + process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems + call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the + result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by permanently locked queues. """ executor = None def shutting_down(): - return _shutdown or executor is None or executor._shutdown_thread + return (_global_shutdown or executor is None + or executor._shutdown_thread) def shutdown_worker(): - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) - for i in range(0, nb_children_alive): - call_queue.put_nowait(None) + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS X. + # some ctx.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() - reader = result_queue._reader + result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] while True: _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) - if reader in ready: - result_item = reader.recv() - else: + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + worker_sentinels = [p.sentinel for p in processes.values()] + ready = mp.connection.wait(readers + worker_sentinels) + + cause = None + is_broken = True + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = traceback.format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + result_item = None + thread_wakeup.clear() + if is_broken: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: - executor._broken = True + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') executor._shutdown_thread = True executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception( - BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." - )) + work_item.future.set_exception(bpe) # Delete references to object. See issue16284 del work_item pending_work_items.clear() @@ -313,6 +420,9 @@ def _queue_management_worker(executor_reference, work_item.future.set_result(result_item.result) # Delete references to object. See issue16284 del work_item + # Delete reference to result_item + del result_item + # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: @@ -321,6 +431,10 @@ def _queue_management_worker(executor_reference, # - The executor that owns this worker has been shutdown. if shutting_down(): try: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown_thread = True # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: @@ -332,8 +446,11 @@ def _queue_management_worker(executor_reference, pass executor = None + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -353,11 +470,24 @@ def _check_system_limits(): # minimum number of semaphores available # according to POSIX return - _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) raise NotImplementedError(_system_limited) -class BrokenProcessPool(RuntimeError): +def _chain_from_iterable_of_lists(iterable): + """ + Specialized implementation of itertools.chain.from_iterable. + Each item in *iterable* should be a list. This function is + careful not to keep references to yielded objects. + """ + for element in iterable: + element.reverse() + while element: + yield element.pop() + + +class BrokenProcessPool(_base.BrokenExecutor): """ Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. @@ -365,36 +495,48 @@ class BrokenProcessPool(RuntimeError): class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, mp_context=None, + initializer=None, initargs=()): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. + mp_context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. + initializer: A callable used to initialize worker processes. + initargs: A tuple of arguments to pass to the initializer. """ _check_system_limits() if max_workers is None: self._max_workers = os.cpu_count() or 1 + if sys.platform == 'win32': + self._max_workers = min(_MAX_WINDOWS_WORKERS, + self._max_workers) else: if max_workers <= 0: raise ValueError("max_workers must be greater than 0") + elif (sys.platform == 'win32' and + max_workers > _MAX_WINDOWS_WORKERS): + raise ValueError( + f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") self._max_workers = max_workers - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - # Killed worker processes can produce spurious "broken pipe" - # tracebacks in the queue's own worker thread. But we detect killed - # processes anyway, so silence the tracebacks. - self._call_queue._ignore_epipe = True - self._result_queue = SimpleQueue() - self._work_ids = queue.Queue() + if mp_context is None: + mp_context = mp.get_context() + self._mp_context = mp_context + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._initializer = initializer + self._initargs = initargs + + # Management thread self._queue_management_thread = None + # Map of pids to processes self._processes = {} @@ -405,42 +547,91 @@ class ProcessPoolExecutor(_base.Executor): self._queue_count = 0 self._pending_work_items = {} + # Create communication channels for the executor + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = mp_context.SimpleQueue() + self._work_ids = queue.Queue() + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() + def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) if self._queue_management_thread is None: + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue)) + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue, + self._queue_management_thread_wakeup), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + _threads_wakeups[self._queue_management_thread] = \ + self._queue_management_thread_wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue)) + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs)) p.start() self._processes[p.pid] = p - def submit(self, fn, *args, **kwargs): + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + import warnings + warnings.warn("Passing 'fn' as keyword argument is deprecated", + DeprecationWarning, stacklevel=2) + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + with self._shutdown_lock: if self._broken: - raise BrokenProcessPool('A child process terminated ' - 'abruptly, the process pool is not usable anymore') + raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -449,10 +640,11 @@ class ProcessPoolExecutor(_base.Executor): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() self._start_queue_management_thread() return f + submit.__text_signature__ = _base.Executor.submit.__text_signature__ submit.__doc__ = _base.Executor.submit.__doc__ def map(self, fn, *iterables, timeout=None, chunksize=1): @@ -482,22 +674,31 @@ class ProcessPoolExecutor(_base.Executor): results = super().map(partial(_process_chunk, fn), _get_chunks(*iterables, chunksize=chunksize), timeout=timeout) - return itertools.chain.from_iterable(results) + return _chain_from_iterable_of_lists(results) def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) + self._queue_management_thread_wakeup.wakeup() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None - self._call_queue = None + if self._call_queue is not None: + self._call_queue.close() + if wait: + self._call_queue.join_thread() + self._call_queue = None self._result_queue = None self._processes = None + + if self._queue_management_thread_wakeup: + self._queue_management_thread_wakeup.close() + self._queue_management_thread_wakeup = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 03d276b63..9e669b219 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -7,6 +7,7 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' import atexit from concurrent.futures import _base +import itertools import queue import threading import weakref @@ -40,6 +41,7 @@ def _python_exit(): atexit.register(_python_exit) + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -53,12 +55,24 @@ class _WorkItem(object): try: result = self.fn(*self.args, **self.kwargs) - except BaseException as e: - self.future.set_exception(e) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None else: self.future.set_result(result) -def _worker(executor_reference, work_queue): + +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return try: while True: work_item = work_queue.get(block=True) @@ -66,13 +80,24 @@ def _worker(executor_reference, work_queue): work_item.run() # Delete references to object. See issue16284 del work_item + + # attempt to increment idle count + executor = executor_reference() + if executor is not None: + executor._idle_semaphore.release() + del executor continue + executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown = True # Notice other workers work_queue.put(None) return @@ -80,33 +105,81 @@ def _worker(executor_reference, work_queue): except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) + +class BrokenThreadPool(_base.BrokenExecutor): + """ + Raised when a worker thread in a ThreadPoolExecutor failed initializing. + """ + + class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers=None, thread_name_prefix=''): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().__next__ + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=()): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. """ if max_workers is None: - # Use this number because ThreadPoolExecutor is often - # used to overlap I/O instead of CPU work. - max_workers = (os.cpu_count() or 1) * 5 + # ThreadPoolExecutor is often used to: + # * CPU bound task which releases GIL + # * I/O bound task (which releases GIL, of course) + # + # We use cpu_count + 4 for both types of tasks. + # But we limit it to 32 to avoid consuming surprisingly large resource + # on many core machine. + max_workers = min(32, (os.cpu_count() or 1) + 4) if max_workers <= 0: raise ValueError("max_workers must be greater than 0") + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.SimpleQueue() + self._idle_semaphore = threading.Semaphore(0) self._threads = set() + self._broken = False self._shutdown = False self._shutdown_lock = threading.Lock() - self._thread_name_prefix = thread_name_prefix + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + import warnings + warnings.warn("Passing 'fn' as keyword argument is deprecated", + DeprecationWarning, stacklevel=2) + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) - def submit(self, fn, *args, **kwargs): with self._shutdown_lock: + if self._broken: + raise BrokenThreadPool(self._broken) + if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -114,27 +187,46 @@ class ThreadPoolExecutor(_base.Executor): self._work_queue.put(w) self._adjust_thread_count() return f + submit.__text_signature__ = _base.Executor.submit.__text_signature__ submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): + # if idle threads are available, don't spin new threads + if self._idle_semaphore.acquire(timeout=0): + return + # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) - # TODO(bquinlan): Should avoid creating new threads if there are more - # idle threads than items in the work queue. + num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads) t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), - self._work_queue)) + self._work_queue, + self._initializer, + self._initargs)) t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue + def _initializer_failed(self): + with self._shutdown_lock: + self._broken = ('A thread initializer failed, the thread pool ' + 'is not usable anymore') + # Drain work queue and mark pending futures failed + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception(BrokenThreadPool(self._broken)) + def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True From 86f907ec0d684ae4b03c23f67a4618bd80c1dfa9 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 17:11:39 -0600 Subject: [PATCH 05/18] Fix SetupAsyncWith stack level handling --- bytecode/src/lib.rs | 8 +- compiler/src/compile.rs | 17 +++++ ...pile__tests__nested_double_async_with.snap | 76 +++++++++++++++++++ vm/src/frame.rs | 2 + 4 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 compiler/src/snapshots/rustpython_compiler_core__compile__tests__nested_double_async_with.snap diff --git a/bytecode/src/lib.rs b/bytecode/src/lib.rs index f2f906d24..49b8b7d20 100644 --- a/bytecode/src/lib.rs +++ b/bytecode/src/lib.rs @@ -903,7 +903,13 @@ impl Instruction { Reverse { .. } => 0, GetAwaitable => 0, BeforeAsyncWith => 1, - SetupAsyncWith { .. } => 0, + SetupAsyncWith { .. } => { + if jump { + -1 + } else { + 0 + } + } GetAIter => 0, GetANext => 1, EndAsyncFor => -1, diff --git a/compiler/src/compile.rs b/compiler/src/compile.rs index 626bec72e..e25bd7c01 100644 --- a/compiler/src/compile.rs +++ b/compiler/src/compile.rs @@ -2563,6 +2563,23 @@ if True and False and False: "\ if (True and False) or (False and True): pass +" + )); + } + + #[test] + fn test_nested_double_async_with() { + assert_dis_snapshot!(compile_exec( + "\ +for stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')): + with self.subTest(type=type(stop_exc)): + try: + async with woohoo(): + raise stop_exc + except Exception as ex: + self.assertIs(ex, stop_exc) + else: + self.fail(f'{stop_exc} was suppressed') " )); } diff --git a/compiler/src/snapshots/rustpython_compiler_core__compile__tests__nested_double_async_with.snap b/compiler/src/snapshots/rustpython_compiler_core__compile__tests__nested_double_async_with.snap new file mode 100644 index 000000000..b6f7d64fd --- /dev/null +++ b/compiler/src/snapshots/rustpython_compiler_core__compile__tests__nested_double_async_with.snap @@ -0,0 +1,76 @@ +--- +source: compiler/src/compile.rs +expression: "compile_exec(\"\\\nfor stop_exc in (StopIteration('spam'), StopAsyncIteration('ham')):\n with self.subTest(type=type(stop_exc)):\n try:\n async with woohoo():\n raise stop_exc\n except Exception as ex:\n self.assertIs(ex, stop_exc)\n else:\n self.fail(f'{stop_exc} was suppressed')\n\")" +--- + 0 SetupLoop + 1 LoadNameAny (0, StopIteration) + 2 LoadConst ("spam") + 3 CallFunctionPositional (1) + 4 LoadNameAny (1, StopAsyncIteration) + 5 LoadConst ("ham") + 6 CallFunctionPositional (1) + 7 BuildTuple (2, false) + 8 GetIter + >> 9 ForIter (68) + 10 StoreLocal (2, stop_exc) + 11 LoadNameAny (3, self) + 12 LoadAttr (subTest) + 13 LoadNameAny (5, type) + 14 LoadNameAny (2, stop_exc) + 15 CallFunctionPositional (1) + 16 LoadConst (("type")) + 17 CallFunctionKeyword (1) + 18 SetupWith (65) + 19 Pop + 20 SetupExcept (40) + 21 LoadNameAny (6, woohoo) + 22 CallFunctionPositional (0) + 23 BeforeAsyncWith + 24 GetAwaitable + 25 LoadConst (None) + 26 YieldFrom + 27 SetupAsyncWith (33) + 28 Pop + 29 LoadNameAny (2, stop_exc) + 30 Raise (Raise) + 31 PopBlock + 32 EnterFinally + >> 33 WithCleanupStart + 34 GetAwaitable + 35 LoadConst (None) + 36 YieldFrom + 37 WithCleanupFinish + 38 PopBlock + 39 Jump (54) + >> 40 Duplicate + 41 LoadNameAny (7, Exception) + 42 CompareOperation (ExceptionMatch) + 43 JumpIfFalse (53) + 44 StoreLocal (8, ex) + 45 LoadNameAny (3, self) + 46 LoadAttr (assertIs) + 47 LoadNameAny (8, ex) + 48 LoadNameAny (2, stop_exc) + 49 CallFunctionPositional (2) + 50 Pop + 51 PopException + 52 Jump (63) + >> 53 Raise (Reraise) + >> 54 LoadNameAny (3, self) + 55 LoadAttr (fail) + 56 LoadConst ("") + 57 LoadNameAny (2, stop_exc) + 58 FormatValue (None) + 59 LoadConst (" was suppressed") + 60 BuildString (2) + 61 CallFunctionPositional (1) + 62 Pop + >> 63 PopBlock + 64 EnterFinally + >> 65 WithCleanupStart + 66 WithCleanupFinish + 67 Jump (9) + >> 68 PopBlock + 69 LoadConst (None) + 70 ReturnValue + diff --git a/vm/src/frame.rs b/vm/src/frame.rs index 67c8874d4..9c2ece204 100644 --- a/vm/src/frame.rs +++ b/vm/src/frame.rs @@ -803,7 +803,9 @@ impl ExecutingFrame<'_> { Ok(None) } bytecode::Instruction::SetupAsyncWith { end } => { + let enter_res = self.pop_value(); self.push_block(BlockType::Finally { handler: *end }); + self.push_value(enter_res); Ok(None) } bytecode::Instruction::WithCleanupStart => { From 58db1752b02bd17e77b0e8918bee13cc747487a8 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 18:11:37 -0600 Subject: [PATCH 06/18] Fix ImportFrom implementation --- vm/src/frame.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/vm/src/frame.rs b/vm/src/frame.rs index 9c2ece204..f0ea038ac 100644 --- a/vm/src/frame.rs +++ b/vm/src/frame.rs @@ -518,7 +518,11 @@ impl ExecutingFrame<'_> { } bytecode::Instruction::ImportNameless => self.import(vm, None), bytecode::Instruction::ImportStar => self.import_star(vm), - bytecode::Instruction::ImportFrom { idx } => self.import_from(vm, *idx), + bytecode::Instruction::ImportFrom { idx } => { + let obj = self.import_from(vm, *idx)?; + self.push_value(obj); + Ok(None) + } bytecode::Instruction::LoadFast(idx) => { let idx = *idx as usize; let x = self.fastlocals.lock()[idx].clone().ok_or_else(|| { @@ -1086,15 +1090,22 @@ impl ExecutingFrame<'_> { } #[cfg_attr(feature = "flame-it", flame("Frame"))] - fn import_from(&mut self, vm: &VirtualMachine, idx: bytecode::NameIdx) -> FrameResult { + fn import_from(&mut self, vm: &VirtualMachine, idx: bytecode::NameIdx) -> PyResult { let module = self.last_value(); let name = &self.code.names[idx as usize]; + let err = || vm.new_import_error(format!("cannot import name '{}'", name), name.clone()); // Load attribute, and transform any error into import error. - let obj = vm.get_attribute(module, name.clone()).map_err(|_| { - vm.new_import_error(format!("cannot import name '{}'", name), name.clone()) - })?; - self.push_value(obj); - Ok(None) + if let Some(obj) = vm.get_attribute_opt(module.clone(), name.clone())? { + return Ok(obj); + } + // fallback to importing '{module.__name__}.{name}' from sys.modules + let mod_name = vm.get_attribute(module, "__name__").map_err(|_| err())?; + let mod_name = mod_name.downcast::().map_err(|_| err())?; + let full_mod_name = format!("{}.{}", mod_name, name); + let sys_modules = vm + .get_attribute(vm.sys_module.clone(), "modules") + .map_err(|_| err())?; + sys_modules.get_item(full_mod_name, vm).map_err(|_| err()) } #[cfg_attr(feature = "flame-it", flame("Frame"))] From f63f155dfacdc8cc3a550f24f019febdb4b1fc7a Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 18:43:24 -0600 Subject: [PATCH 07/18] Remove circular imports hack in multiprocessing --- Lib/multiprocessing/reduction.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py index 95e0a74ce..c043c9a0d 100644 --- a/Lib/multiprocessing/reduction.py +++ b/Lib/multiprocessing/reduction.py @@ -16,10 +16,7 @@ import pickle import socket import sys -# XXX RustPython TODO: figure out why this doesn't work -# from . import context -from .context import * -context = sys.modules[__name__] +from . import context __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump'] From 7a84e2b6c97acde8e97f3341b8a8947a20570c43 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 21:01:14 -0600 Subject: [PATCH 08/18] Respect module.__getattr__ --- vm/src/builtins/module.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/vm/src/builtins/module.rs b/vm/src/builtins/module.rs index 06aee7dac..daa36e723 100644 --- a/vm/src/builtins/module.rs +++ b/vm/src/builtins/module.rs @@ -94,17 +94,24 @@ impl PyModule { impl SlotGetattro for PyModule { fn getattro(zelf: PyRef, name: PyStrRef, vm: &VirtualMachine) -> PyResult { - vm.generic_getattribute_opt(zelf.as_object().clone(), name.clone(), None)? - .ok_or_else(|| { - let module_name = if let Some(name) = Self::name(zelf, vm) { - format!(" '{}'", name) - } else { - "".to_owned() - }; - vm.new_attribute_error( - format!("module{} has no attribute '{}'", module_name, name,), - ) - }) + if let Some(attr) = + vm.generic_getattribute_opt(zelf.as_object().clone(), name.clone(), None)? + { + return Ok(attr); + } + if let Some(getattr) = zelf + .as_object() + .dict() + .and_then(|d| d.get_item("__getattr__", vm).ok()) + { + return vm.invoke(&getattr, (name,)); + } + let module_name = if let Some(name) = Self::name(zelf, vm) { + format!(" '{}'", name) + } else { + "".to_owned() + }; + Err(vm.new_attribute_error(format!("module{} has no attribute '{}'", module_name, name))) } } From bcdda5eef8e36f53c3c75b7f964e4c268834ae46 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 21:01:24 -0600 Subject: [PATCH 09/18] Update queue.py to version from CPython 3.8 --- Lib/queue.py | 91 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/Lib/queue.py b/Lib/queue.py index 572425e84..5bb0431e9 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -1,23 +1,29 @@ '''A multi-producer, multi-consumer queue.''' -try: - import threading -except ImportError: - import dummy_threading as threading +import threading from collections import deque from heapq import heappush, heappop from time import monotonic as time +try: + from _queue import SimpleQueue +except ImportError: + SimpleQueue = None -__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] +__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue'] -class Empty(Exception): - 'Exception raised by Queue.get(block=0)/get_nowait().' - pass + +try: + from _queue import Empty +except ImportError: + class Empty(Exception): + 'Exception raised by Queue.get(block=0)/get_nowait().' + pass class Full(Exception): 'Exception raised by Queue.put(block=0)/put_nowait().' pass + class Queue: '''Create a queue object with a given maximum size. @@ -244,3 +250,72 @@ class LifoQueue(Queue): def _get(self): return self.queue.pop() + + +class _PySimpleQueue: + '''Simple, unbounded FIFO queue. + + This pure Python implementation is not reentrant. + ''' + # Note: while this pure Python version provides fairness + # (by using a threading.Semaphore which is itself fair, being based + # on threading.Condition), fairness is not part of the API contract. + # This allows the C version to use a different implementation. + + def __init__(self): + self._queue = deque() + self._count = threading.Semaphore(0) + + def put(self, item, block=True, timeout=None): + '''Put the item on the queue. + + The optional 'block' and 'timeout' arguments are ignored, as this method + never blocks. They are provided for compatibility with the Queue class. + ''' + self._queue.append(item) + self._count.release() + + def get(self, block=True, timeout=None): + '''Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + ''' + if timeout is not None and timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + if not self._count.acquire(block, timeout): + raise Empty + return self._queue.popleft() + + def put_nowait(self, item): + '''Put an item into the queue without blocking. + + This is exactly equivalent to `put(item)` and is only provided + for compatibility with the Queue class. + ''' + return self.put(item, block=False) + + def get_nowait(self): + '''Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + ''' + return self.get(block=False) + + def empty(self): + '''Return True if the queue is empty, False otherwise (not reliable!).''' + return len(self._queue) == 0 + + def qsize(self): + '''Return the approximate size of the queue (not reliable!).''' + return len(self._queue) + + +if SimpleQueue is None: + SimpleQueue = _PySimpleQueue From a0d9481fe57a5c231697ab3c6b0ea25f3d1343b5 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 21:07:29 -0600 Subject: [PATCH 10/18] Add RLock._is_owned --- Cargo.lock | 3 +-- Cargo.toml | 2 +- vm/src/stdlib/thread.rs | 5 +++++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f17e1a2b..725e2ad50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1101,8 +1101,7 @@ checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" [[package]] name = "lock_api" version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" +source = "git+https://github.com/coolreader18/parking_lot?branch=remutex-is_owned#391cf555e94c05450cea2e88b42c95c6631ab163" dependencies = [ "scopeguard", ] diff --git a/Cargo.toml b/Cargo.toml index f175d0b8d..39113799a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,4 +69,4 @@ opt-level = 3 [patch.crates-io] # REDOX START, Uncommment when you want to compile/check with redoxer # REDOX END - +lock_api = { git = "https://github.com/coolreader18/parking_lot", branch = "remutex-is_owned" } diff --git a/vm/src/stdlib/thread.rs b/vm/src/stdlib/thread.rs index e611b963c..58da8e0d2 100644 --- a/vm/src/stdlib/thread.rs +++ b/vm/src/stdlib/thread.rs @@ -191,6 +191,11 @@ impl PyRLock { Ok(()) } + #[pymethod] + fn _is_owned(&self) -> bool { + self.mu.is_owned_by_current_thread() + } + #[pymethod(magic)] fn exit(&self, _args: FuncArgs, vm: &VirtualMachine) -> PyResult<()> { self.release(vm) From ede2d9ed1613bbf4a0337949474e86829a4cfca0 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Wed, 17 Feb 2021 21:16:19 -0600 Subject: [PATCH 11/18] Add SSLSocket.cipher --- vm/src/stdlib/ssl.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/vm/src/stdlib/ssl.rs b/vm/src/stdlib/ssl.rs index d37e698e6..cf1f0220c 100644 --- a/vm/src/stdlib/ssl.rs +++ b/vm/src/stdlib/ssl.rs @@ -686,6 +686,15 @@ impl PySslSocket { } } + #[pymethod] + fn cipher(&self) -> Option { + self.stream + .read() + .ssl() + .current_cipher() + .map(cipher_to_tuple) + } + #[pymethod] fn do_handshake(&self, vm: &VirtualMachine) -> PyResult<()> { let mut stream = self.stream.write(); @@ -807,6 +816,12 @@ fn convert_ssl_error( vm.new_exception_msg(cls, msg.to_owned()) } +type CipherTuple = (&'static str, &'static str, i32); + +fn cipher_to_tuple(cipher: &ssl::SslCipherRef) -> CipherTuple { + (cipher.name(), cipher.version(), cipher.bits().secret) +} + fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult { if binary { cert.to_der() From b0f9c2809eedc1f4154624b24b37b42184515da1 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Thu, 18 Feb 2021 09:22:16 -0600 Subject: [PATCH 12/18] Add SSLContext.compression --- vm/build.rs | 5 +++++ vm/src/stdlib/ssl.rs | 23 +++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/vm/build.rs b/vm/build.rs index 950da16a6..0fa0fe022 100644 --- a/vm/build.rs +++ b/vm/build.rs @@ -49,6 +49,11 @@ fn main() { println!("cargo:rustc-cfg=ossl111"); } } + if let Ok(v) = env::var("DEP_OPENSSL_CONF") { + for conf in v.split(',') { + println!("cargo:rustc-cfg=osslconf=\"{}\"", conf); + } + } } fn git_hash() -> String { diff --git a/vm/src/stdlib/ssl.rs b/vm/src/stdlib/ssl.rs index cf1f0220c..180068ce9 100644 --- a/vm/src/stdlib/ssl.rs +++ b/vm/src/stdlib/ssl.rs @@ -42,6 +42,7 @@ mod sys { pub fn X509_get_version(x: *const X509) -> c_long; pub fn SSLv3_method() -> *const SSL_METHOD; pub fn TLSv1_method() -> *const SSL_METHOD; + pub fn COMP_get_type(meth: *const COMP_METHOD) -> i32; } } @@ -695,6 +696,28 @@ impl PySslSocket { .map(cipher_to_tuple) } + #[pymethod] + fn compression(&self) -> Option<&'static str> { + #[cfg(osslconf = "OPENSSL_NO_COMP")] + { + None + } + #[cfg(not(osslconf = "OPENSSL_NO_COMP"))] + { + let stream = self.stream.read(); + let comp_method = unsafe { sys::SSL_get_current_compression(stream.ssl().as_ptr()) }; + if comp_method.is_null() { + return None; + } + let typ = unsafe { sys::COMP_get_type(comp_method) }; + let nid = Nid::from_raw(typ); + if nid == Nid::UNDEF { + return None; + } + nid.short_name().ok() + } + } + #[pymethod] fn do_handshake(&self, vm: &VirtualMachine) -> PyResult<()> { let mut stream = self.stream.write(); From fb2efa95f95f9d6fed1c30f74e25119297b20250 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Thu, 18 Feb 2021 09:52:10 -0600 Subject: [PATCH 13/18] Make getaddrinfo return a full tuple for inet6 --- vm/src/stdlib/socket.rs | 42 ++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/vm/src/stdlib/socket.rs b/vm/src/stdlib/socket.rs index e203c9b86..f1769ec09 100644 --- a/vm/src/stdlib/socket.rs +++ b/vm/src/stdlib/socket.rs @@ -244,10 +244,10 @@ impl PySocket { } #[pymethod] - fn _accept(&self, vm: &VirtualMachine) -> PyResult<(RawSocket, AddrTuple)> { + fn _accept(&self, vm: &VirtualMachine) -> PyResult<(RawSocket, PyObjectRef)> { let (sock, addr) = self.sock_op(vm, SelectKind::Read, || self.sock().accept())?; let fd = into_sock_fileno(sock); - Ok((fd, get_addr_tuple(addr))) + Ok((fd, get_addr_tuple(addr, vm))) } #[pymethod] @@ -287,14 +287,14 @@ impl PySocket { bufsize: usize, flags: OptionalArg, vm: &VirtualMachine, - ) -> PyResult<(Vec, AddrTuple)> { + ) -> PyResult<(Vec, PyObjectRef)> { let flags = flags.unwrap_or(0); let mut buffer = vec![0u8; bufsize]; let (n, addr) = self.sock_op(vm, SelectKind::Read, || { self.sock().recv_from_with_flags(&mut buffer, flags) })?; buffer.truncate(n); - Ok((buffer, get_addr_tuple(addr))) + Ok((buffer, get_addr_tuple(addr, vm))) } #[pymethod] @@ -375,22 +375,22 @@ impl PySocket { } #[pymethod] - fn getsockname(&self, vm: &VirtualMachine) -> PyResult { + fn getsockname(&self, vm: &VirtualMachine) -> PyResult { let addr = self .sock() .local_addr() .map_err(|err| convert_sock_error(vm, err))?; - Ok(get_addr_tuple(addr)) + Ok(get_addr_tuple(addr, vm)) } #[pymethod] - fn getpeername(&self, vm: &VirtualMachine) -> PyResult { + fn getpeername(&self, vm: &VirtualMachine) -> PyResult { let addr = self .sock() .peer_addr() .map_err(|err| convert_sock_error(vm, err))?; - Ok(get_addr_tuple(addr)) + Ok(get_addr_tuple(addr, vm)) } #[pymethod] @@ -575,7 +575,8 @@ impl ToSocketAddrs for Address { impl TryFromObject for Address { fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult { let tuple = PyTupleRef::try_from_object(vm, obj)?; - if tuple.borrow_value().len() != 2 { + // TODO: parse the tuple based on the family of the socket; extract all the info for inet6 + if tuple.borrow_value().len() < 2 { Err(vm.new_type_error("Address tuple should have only 2 values".to_owned())) } else { let host = PyStrRef::try_from_object(vm, tuple.borrow_value()[0].clone())?; @@ -590,16 +591,19 @@ impl TryFromObject for Address { } } -type AddrTuple = (String, u16); - -fn get_addr_tuple>(addr: A) -> AddrTuple { +fn get_addr_tuple>(addr: A, vm: &VirtualMachine) -> PyObjectRef { let addr = addr.into(); - if let Some(addr) = addr.as_inet() { - (addr.ip().to_string(), addr.port()) - } else if let Some(addr) = addr.as_inet6() { - (addr.ip().to_string(), addr.port()) - } else { - (String::new(), 0) + match addr.as_std() { + Some(SocketAddr::V4(addr)) => (addr.ip().to_string(), addr.port()).into_pyobject(vm), + Some(SocketAddr::V6(addr)) => ( + addr.ip().to_string(), + addr.port(), + addr.flowinfo(), + addr.scope_id(), + ) + .into_pyobject(vm), + // TODO: support AF_UNIX et al + None => (String::new(), 0).into_pyobject(vm), } } @@ -783,7 +787,7 @@ fn _socket_getaddrinfo(opts: GAIOptions, vm: &VirtualMachine) -> PyResult { vm.ctx.new_int(ai.socktype), vm.ctx.new_int(ai.protocol), ai.canonname.into_pyobject(vm), - get_addr_tuple(ai.sockaddr).into_pyobject(vm), + get_addr_tuple(ai.sockaddr, vm), ]) }) }) From a46fb496aa71875bb6de86c40089f6f4d130d729 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Fri, 19 Feb 2021 08:20:32 -0600 Subject: [PATCH 14/18] tp_iternext for coroutine_wrapper --- vm/src/builtins/coroutine.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/vm/src/builtins/coroutine.rs b/vm/src/builtins/coroutine.rs index 776692454..fe99d7a86 100644 --- a/vm/src/builtins/coroutine.rs +++ b/vm/src/builtins/coroutine.rs @@ -122,7 +122,7 @@ impl PyValue for PyCoroutineWrapper { } } -#[pyimpl] +#[pyimpl(with(PyIter))] impl PyCoroutineWrapper { #[pymethod] fn send(&self, val: PyObjectRef, vm: &VirtualMachine) -> PyResult { @@ -141,6 +141,12 @@ impl PyCoroutineWrapper { } } +impl PyIter for PyCoroutineWrapper { + fn next(zelf: &PyRef, vm: &VirtualMachine) -> PyResult { + zelf.send(vm.ctx.none(), vm) + } +} + pub fn init(ctx: &PyContext) { PyCoroutine::extend_class(ctx, &ctx.types.coroutine_type); PyCoroutineWrapper::extend_class(ctx, &ctx.types.coroutine_wrapper_type); From 7d99e49fd97163e2271beb1f60ed1893f3cf98e3 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Fri, 19 Feb 2021 11:21:36 -0600 Subject: [PATCH 15/18] Parse address tuple based on socket family --- vm/src/stdlib/socket.rs | 150 ++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 53 deletions(-) diff --git a/vm/src/stdlib/socket.rs b/vm/src/stdlib/socket.rs index f1769ec09..5ef3e16c3 100644 --- a/vm/src/stdlib/socket.rs +++ b/vm/src/stdlib/socket.rs @@ -18,7 +18,7 @@ use crate::exceptions::{IntoPyException, PyBaseExceptionRef}; use crate::function::{FuncArgs, OptionalArg}; use crate::pyobject::{ BorrowValue, Either, IntoPyObject, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue, - StaticType, TryFromObject, + StaticType, TryFromObject, TypeProtocol, }; use crate::VirtualMachine; @@ -191,9 +191,68 @@ impl PySocket { } } + fn extract_address( + &self, + addr: PyObjectRef, + caller: &str, + vm: &VirtualMachine, + ) -> PyResult { + let family = self.family.load(); + match family { + c::AF_INET => { + let addr = Address::try_from_object(vm, addr)?; + let addr4 = get_addr(vm, addr, |sa| match sa { + SocketAddr::V4(v4) => Some(v4), + _ => None, + })?; + Ok(addr4.into()) + } + c::AF_INET6 => { + let tuple: PyTupleRef = addr.downcast().map_err(|obj| { + vm.new_type_error(format!( + "{}(): AF_INET6 address must be tuple, not {}", + caller, + obj.class().name + )) + })?; + let tuple = tuple.borrow_value(); + match tuple.len() { + 2 | 3 | 4 => {} + _ => { + return Err(vm.new_type_error( + "AF_INET6 address must be a tuple (host, port[, flowinfo[, scopeid]])" + .to_owned(), + )) + } + } + let addr = Address::from_tuple(tuple, vm)?; + let flowinfo = tuple + .get(2) + .map(|obj| u32::try_from_object(vm, obj.clone())) + .transpose()?; + let scopeid = tuple + .get(3) + .map(|obj| u32::try_from_object(vm, obj.clone())) + .transpose()?; + let mut addr6 = get_addr(vm, addr, |sa| match sa { + SocketAddr::V6(v6) => Some(v6), + _ => None, + })?; + if let Some(fi) = flowinfo { + addr6.set_flowinfo(fi) + } + if let Some(si) = scopeid { + addr6.set_scope_id(si) + } + Ok(addr6.into()) + } + _ => Err(vm.new_os_error(format!("{}(): bad family", caller))), + } + } + #[pymethod] - fn connect(&self, address: Address, vm: &VirtualMachine) -> PyResult<()> { - let sock_addr = get_addr(vm, address, Some(self.family.load()))?; + fn connect(&self, address: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> { + let sock_addr = self.extract_address(address, "connect", vm)?; let err = match self.sock().connect(&sock_addr) { Ok(()) => return Ok(()), @@ -227,8 +286,8 @@ impl PySocket { } #[pymethod] - fn bind(&self, address: Address, vm: &VirtualMachine) -> PyResult<()> { - let sock_addr = get_addr(vm, address, Some(self.family.load()))?; + fn bind(&self, address: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> { + let sock_addr = self.extract_address(address, "bind", vm)?; self.sock() .bind(&sock_addr) .map_err(|err| convert_sock_error(vm, err)) @@ -349,12 +408,12 @@ impl PySocket { fn sendto( &self, bytes: PyBytesLike, - address: Address, + address: PyObjectRef, flags: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let flags = flags.unwrap_or(0); - let addr = get_addr(vm, address, Some(self.family.load()))?; + let addr = self.extract_address(address, "sendto", vm)?; self.sock_op(vm, SelectKind::Write, || { bytes.with_ref(|b| self.sock().send_to_with_flags(b, &addr, flags)) }) @@ -575,22 +634,27 @@ impl ToSocketAddrs for Address { impl TryFromObject for Address { fn try_from_object(vm: &VirtualMachine, obj: PyObjectRef) -> PyResult { let tuple = PyTupleRef::try_from_object(vm, obj)?; - // TODO: parse the tuple based on the family of the socket; extract all the info for inet6 - if tuple.borrow_value().len() < 2 { + if tuple.borrow_value().len() != 2 { Err(vm.new_type_error("Address tuple should have only 2 values".to_owned())) } else { - let host = PyStrRef::try_from_object(vm, tuple.borrow_value()[0].clone())?; - let host = if host.borrow_value().is_empty() { - PyStr::from("0.0.0.0").into_ref(vm) - } else { - host - }; - let port = u16::try_from_object(vm, tuple.borrow_value()[1].clone())?; - Ok(Address { host, port }) + Self::from_tuple(tuple.borrow_value(), vm) } } } +impl Address { + fn from_tuple(tuple: &[PyObjectRef], vm: &VirtualMachine) -> PyResult { + let host = PyStrRef::try_from_object(vm, tuple[0].clone())?; + let host = if host.borrow_value().is_empty() { + PyStr::from("0.0.0.0").into_ref(vm) + } else { + host + }; + let port = u16::try_from_object(vm, tuple[1].clone())?; + Ok(Address { host, port }) + } +} + fn get_addr_tuple>(addr: A, vm: &VirtualMachine) -> PyObjectRef { let addr = addr.into(); match addr.as_std() { @@ -890,11 +954,8 @@ fn _socket_getnameinfo( flags: i32, vm: &VirtualMachine, ) -> PyResult<(String, String)> { - let addr = get_addr(vm, address, None)?; - let nameinfo = addr - .as_std() - .and_then(|addr| dns_lookup::getnameinfo(&addr, flags).ok()); - nameinfo.ok_or_else(|| { + let addr = get_addr(vm, address, Some)?; + dns_lookup::getnameinfo(&addr, flags).map_err(|_| { let error_type = GAI_ERROR.get().unwrap().clone(); vm.new_exception_msg( error_type, @@ -903,39 +964,22 @@ fn _socket_getnameinfo( }) } -fn get_addr( +fn get_addr( vm: &VirtualMachine, addr: impl ToSocketAddrs, - domain: Option, -) -> PyResult { - let sock_addr = match addr.to_socket_addrs() { - Ok(mut sock_addrs) => match domain { - None => sock_addrs.next(), - Some(dom) => { - if dom == i32::from(Domain::ipv4()) { - sock_addrs.find(|a| a.is_ipv4()) - } else if dom == i32::from(Domain::ipv6()) { - sock_addrs.find(|a| a.is_ipv6()) - } else { - sock_addrs.next() - } - } - }, - Err(e) => { - let error_type = GAI_ERROR.get().unwrap().clone(); - return Err(vm.new_exception_msg(error_type, e.to_string())); - } - }; - match sock_addr { - Some(sock_addr) => Ok(sock_addr.into()), - None => { - let error_type = GAI_ERROR.get().unwrap().clone(); - Err(vm.new_exception_msg( - error_type, - "nodename nor servname provided, or not known".to_owned(), - )) - } - } + filter: impl FnMut(SocketAddr) -> Option, +) -> PyResult { + let mut sock_addrs = addr.to_socket_addrs().map_err(|e| { + let error_type = GAI_ERROR.get().unwrap().clone(); + vm.new_exception_msg(error_type, e.to_string()) + })?; + sock_addrs.find_map(filter).ok_or_else(|| { + let error_type = GAI_ERROR.get().unwrap().clone(); + vm.new_exception_msg( + error_type, + "nodename nor servname provided, or not known".to_owned(), + ) + }) } fn sock_fileno(sock: &Socket) -> RawSocket { From 27fdb9eef1a5047e468207b244c1eb4c5dc998d3 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Fri, 19 Feb 2021 11:28:51 -0600 Subject: [PATCH 16/18] Decompress.decompress() takes PyBytesLike --- vm/src/stdlib/zlib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vm/src/stdlib/zlib.rs b/vm/src/stdlib/zlib.rs index b80e40a01..031a932f6 100644 --- a/vm/src/stdlib/zlib.rs +++ b/vm/src/stdlib/zlib.rs @@ -273,6 +273,7 @@ mod decl { Some(args.max_length) }; let data = args.data.borrow_value(); + let data = &*data; let mut d = self.decompress.lock(); let orig_in = d.total_in(); @@ -339,7 +340,7 @@ mod decl { #[derive(FromArgs)] struct DecompressArgs { #[pyarg(positional)] - data: PyBytesRef, + data: PyBytesLike, #[pyarg(any, default = "0")] max_length: usize, } From 491c4e775bca1e213bfc9406ca280858fb8567c4 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Fri, 19 Feb 2021 15:32:34 -0600 Subject: [PATCH 17/18] Fix json.scanstring unicode handling --- Lib/test/test_json/test_unicode.py | 2 -- extra_tests/snippets/json_snippet.py | 2 ++ vm/src/stdlib/json/machinery.rs | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Lib/test/test_json/test_unicode.py b/Lib/test/test_json/test_unicode.py index 8f78a633a..e9ec511c3 100644 --- a/Lib/test/test_json/test_unicode.py +++ b/Lib/test/test_json/test_unicode.py @@ -38,8 +38,6 @@ class TestUnicode: self.assertEqual(self.loads('"' + u + '"'), u) self.assertEqual(self.loads('"z\\ud834\\udd20x"'), u) - # just takes FOREVER (3min+), unskip when it doesn't - @unittest.skip("TODO: RUSTPYTHON time") def test_unicode_decode(self): for i in range(0, 0xd7ff): u = chr(i) diff --git a/extra_tests/snippets/json_snippet.py b/extra_tests/snippets/json_snippet.py index 60e1020ce..a91f3bd81 100644 --- a/extra_tests/snippets/json_snippet.py +++ b/extra_tests/snippets/json_snippet.py @@ -184,3 +184,5 @@ assert json_dump({'a': 'b'}) == json_dump(Dict({'a': 'b'})) i = 7**500 assert json.dumps(i) == str(i) + +assert json.decoder.scanstring('✨x"', 1) == ('x', 3) diff --git a/vm/src/stdlib/json/machinery.rs b/vm/src/stdlib/json/machinery.rs index fe0051a87..7c9ac3ac4 100644 --- a/vm/src/stdlib/json/machinery.rs +++ b/vm/src/stdlib/json/machinery.rs @@ -136,12 +136,12 @@ pub fn scanstring<'a>( chunks.push(chunk); }; let unterminated_err = || DecodeError::new("Unterminated string starting at", end - 1); - let mut chunk_start = end; let mut chars = s.char_indices().enumerate().skip(end).peekable(); - while let Some((char_i, (i, c))) = chars.next() { + let (_, (mut chunk_start, _)) = chars.peek().ok_or_else(unterminated_err)?; + while let Some((char_i, (byte_i, c))) = chars.next() { match c { '"' => { - push_chunk(StrOrChar::Str(&s[chunk_start..i])); + push_chunk(StrOrChar::Str(&s[chunk_start..byte_i])); let mut out = String::with_capacity(output_len); for x in chunks { match x { @@ -152,7 +152,7 @@ pub fn scanstring<'a>( return Ok((out, char_i + 1)); } '\\' => { - push_chunk(StrOrChar::Str(&s[chunk_start..i])); + push_chunk(StrOrChar::Str(&s[chunk_start..byte_i])); let (_, (_, c)) = chars.next().ok_or_else(unterminated_err)?; let esc = match c { '"' => "\"", @@ -166,7 +166,7 @@ pub fn scanstring<'a>( 'u' => { let surrogate_err = || DecodeError::new("unpaired surrogate", char_i); let mut uni = decode_unicode(&mut chars, char_i)?; - chunk_start = char_i + 6; + chunk_start = byte_i + 6; if (0xd800..=0xdbff).contains(&uni) { // uni is a surrogate -- try to find its pair if let Some(&(pos2, (_, '\\'))) = chars.peek() { @@ -202,7 +202,7 @@ pub fn scanstring<'a>( )) } }; - chunk_start = i + 2; + chunk_start = byte_i + 2; push_chunk(StrOrChar::Str(esc)); } '\x00'..='\x1f' if strict => { From 511b2c8809a72ea28355e51f115a0551e9a9e353 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 21 Feb 2021 11:00:05 -0600 Subject: [PATCH 18/18] Always skip test_enum test_unique_composite --- Lib/test/test_enum.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_enum.py b/Lib/test/test_enum.py index 1ed0ff704..61fd9eb9e 100644 --- a/Lib/test/test_enum.py +++ b/Lib/test/test_enum.py @@ -2516,9 +2516,7 @@ class TestFlag(unittest.TestCase): self.assertEqual(Color.ALL.value, 7) self.assertEqual(str(Color.BLUE), 'blue') - # TODO: RUSTPYTHON - @unittest.expectedFailure - @unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, universal newlines") + @unittest.skip("TODO: RUSTPYTHON, universal newlines") @support.reap_threads def test_unique_composite(self): # override __eq__ to be identity only @@ -2951,9 +2949,7 @@ class TestIntFlag(unittest.TestCase): self.assertEqual(Color.ALL.value, 7) self.assertEqual(str(Color.BLUE), 'blue') - # TODO: RUSTPYTHON - @unittest.expectedFailure - @unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, universal newlines") + @unittest.skip("TODO: RUSTPYTHON, universal newlines") @support.reap_threads def test_unique_composite(self): # override __eq__ to be identity only