Files
RustPython/Lib/_dummy_thread.py
Jeong, YunWon 69601a18a4 Upgrade threading to 3.13.11; sys.setprofile & impl more threading (#6691)
* Upgrade threading,test_threading from 3.13.11

* increase ci limit to 45mins

* impl more threading

* no static for asyncgen
2026-01-12 15:33:28 +09:00

339 lines
8.7 KiB
Python
Vendored

"""Drop-in replacement for the thread module.

Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten when the ``_thread`` module is not present.

Suggested usage is::

    try:
        import _thread
    except ImportError:
        import _dummy_thread as _thread

"""
# Only names described by the _thread documentation are exported; the
# obsolete aliases allocate(), start_new() and exit_thread() are left out.
__all__ = [
    # Core _thread API.
    "error",
    "start_new_thread",
    "exit",
    "get_ident",
    "allocate_lock",
    "interrupt_main",
    "LockType",
    "RLock",
    # Further helpers defined by this module that are exported as well.
    "_count",
    "start_joinable_thread",
    "daemon_threads_allowed",
    "_shutdown",
    "_make_thread_handle",
    "_ThreadHandle",
    "_get_main_thread_ident",
    "_is_main_interpreter",
    "_local",
]
# Placeholder value; nothing in this module enforces it.
TIMEOUT_MAX = 2 ** 31

# Identifier reported for the one and only ("main") thread of the dummy
# implementation.
_MAIN_THREAD_IDENT = -1

# NOTE: this module may be imported very early while extensions are being
# built, so it must not import other modules at the top level.  Every
# import is deferred to the function that needs it.  With threads disabled
# the import lock should not be a concern anyway (??).

# Exception type raised for thread-related errors.
error = RuntimeError
def start_new_thread(function, args, kwargs={}):
    """Dummy implementation of _thread.start_new_thread().

    ``function(*args, **kwargs)`` is executed synchronously in the calling
    thread instead of in a new one.

    Parameters:
        function: callable to run.
        args: positional arguments; must be a tuple (or tuple subclass).
        kwargs: keyword arguments; must be a dict (or dict subclass).  The
            shared default dict is never mutated: ``**kwargs`` unpacking
            copies it for the call.

    Raises:
        TypeError: if ``args`` is not a tuple or ``kwargs`` is not a dict.
        KeyboardInterrupt: after ``function`` has finished, if it called
            interrupt_main() while running.

    SystemExit raised by ``function`` (e.g. via exit()) is swallowed
    silently; every other exception is reported with
    traceback.print_exc() and not propagated.
    """
    # isinstance() instead of an exact type comparison: tuple and dict
    # subclasses (namedtuple, OrderedDict, ...) must be accepted, as the
    # real _thread.start_new_thread() accepts them.
    if not isinstance(args, tuple):
        raise TypeError("2nd arg must be a tuple")
    if not isinstance(kwargs, dict):
        raise TypeError("3rd arg must be a dict")
    global _main, _interrupt
    # While the function runs we are "inside a thread"; interrupt_main()
    # consults this flag to decide whether to raise immediately.
    _main = False
    try:
        function(*args, **kwargs)
    except SystemExit:
        pass
    except BaseException:
        # Nothing escapes to the caller; the failure is only printed.
        import traceback
        traceback.print_exc()
    finally:
        # Restore the flag even if printing the traceback itself failed.
        _main = True
    if _interrupt:
        # interrupt_main() was called from within the "thread": deliver the
        # deferred KeyboardInterrupt now that the function has returned.
        _interrupt = False
        raise KeyboardInterrupt
def start_joinable_thread(function, handle=None, daemon=True):
    """Dummy implementation of _thread.start_joinable_thread().

    No thread is created: ``function`` is called right away in the calling
    thread.  ``daemon`` is accepted but not used.  SystemExit is swallowed
    and any other exception is printed with its traceback rather than
    propagated.

    Returns the handle that was passed in (or a freshly created
    _ThreadHandle when ``handle`` is None) after marking it as done.
    """
    thread_handle = _ThreadHandle() if handle is None else handle
    try:
        function()
    except SystemExit:
        # Treated as a regular end of the "thread".
        pass
    except BaseException:
        from traceback import print_exc
        print_exc()
    thread_handle._set_done()
    return thread_handle
def daemon_threads_allowed():
    """Dummy _thread.daemon_threads_allowed(): unconditionally True."""
    return True
def _shutdown():
    """Dummy _thread._shutdown(): does nothing and returns None."""
    return None
def _make_thread_handle(ident):
    """Dummy _thread._make_thread_handle().

    Returns a new _ThreadHandle whose identifier is set to ``ident``.
    """
    new_handle = _ThreadHandle()
    new_handle._ident = ident
    return new_handle
def _get_main_thread_ident():
    """Dummy _thread._get_main_thread_ident(): the fixed main-thread id."""
    return _MAIN_THREAD_IDENT
def _is_main_interpreter():
    """Dummy _thread._is_main_interpreter(): unconditionally True."""
    return True
def exit():
    """Dummy _thread.exit(): always raises SystemExit."""
    raise SystemExit
def get_ident():
    """Dummy implementation of _thread.get_ident().

    This module is only meant to be used when the real _thread module is
    unavailable, so the current process can be assumed to consist of a
    single thread and one constant identifier is enough.
    """
    return _MAIN_THREAD_IDENT
def allocate_lock():
    """Dummy _thread.allocate_lock(): returns a new LockType instance."""
    return LockType()
def stack_size(size=None):
    """Dummy implementation of _thread.stack_size().

    Querying (``size`` omitted or None) returns 0.  Passing any other
    value raises ``error`` because the stack size cannot be changed here.
    """
    if size is None:
        return 0
    raise error("setting thread stack size not supported")
def _set_sentinel():
    """Dummy _thread._set_sentinel(): returns a new LockType instance."""
    return LockType()
def _count():
    """Dummy _thread._count(): always reports 0."""
    return 0
class LockType:
    """Dummy stand-in for _thread.LockType.

    The lock state lives in the boolean attribute ``locked_status``.
    Instances should not be pickled: if an unpickled lock from this class
    were later used together with the real _thread module, problems could
    arise because the methods here are not atomic.
    """

    def __init__(self):
        self.locked_status = False

    def acquire(self, waitflag=None, timeout=-1):
        """Dummy implementation of acquire().

        A blocking request (``waitflag`` omitted, None or true) always
        marks the lock as held and returns True, even if it was already
        held.  A non-blocking request really checks the state: it takes
        the lock and returns True when it is free; otherwise it sleeps for
        ``timeout`` seconds if that is positive and returns False.  This
        keeps the assertions in threading.Condition from being triggered.
        """
        if waitflag is None or waitflag:
            self.locked_status = True
            return True
        if self.locked_status:
            # Already held and the caller does not want to block.
            if timeout > 0:
                import time
                time.sleep(timeout)
            return False
        self.locked_status = True
        return True

    __enter__ = acquire

    def __exit__(self, typ, val, tb):
        self.release()

    def release(self):
        """Release the dummy lock; raise ``error`` if it is not held."""
        # XXX Maybe this check should be skipped?  It could cause trouble
        # for complex, threaded code.
        if not self.locked_status:
            raise error
        self.locked_status = False
        return True

    def locked(self):
        """Return the current value of ``locked_status``."""
        return self.locked_status

    def _at_fork_reinit(self):
        """Reset the lock to the unlocked state."""
        self.locked_status = False

    def __repr__(self):
        state = "locked" if self.locked_status else "unlocked"
        cls = self.__class__
        return "<%s %s.%s object at %s>" % (
            state,
            cls.__module__,
            cls.__qualname__,
            hex(id(self)),
        )
class _ThreadHandle:
    """Dummy stand-in for _thread._ThreadHandle."""

    def __init__(self):
        # A new handle starts out "not done" and carries the main-thread id.
        self._done = False
        self._ident = _MAIN_THREAD_IDENT

    @property
    def ident(self):
        """Identifier stored on this handle."""
        return self._ident

    def _set_done(self):
        """Mark the handle as finished."""
        self._done = True

    def is_done(self):
        """Return whether _set_done() has been called."""
        return self._done

    def join(self, timeout=None):
        """Return immediately; ``timeout`` is ignored."""
        # In the dummy implementation there is never anything to wait for.
        return None

    def __repr__(self):
        return f"<_ThreadHandle ident={self._ident}>"
# Becomes True when interrupt_main() is called from inside a "thread".
_interrupt = False
# False only while a "thread" function is being executed.
_main = True


def interrupt_main():
    """Dummy implementation of _thread.interrupt_main().

    Outside of a "thread" KeyboardInterrupt is raised immediately.  Inside
    one, only the _interrupt flag is set so that start_new_thread raises
    KeyboardInterrupt once the running function has returned.
    """
    global _interrupt
    if not _main:
        _interrupt = True
        return
    raise KeyboardInterrupt
class RLock:
    """Dummy implementation of _thread.RLock.

    Only one "thread" ever exists, so the lock is a plain recursion
    counter (``locked_count``): acquire() always succeeds and increments
    it, release() decrements it.

    The private hooks _is_owned(), _release_save() and _acquire_restore()
    are provided because threading.Condition uses them when the lock has
    them.  Without them Condition probes ownership with acquire(False),
    which always succeeds on this dummy, so it would wrongly conclude the
    lock is not held.
    """

    def __init__(self):
        self.locked_count = 0

    def acquire(self, waitflag=None, timeout=-1):
        """Increment the recursion counter and return True.

        ``waitflag`` and ``timeout`` are accepted for signature
        compatibility but ignored: the single thread can always re-enter.
        """
        self.locked_count += 1
        return True

    __enter__ = acquire

    def __exit__(self, typ, val, tb):
        self.release()

    def release(self):
        """Decrement the recursion counter.

        Raises ``error`` if the lock is not currently held.
        """
        if not self.locked_count:
            raise error("cannot release un-acquired lock")
        self.locked_count -= 1
        return True

    def locked(self):
        """Return True if the lock is held at least once."""
        return self.locked_count != 0

    def _is_owned(self):
        """Return True if the (only) thread currently holds the lock."""
        return self.locked_count != 0

    def _recursion_count(self):
        """Return how many times the lock is currently held."""
        return self.locked_count

    def _release_save(self):
        """Fully release the lock and return an opaque state token.

        The token must only be handed back to _acquire_restore().
        Raises ``error`` if the lock is not currently held.
        """
        if not self.locked_count:
            raise error("cannot release un-acquired lock")
        saved_count = self.locked_count
        self.locked_count = 0
        return saved_count

    def _acquire_restore(self, state):
        """Re-acquire the lock to the depth recorded by _release_save()."""
        self.locked_count = state

    def _at_fork_reinit(self):
        """Reset the lock to the unlocked state."""
        self.locked_count = 0

    def __repr__(self):
        return "<%s %s.%s object owner=%s count=%s at %s>" % (
            "locked" if self.locked_count else "unlocked",
            self.__class__.__module__,
            self.__class__.__qualname__,
            get_ident() if self.locked_count else 0,
            self.locked_count,
            hex(id(self)),
        )
class _local:
    """Dummy implementation of _thread._local (thread-local storage).

    With a single "thread" there is exactly one namespace per object, so
    attributes are kept in the instance's ordinary ``__dict__`` and the
    normal attribute machinery is used.  This keeps ``__class__``,
    ``__dict__``, class attributes, properties and methods defined on
    subclasses working.  It also works for subclasses that override
    ``__init__`` without calling the base implementation.
    """

    def __new__(cls, *args, **kwargs):
        # Constructor arguments only make sense when a subclass supplies
        # an __init__ to consume them; reject them otherwise.
        if (args or kwargs) and cls.__init__ is object.__init__:
            raise TypeError("Initialization arguments are not supported")
        return object.__new__(cls)