Merge pull request #5731 from arihant2math/pprint-313

This commit is contained in:
Jeong, YunWon
2025-04-22 14:18:35 +09:00
committed by GitHub
11 changed files with 748 additions and 226 deletions

2
Lib/colorsys.py vendored
View File

@@ -24,7 +24,7 @@ HSV: Hue, Saturation, Value
__all__ = ["rgb_to_yiq","yiq_to_rgb","rgb_to_hls","hls_to_rgb",
"rgb_to_hsv","hsv_to_rgb"]
# Some floating point constants
# Some floating-point constants
ONE_THIRD = 1.0/3.0
ONE_SIXTH = 1.0/6.0

2
Lib/graphlib.py vendored
View File

@@ -154,7 +154,7 @@ class TopologicalSorter:
This method unblocks any successor of each node in *nodes* for being returned
in the future by a call to "get_ready".
Raises :exec:`ValueError` if any node in *nodes* has already been marked as
Raises ValueError if any node in *nodes* has already been marked as
processed by a previous call to this method, if a node was not added to the
graph by using "add" or if called without calling "prepare" previously or if
node has not yet been returned by "get_ready".

91
Lib/linecache.py vendored
View File

@@ -5,17 +5,13 @@ is not found, it will look down the module search path for a file by
that name.
"""
import functools
import sys
import os
import tokenize
__all__ = ["getline", "clearcache", "checkcache", "lazycache"]
# The cache. Maps filenames to either a thunk which will provide source code,
# or a tuple (size, mtime, lines, fullname) once loaded.
cache = {}
_interactive_cache = {}
def clearcache():
@@ -49,28 +45,54 @@ def getlines(filename, module_globals=None):
return []
def _getline_from_code(filename, lineno):
lines = _getlines_from_code(filename)
if 1 <= lineno <= len(lines):
return lines[lineno - 1]
return ''
def _make_key(code):
return (code.co_filename, code.co_qualname, code.co_firstlineno)
def _getlines_from_code(code):
code_id = _make_key(code)
if code_id in _interactive_cache:
entry = _interactive_cache[code_id]
if len(entry) != 1:
return _interactive_cache[code_id][2]
return []
def checkcache(filename=None):
"""Discard cache entries that are out of date.
(This is not checked upon each call!)"""
if filename is None:
filenames = list(cache.keys())
elif filename in cache:
filenames = [filename]
# get keys atomically
filenames = cache.copy().keys()
else:
return
filenames = [filename]
for filename in filenames:
entry = cache[filename]
try:
entry = cache[filename]
except KeyError:
continue
if len(entry) == 1:
# lazy cache entry, leave it lazy.
continue
size, mtime, lines, fullname = entry
if mtime is None:
continue # no-op for files loaded via a __loader__
try:
# This import can fail if the interpreter is shutting down
import os
except ImportError:
return
try:
stat = os.stat(fullname)
except OSError:
except (OSError, ValueError):
cache.pop(filename, None)
continue
if size != stat.st_size or mtime != stat.st_mtime:
@@ -82,6 +104,17 @@ def updatecache(filename, module_globals=None):
If something's wrong, print a message, discard the cache entry,
and return an empty list."""
# These imports are not at top level because linecache is in the critical
# path of the interpreter startup and importing os and sys take a lot of time
# and slows down the startup sequence.
try:
import os
import sys
import tokenize
except ImportError:
# These import can fail if the interpreter is shutting down
return []
if filename in cache:
if len(cache[filename]) != 1:
cache.pop(filename, None)
@@ -128,16 +161,20 @@ def updatecache(filename, module_globals=None):
try:
stat = os.stat(fullname)
break
except OSError:
except (OSError, ValueError):
pass
else:
return []
except ValueError: # may be raised by os.stat()
return []
try:
with tokenize.open(fullname) as fp:
lines = fp.readlines()
except (OSError, UnicodeDecodeError, SyntaxError):
return []
if lines and not lines[-1].endswith('\n'):
if not lines:
lines = ['\n']
elif not lines[-1].endswith('\n'):
lines[-1] += '\n'
size, mtime = stat.st_size, stat.st_mtime
cache[filename] = size, mtime, lines, fullname
@@ -166,17 +203,29 @@ def lazycache(filename, module_globals):
return False
# Try for a __loader__, if available
if module_globals and '__name__' in module_globals:
name = module_globals['__name__']
if (loader := module_globals.get('__loader__')) is None:
if spec := module_globals.get('__spec__'):
try:
loader = spec.loader
except AttributeError:
pass
spec = module_globals.get('__spec__')
name = getattr(spec, 'name', None) or module_globals['__name__']
loader = getattr(spec, 'loader', None)
if loader is None:
loader = module_globals.get('__loader__')
get_source = getattr(loader, 'get_source', None)
if name and get_source:
get_lines = functools.partial(get_source, name)
def get_lines(name=name, *args, **kwargs):
return get_source(name, *args, **kwargs)
cache[filename] = (get_lines,)
return True
return False
def _register_code(code, string, name):
entry = (len(string),
None,
[line + '\n' for line in string.splitlines()],
name)
stack = [code]
while stack:
code = stack.pop()
for const in code.co_consts:
if isinstance(const, type(code)):
stack.append(const)
_interactive_cache[_make_key(code)] = entry

3
Lib/pprint.py vendored
View File

@@ -128,6 +128,9 @@ class PrettyPrinter:
sort_dicts
If true, dict keys are sorted.
underscore_numbers
If true, digit groups are separated with underscores.
"""
indent = int(indent)
width = int(width)

60
Lib/queue.py vendored
View File

@@ -10,7 +10,15 @@ try:
except ImportError:
SimpleQueue = None
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
__all__ = [
'Empty',
'Full',
'ShutDown',
'Queue',
'PriorityQueue',
'LifoQueue',
'SimpleQueue',
]
try:
@@ -25,6 +33,10 @@ class Full(Exception):
pass
class ShutDown(Exception):
'''Raised when put/get with shut-down queue.'''
class Queue:
'''Create a queue object with a given maximum size.
@@ -54,6 +66,9 @@ class Queue:
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
# Queue shutdown state
self.is_shutdown = False
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -65,6 +80,9 @@ class Queue:
have been processed (meaning that a task_done() call was received
for every item that had been put() into the queue).
shutdown(immediate=True) calls task_done() for each remaining item in
the queue.
Raises a ValueError if called more times than there were items
placed in the queue.
'''
@@ -129,8 +147,12 @@ class Queue:
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
Raises ShutDown if the queue has been shut down.
'''
with self.not_full:
if self.is_shutdown:
raise ShutDown
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
@@ -138,6 +160,8 @@ class Queue:
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
if self.is_shutdown:
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -147,6 +171,8 @@ class Queue:
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
if self.is_shutdown:
raise ShutDown
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
@@ -161,14 +187,21 @@ class Queue:
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
Raises ShutDown if the queue has been shut down and is empty,
or if the queue has been shut down immediately.
'''
with self.not_empty:
if self.is_shutdown and not self._qsize():
raise ShutDown
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
if self.is_shutdown and not self._qsize():
raise ShutDown
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
@@ -178,6 +211,8 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self.is_shutdown and not self._qsize():
raise ShutDown
item = self._get()
self.not_full.notify()
return item
@@ -198,6 +233,29 @@ class Queue:
'''
return self.get(block=False)
def shutdown(self, immediate=False):
'''Shut-down the queue, making queue gets and puts raise ShutDown.
By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.
All blocked callers of put() and get() will be unblocked. If
'immediate', a task is marked as done for each item remaining in
the queue, which may unblock callers of join().
'''
with self.mutex:
self.is_shutdown = True
if immediate:
while self._qsize():
self._get()
if self.unfinished_tasks > 0:
self.unfinished_tasks -= 1
# release all blocked threads in `join()`
self.all_tasks_done.notify_all()
# All getters need to re-check queue-empty to raise ShutDown
self.not_empty.notify_all()
self.not_full.notify_all()
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held

2
Lib/sched.py vendored
View File

@@ -11,7 +11,7 @@ substituting time and sleep from built-in module time, or you can
implement simulated time by writing your own functions. This can
also be used to integrate scheduling with STDWIN events; the delay
function is allowed to modify the queue. Time can be expressed as
integers or floating point numbers, as long as it is consistent.
integers or floating-point numbers, as long as it is consistent.
Events are specified by tuples (time, priority, action, argument, kwargs).
As in UNIX, lower priority numbers mean higher priority; in this

View File

@@ -4,7 +4,6 @@ import random
import unittest
import doctest
from test import support
from test.support import import_helper
from unittest import TestCase, skipUnless
from operator import itemgetter

View File

@@ -5,8 +5,10 @@ import unittest
import os.path
import tempfile
import tokenize
from importlib.machinery import ModuleSpec
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok
FILENAME = linecache.__file__
@@ -82,6 +84,10 @@ class GetLineTestsBadData(TempFile):
class EmptyFile(GetLineTestsGoodData, unittest.TestCase):
file_list = []
def test_getlines(self):
lines = linecache.getlines(self.file_name)
self.assertEqual(lines, ['\n'])
class SingleEmptyLine(GetLineTestsGoodData, unittest.TestCase):
file_list = ['\n']
@@ -97,6 +103,16 @@ class BadUnicode_WithDeclaration(GetLineTestsBadData, unittest.TestCase):
file_byte_string = b'# coding=utf-8\n\x80abc'
class FakeLoader:
def get_source(self, fullname):
return f'source for {fullname}'
class NoSourceLoader:
def get_source(self, fullname):
return None
class LineCacheTests(unittest.TestCase):
def test_getline(self):
@@ -238,6 +254,70 @@ class LineCacheTests(unittest.TestCase):
self.assertEqual(lines3, [])
self.assertEqual(linecache.getlines(FILENAME), lines)
def test_loader(self):
filename = 'scheme://path'
for loader in (None, object(), NoSourceLoader()):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': loader}
self.assertEqual(linecache.getlines(filename, module_globals), [])
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader()}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])
for spec in (None, object(), ModuleSpec('', FakeLoader())):
linecache.clearcache()
module_globals = {'__name__': 'a.b.c', '__loader__': FakeLoader(),
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for a.b.c\n'])
linecache.clearcache()
spec = ModuleSpec('x.y.z', FakeLoader())
module_globals = {'__name__': 'a.b.c', '__loader__': spec.loader,
'__spec__': spec}
self.assertEqual(linecache.getlines(filename, module_globals),
['source for x.y.z\n'])
def test_invalid_names(self):
for name, desc in [
('\x00', 'NUL bytes filename'),
(__file__ + '\x00', 'filename with embedded NUL bytes'),
# A filename with surrogate codes. A UnicodeEncodeError is raised
# by os.stat() upon querying, which is a subclass of ValueError.
("\uD834\uDD1E.py", 'surrogate codes (MUSICAL SYMBOL G CLEF)'),
# For POSIX platforms, an OSError will be raised but for Windows
# platforms, a ValueError is raised due to the path_t converter.
# See: https://github.com/python/cpython/issues/122170
('a' * 1_000_000, 'very long filename'),
]:
with self.subTest(f'updatecache: {desc}'):
linecache.clearcache()
lines = linecache.updatecache(name)
self.assertListEqual(lines, [])
self.assertNotIn(name, linecache.cache)
# hack into the cache (it shouldn't be allowed
# but we never know what people do...)
for key, fullname in [(name, 'ok'), ('key', name), (name, name)]:
with self.subTest(f'checkcache: {desc}',
key=key, fullname=fullname):
linecache.clearcache()
linecache.cache[key] = (0, 1234, [], fullname)
linecache.checkcache(key)
self.assertNotIn(key, linecache.cache)
# just to be sure that we did not mess with cache
linecache.clearcache()
def test_linecache_python_string(self):
cmdline = "import linecache;assert len(linecache.cache) == 0"
retcode, stdout, stderr = assert_python_ok('-c', cmdline)
self.assertEqual(retcode, 0)
self.assertEqual(stdout, b'')
self.assertEqual(stderr, b'')
class LineCacheInvalidationTests(unittest.TestCase):
def setUp(self):

View File

@@ -7,8 +7,8 @@ import io
import itertools
import pprint
import random
import re
import test.support
import test.test_set
import types
import unittest
@@ -535,7 +535,10 @@ AdvancedNamespace(the=0,
def test_dataclass_no_repr(self):
dc = dataclass3()
formatted = pprint.pformat(dc, width=10)
self.assertRegex(formatted, r"<test.test_pprint.dataclass3 object at \w+>")
self.assertRegex(
formatted,
fr"<{re.escape(__name__)}.dataclass3 object at \w+>",
)
def test_recursive_dataclass(self):
dc = dataclass4(None)
@@ -619,9 +622,6 @@ frozenset2({0,
self.assertEqual(pprint.pformat(frozenset3(range(7)), width=20),
'frozenset3({0, 1, 2, 3, 4, 5, 6})')
@unittest.expectedFailure
#See http://bugs.python.org/issue13907
@test.support.cpython_only
def test_set_of_sets_reprs(self):
# This test creates a complex arrangement of frozensets and
# compares the pretty-printed repr against a string hard-coded in
@@ -632,204 +632,106 @@ frozenset2({0,
# partial ordering (subset relationships), the output of the
# list.sort() method is undefined for lists of sets."
#
# In a nutshell, the test assumes frozenset({0}) will always
# sort before frozenset({1}), but:
#
# >>> frozenset({0}) < frozenset({1})
# False
# >>> frozenset({1}) < frozenset({0})
# False
#
# Consequently, this test is fragile and
# implementation-dependent. Small changes to Python's sort
# algorithm cause the test to fail when it should pass.
# XXX Or changes to the dictionary implementation...
# In this test we list all possible invariants of the result
# for unordered frozensets.
#
# This test has a long history, see:
# - https://github.com/python/cpython/commit/969fe57baa0eb80332990f9cda936a33e13fabef
# - https://github.com/python/cpython/issues/58115
# - https://github.com/python/cpython/issues/111147
cube_repr_tgt = """\
{frozenset(): frozenset({frozenset({2}), frozenset({0}), frozenset({1})}),
frozenset({0}): frozenset({frozenset(),
frozenset({0, 2}),
frozenset({0, 1})}),
frozenset({1}): frozenset({frozenset(),
frozenset({1, 2}),
frozenset({0, 1})}),
frozenset({2}): frozenset({frozenset(),
frozenset({1, 2}),
frozenset({0, 2})}),
frozenset({1, 2}): frozenset({frozenset({2}),
frozenset({1}),
frozenset({0, 1, 2})}),
frozenset({0, 2}): frozenset({frozenset({2}),
frozenset({0}),
frozenset({0, 1, 2})}),
frozenset({0, 1}): frozenset({frozenset({0}),
frozenset({1}),
frozenset({0, 1, 2})}),
frozenset({0, 1, 2}): frozenset({frozenset({1, 2}),
frozenset({0, 2}),
frozenset({0, 1})})}"""
cube = test.test_set.cube(3)
self.assertEqual(pprint.pformat(cube), cube_repr_tgt)
cubo_repr_tgt = """\
{frozenset({frozenset({0, 2}), frozenset({0})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({0, 1}), frozenset({1})}): frozenset({frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({1})})}),
frozenset({frozenset({1, 2}), frozenset({1})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({1, 2}), frozenset({2})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset({2}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset(), frozenset({0})}): frozenset({frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset(), frozenset({1})}): frozenset({frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({1}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({2})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({2}), frozenset()}): frozenset({frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset(),
frozenset({1})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({0, 1, 2}), frozenset({0, 1})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
1})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({0}), frozenset({0, 1})}): frozenset({frozenset({frozenset(),
frozenset({0})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset({1}),
frozenset({0,
1})})}),
frozenset({frozenset({2}), frozenset({0, 2})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset(),
frozenset({2})})}),
frozenset({frozenset({0, 1, 2}), frozenset({0, 2})}): frozenset({frozenset({frozenset({1,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0}),
frozenset({0,
2})}),
frozenset({frozenset({2}),
frozenset({0,
2})})}),
frozenset({frozenset({1, 2}), frozenset({0, 1, 2})}): frozenset({frozenset({frozenset({0,
2}),
frozenset({0,
1,
2})}),
frozenset({frozenset({0,
1}),
frozenset({0,
1,
2})}),
frozenset({frozenset({2}),
frozenset({1,
2})}),
frozenset({frozenset({1}),
frozenset({1,
2})})})}"""
import textwrap
cubo = test.test_set.linegraph(cube)
self.assertEqual(pprint.pformat(cubo), cubo_repr_tgt)
# Single-line, always ordered:
fs0 = frozenset()
fs1 = frozenset(('abc', 'xyz'))
data = frozenset((fs0, fs1))
self.assertEqual(pprint.pformat(data),
'frozenset({%r, %r})' % (fs0, fs1))
self.assertEqual(pprint.pformat(data), repr(data))
fs2 = frozenset(('one', 'two'))
data = {fs2: frozenset((fs0, fs1))}
self.assertEqual(pprint.pformat(data),
"{%r: frozenset({%r, %r})}" % (fs2, fs0, fs1))
self.assertEqual(pprint.pformat(data), repr(data))
# Single-line, unordered:
fs1 = frozenset(("xyz", "qwerty"))
fs2 = frozenset(("abcd", "spam"))
fs = frozenset((fs1, fs2))
self.assertEqual(pprint.pformat(fs), repr(fs))
# Multiline, unordered:
def check(res, invariants):
self.assertIn(res, [textwrap.dedent(i).strip() for i in invariants])
# Inner-most frozensets are singleline, result is multiline, unordered:
fs1 = frozenset(('regular string', 'other string'))
fs2 = frozenset(('third string', 'one more string'))
check(
pprint.pformat(frozenset((fs1, fs2))),
[
"""
frozenset({%r,
%r})
""" % (fs1, fs2),
"""
frozenset({%r,
%r})
""" % (fs2, fs1),
],
)
# Everything is multiline, unordered:
check(
pprint.pformat(
frozenset((
frozenset((
"xyz very-very long string",
"qwerty is also absurdly long",
)),
frozenset((
"abcd is even longer that before",
"spam is not so long",
)),
)),
),
[
"""
frozenset({frozenset({'abcd is even longer that before',
'spam is not so long'}),
frozenset({'qwerty is also absurdly long',
'xyz very-very long string'})})
""",
"""
frozenset({frozenset({'abcd is even longer that before',
'spam is not so long'}),
frozenset({'xyz very-very long string',
'qwerty is also absurdly long'})})
""",
"""
frozenset({frozenset({'qwerty is also absurdly long',
'xyz very-very long string'}),
frozenset({'abcd is even longer that before',
'spam is not so long'})})
""",
"""
frozenset({frozenset({'qwerty is also absurdly long',
'xyz very-very long string'}),
frozenset({'spam is not so long',
'abcd is even longer that before'})})
""",
],
)
def test_depth(self):
nested_tuple = (1, (2, (3, (4, (5, 6)))))

416
Lib/test/test_queue.py vendored
View File

@@ -2,6 +2,7 @@
# to ensure the Queue locks remain stable.
import itertools
import random
import sys
import threading
import time
import unittest
@@ -10,6 +11,8 @@ from test.support import gc_collect
from test.support import import_helper
from test.support import threading_helper
# queue module depends on threading primitives
threading_helper.requires_working_threading(module=True)
py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
@@ -239,6 +242,418 @@ class BaseQueueTestMixin(BlockingTestMixin):
with self.assertRaises(self.queue.Full):
q.put_nowait(4)
def test_shutdown_empty(self):
q = self.type2test()
q.shutdown()
with self.assertRaises(self.queue.ShutDown):
q.put("data")
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_nonempty(self):
q = self.type2test()
q.put("data")
q.shutdown()
q.get()
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_immediate(self):
q = self.type2test()
q.put("data")
q.shutdown(immediate=True)
with self.assertRaises(self.queue.ShutDown):
q.get()
def test_shutdown_allowed_transitions(self):
# allowed transitions would be from alive via shutdown to immediate
q = self.type2test()
self.assertFalse(q.is_shutdown)
q.shutdown()
self.assertTrue(q.is_shutdown)
q.shutdown(immediate=True)
self.assertTrue(q.is_shutdown)
q.shutdown(immediate=False)
def _shutdown_all_methods_in_one_thread(self, immediate):
q = self.type2test(2)
q.put("L")
q.put_nowait("O")
q.shutdown(immediate)
with self.assertRaises(self.queue.ShutDown):
q.put("E")
with self.assertRaises(self.queue.ShutDown):
q.put_nowait("W")
if immediate:
with self.assertRaises(self.queue.ShutDown):
q.get()
with self.assertRaises(self.queue.ShutDown):
q.get_nowait()
with self.assertRaises(ValueError):
q.task_done()
q.join()
else:
self.assertIn(q.get(), "LO")
q.task_done()
self.assertIn(q.get(), "LO")
q.task_done()
q.join()
# on shutdown(immediate=False)
# when queue is empty, should raise ShutDown Exception
with self.assertRaises(self.queue.ShutDown):
q.get() # p.get(True)
with self.assertRaises(self.queue.ShutDown):
q.get_nowait() # p.get(False)
with self.assertRaises(self.queue.ShutDown):
q.get(True, 1.0)
def test_shutdown_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(False)
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)
def _write_msg_thread(self, q, n, results,
i_when_exec_shutdown, event_shutdown,
barrier_start):
# All `write_msg_threads`
# put several items into the queue.
for i in range(0, i_when_exec_shutdown//2):
q.put((i, 'LOYD'))
# Wait for the barrier to be complete.
barrier_start.wait()
for i in range(i_when_exec_shutdown//2, n):
try:
q.put((i, "YDLO"))
except self.queue.ShutDown:
results.append(False)
break
# Trigger queue shutdown.
if i == i_when_exec_shutdown:
# Only one thread should call shutdown().
if not event_shutdown.is_set():
event_shutdown.set()
results.append(True)
def _read_msg_thread(self, q, results, barrier_start):
# Get at least one item.
q.get(True)
q.task_done()
# Wait for the barrier to be complete.
barrier_start.wait()
while True:
try:
q.get(False)
q.task_done()
except self.queue.ShutDown:
results.append(True)
break
except self.queue.Empty:
pass
def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
results.append(q.qsize() == 0)
def _join_thread(self, q, barrier_start):
# Wait for the barrier to be complete.
barrier_start.wait()
q.join()
def _shutdown_all_methods_in_many_threads(self, immediate):
# Run a 'multi-producers/consumers queue' use case,
# with enough items into the queue.
# When shutdown, all running threads will be joined.
q = self.type2test()
ps = []
res_puts = []
res_gets = []
res_shutdown = []
write_threads = 4
read_threads = 6
join_threads = 2
nb_msgs = 1024*64
nb_msgs_w = nb_msgs // write_threads
when_exec_shutdown = nb_msgs_w // 2
# Use of a Barrier to ensure that
# - all write threads put all their items into the queue,
# - all read thread get at least one item from the queue,
# and keep on running until shutdown.
# The join thread is started only when shutdown is immediate.
nparties = write_threads + read_threads
if immediate:
nparties += join_threads
barrier_start = threading.Barrier(nparties)
ev_exec_shutdown = threading.Event()
lprocs = [
(self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
when_exec_shutdown, ev_exec_shutdown,
barrier_start)),
(self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
(self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
]
if immediate:
lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
# start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
for thread in ps:
thread.join()
self.assertTrue(True in res_puts)
self.assertEqual(res_gets.count(True), read_threads)
if immediate:
self.assertListEqual(res_shutdown, [True])
self.assertTrue(q.empty())
def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)
def _get(self, q, go, results, shutdown=False):
go.wait()
try:
msg = q.get()
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _get_shutdown(self, q, go, results):
return self._get(q, go, results, True)
def _get_task_done(self, q, go, results):
go.wait()
try:
msg = q.get()
q.task_done()
results.append(True)
return msg
except self.queue.ShutDown:
results.append(False)
return False
def _put(self, q, msg, go, results, shutdown=False):
go.wait()
try:
q.put(msg)
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _put_shutdown(self, q, msg, go, results):
return self._put(q, msg, go, results, True)
def _join(self, q, results, shutdown=False):
try:
q.join()
results.append(not shutdown)
return not shutdown
except self.queue.ShutDown:
results.append(shutdown)
return shutdown
def _join_shutdown(self, q, results):
return self._join(q, results, True)
def _shutdown_get(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
# queue full
if immediate:
thrds = (
(self._get_shutdown, (q, go, results)),
(self._get_shutdown, (q, go, results)),
)
else:
thrds = (
# on shutdown(immediate=False)
# one of these threads should raise Shutdown
(self._get, (q, go, results)),
(self._get, (q, go, results)),
(self._get, (q, go, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
q.shutdown(immediate)
go.set()
for t in threads:
t.join()
if immediate:
self.assertListEqual(results, [True, True])
else:
self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))
def test_shutdown_get(self):
return self._shutdown_get(False)
def test_shutdown_immediate_get(self):
return self._shutdown_get(True)
def _shutdown_put(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
# queue fulled
thrds = (
(self._put_shutdown, (q, "E", go, results)),
(self._put_shutdown, (q, "W", go, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
q.shutdown()
go.set()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_put(self):
return self._shutdown_put(False)
def test_shutdown_immediate_put(self):
return self._shutdown_put(True)
def _shutdown_join(self, immediate):
q = self.type2test()
results = []
q.put("Y")
go = threading.Event()
nb = q.qsize()
thrds = (
(self._join, (q, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
if not immediate:
res = []
for i in range(nb):
threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res)))
threads[-1].start()
q.shutdown(immediate)
go.set()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_immediate_join(self):
return self._shutdown_join(True)
def test_shutdown_join(self):
return self._shutdown_join(False)
def _shutdown_put_join(self, immediate):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
# queue not fulled
thrds = (
(self._put_shutdown, (q, "E", go, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
self.assertEqual(q.unfinished_tasks, 1)
q.shutdown(immediate)
go.set()
if immediate:
with self.assertRaises(self.queue.ShutDown):
q.get_nowait()
else:
result = q.get()
self.assertEqual(result, "Y")
q.task_done()
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_immediate_put_join(self):
return self._shutdown_put_join(True)
def test_shutdown_put_join(self):
return self._shutdown_put_join(False)
def test_shutdown_get_task_done_join(self):
q = self.type2test(2)
results = []
go = threading.Event()
q.put("Y")
q.put("D")
self.assertEqual(q.unfinished_tasks, q.qsize())
thrds = (
(self._get_task_done, (q, go, results)),
(self._get_task_done, (q, go, results)),
(self._join, (q, results)),
(self._join, (q, results)),
)
threads = []
for func, params in thrds:
threads.append(threading.Thread(target=func, args=params))
threads[-1].start()
go.set()
q.shutdown(False)
for t in threads:
t.join()
self.assertEqual(results, [True]*len(thrds))
def test_shutdown_pending_get(self):
def get():
try:
results.append(q.get())
except Exception as e:
results.append(e)
q = self.type2test()
results = []
get_thread = threading.Thread(target=get)
get_thread.start()
q.shutdown(immediate=False)
get_thread.join(timeout=10.0)
self.assertFalse(get_thread.is_alive())
self.assertEqual(len(results), 1)
self.assertIsInstance(results[0], self.queue.ShutDown)
class QueueTest(BaseQueueTestMixin):
def setUp(self):
@@ -289,6 +704,7 @@ class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception): pass
class FailingQueueTest(BlockingTestMixin):
def setUp(self):

View File

@@ -58,6 +58,7 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
@threading_helper.requires_working_threading()
def test_enter_concurrent(self):
q = queue.Queue()
fun = q.put
@@ -91,10 +92,23 @@ class TestCase(unittest.TestCase):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for priority in [1, 2, 3, 4, 5]:
z = scheduler.enterabs(0.01, priority, fun, (priority,))
scheduler.run()
self.assertEqual(l, [1, 2, 3, 4, 5])
cases = [
([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]),
([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]),
([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]),
]
for priorities, expected in cases:
with self.subTest(priorities=priorities, expected=expected):
for priority in priorities:
scheduler.enterabs(0.01, priority, fun, (priority,))
scheduler.run()
self.assertEqual(l, expected)
# Cleanup:
self.assertTrue(scheduler.empty())
l.clear()
def test_cancel(self):
l = []
@@ -111,6 +125,7 @@ class TestCase(unittest.TestCase):
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
@threading_helper.requires_working_threading()
def test_cancel_concurrent(self):
q = queue.Queue()
fun = q.put