forked from Rust-related/RustPython
Merge pull request #4587 from youknowone/crlf
Fix CR/LF of python libraries
This commit is contained in:
2582
Lib/test/test_bigmem.py
vendored
2582
Lib/test/test_bigmem.py
vendored
File diff suppressed because it is too large
Load Diff
8848
Lib/test/test_buffer.py
vendored
8848
Lib/test/test_buffer.py
vendored
File diff suppressed because it is too large
Load Diff
136
Lib/test/test_cgitb.py
vendored
136
Lib/test/test_cgitb.py
vendored
@@ -1,68 +1,68 @@
|
||||
from test.support.os_helper import temp_dir
|
||||
from test.support.script_helper import assert_python_failure
|
||||
import unittest
|
||||
import sys
|
||||
import cgitb
|
||||
|
||||
class TestCgitb(unittest.TestCase):
|
||||
|
||||
def test_fonts(self):
|
||||
text = "Hello Robbie!"
|
||||
self.assertEqual(cgitb.small(text), "<small>{}</small>".format(text))
|
||||
self.assertEqual(cgitb.strong(text), "<strong>{}</strong>".format(text))
|
||||
self.assertEqual(cgitb.grey(text),
|
||||
'<font color="#909090">{}</font>'.format(text))
|
||||
|
||||
def test_blanks(self):
|
||||
self.assertEqual(cgitb.small(""), "")
|
||||
self.assertEqual(cgitb.strong(""), "")
|
||||
self.assertEqual(cgitb.grey(""), "")
|
||||
|
||||
def test_html(self):
|
||||
try:
|
||||
raise ValueError("Hello World")
|
||||
except ValueError as err:
|
||||
# If the html was templated we could do a bit more here.
|
||||
# At least check that we get details on what we just raised.
|
||||
html = cgitb.html(sys.exc_info())
|
||||
self.assertIn("ValueError", html)
|
||||
self.assertIn(str(err), html)
|
||||
|
||||
def test_text(self):
|
||||
try:
|
||||
raise ValueError("Hello World")
|
||||
except ValueError as err:
|
||||
text = cgitb.text(sys.exc_info())
|
||||
self.assertIn("ValueError", text)
|
||||
self.assertIn("Hello World", text)
|
||||
|
||||
def test_syshook_no_logdir_default_format(self):
|
||||
with temp_dir() as tracedir:
|
||||
rc, out, err = assert_python_failure(
|
||||
'-c',
|
||||
('import cgitb; cgitb.enable(logdir=%s); '
|
||||
'raise ValueError("Hello World")') % repr(tracedir))
|
||||
out = out.decode(sys.getfilesystemencoding())
|
||||
self.assertIn("ValueError", out)
|
||||
self.assertIn("Hello World", out)
|
||||
self.assertIn("<strong><module></strong>", out)
|
||||
# By default we emit HTML markup.
|
||||
self.assertIn('<p>', out)
|
||||
self.assertIn('</p>', out)
|
||||
|
||||
def test_syshook_no_logdir_text_format(self):
|
||||
# Issue 12890: we were emitting the <p> tag in text mode.
|
||||
with temp_dir() as tracedir:
|
||||
rc, out, err = assert_python_failure(
|
||||
'-c',
|
||||
('import cgitb; cgitb.enable(format="text", logdir=%s); '
|
||||
'raise ValueError("Hello World")') % repr(tracedir))
|
||||
out = out.decode(sys.getfilesystemencoding())
|
||||
self.assertIn("ValueError", out)
|
||||
self.assertIn("Hello World", out)
|
||||
self.assertNotIn('<p>', out)
|
||||
self.assertNotIn('</p>', out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
from test.support.os_helper import temp_dir
|
||||
from test.support.script_helper import assert_python_failure
|
||||
import unittest
|
||||
import sys
|
||||
import cgitb
|
||||
|
||||
class TestCgitb(unittest.TestCase):
|
||||
|
||||
def test_fonts(self):
|
||||
text = "Hello Robbie!"
|
||||
self.assertEqual(cgitb.small(text), "<small>{}</small>".format(text))
|
||||
self.assertEqual(cgitb.strong(text), "<strong>{}</strong>".format(text))
|
||||
self.assertEqual(cgitb.grey(text),
|
||||
'<font color="#909090">{}</font>'.format(text))
|
||||
|
||||
def test_blanks(self):
|
||||
self.assertEqual(cgitb.small(""), "")
|
||||
self.assertEqual(cgitb.strong(""), "")
|
||||
self.assertEqual(cgitb.grey(""), "")
|
||||
|
||||
def test_html(self):
|
||||
try:
|
||||
raise ValueError("Hello World")
|
||||
except ValueError as err:
|
||||
# If the html was templated we could do a bit more here.
|
||||
# At least check that we get details on what we just raised.
|
||||
html = cgitb.html(sys.exc_info())
|
||||
self.assertIn("ValueError", html)
|
||||
self.assertIn(str(err), html)
|
||||
|
||||
def test_text(self):
|
||||
try:
|
||||
raise ValueError("Hello World")
|
||||
except ValueError as err:
|
||||
text = cgitb.text(sys.exc_info())
|
||||
self.assertIn("ValueError", text)
|
||||
self.assertIn("Hello World", text)
|
||||
|
||||
def test_syshook_no_logdir_default_format(self):
|
||||
with temp_dir() as tracedir:
|
||||
rc, out, err = assert_python_failure(
|
||||
'-c',
|
||||
('import cgitb; cgitb.enable(logdir=%s); '
|
||||
'raise ValueError("Hello World")') % repr(tracedir))
|
||||
out = out.decode(sys.getfilesystemencoding())
|
||||
self.assertIn("ValueError", out)
|
||||
self.assertIn("Hello World", out)
|
||||
self.assertIn("<strong><module></strong>", out)
|
||||
# By default we emit HTML markup.
|
||||
self.assertIn('<p>', out)
|
||||
self.assertIn('</p>', out)
|
||||
|
||||
def test_syshook_no_logdir_text_format(self):
|
||||
# Issue 12890: we were emitting the <p> tag in text mode.
|
||||
with temp_dir() as tracedir:
|
||||
rc, out, err = assert_python_failure(
|
||||
'-c',
|
||||
('import cgitb; cgitb.enable(format="text", logdir=%s); '
|
||||
'raise ValueError("Hello World")') % repr(tracedir))
|
||||
out = out.decode(sys.getfilesystemencoding())
|
||||
self.assertIn("ValueError", out)
|
||||
self.assertIn("Hello World", out)
|
||||
self.assertNotIn('<p>', out)
|
||||
self.assertNotIn('</p>', out)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
1786
Lib/test/test_cmd_line.py
vendored
1786
Lib/test/test_cmd_line.py
vendored
File diff suppressed because it is too large
Load Diff
1566
Lib/test/test_cmd_line_script.py
vendored
1566
Lib/test/test_cmd_line_script.py
vendored
File diff suppressed because it is too large
Load Diff
314
Lib/test/test_code_module.py
vendored
314
Lib/test/test_code_module.py
vendored
@@ -1,157 +1,157 @@
|
||||
"Test InteractiveConsole and InteractiveInterpreter from code module"
|
||||
import sys
|
||||
import unittest
|
||||
from textwrap import dedent
|
||||
from contextlib import ExitStack
|
||||
from unittest import mock
|
||||
from test.support import import_helper
|
||||
|
||||
code = import_helper.import_module('code')
|
||||
|
||||
|
||||
class TestInteractiveConsole(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.console = code.InteractiveConsole()
|
||||
self.mock_sys()
|
||||
|
||||
def mock_sys(self):
|
||||
"Mock system environment for InteractiveConsole"
|
||||
# use exit stack to match patch context managers to addCleanup
|
||||
stack = ExitStack()
|
||||
self.addCleanup(stack.close)
|
||||
self.infunc = stack.enter_context(mock.patch('code.input',
|
||||
create=True))
|
||||
self.stdout = stack.enter_context(mock.patch('code.sys.stdout'))
|
||||
self.stderr = stack.enter_context(mock.patch('code.sys.stderr'))
|
||||
prepatch = mock.patch('code.sys', wraps=code.sys, spec=code.sys)
|
||||
self.sysmod = stack.enter_context(prepatch)
|
||||
if sys.excepthook is sys.__excepthook__:
|
||||
self.sysmod.excepthook = self.sysmod.__excepthook__
|
||||
del self.sysmod.ps1
|
||||
del self.sysmod.ps2
|
||||
|
||||
def test_ps1(self):
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, '>>> ')
|
||||
self.sysmod.ps1 = 'custom1> '
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, 'custom1> ')
|
||||
|
||||
def test_ps2(self):
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps2, '... ')
|
||||
self.sysmod.ps1 = 'custom2> '
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, 'custom2> ')
|
||||
|
||||
def test_console_stderr(self):
|
||||
self.infunc.side_effect = ["'antioch'", "", EOFError('Finished')]
|
||||
self.console.interact()
|
||||
for call in list(self.stdout.method_calls):
|
||||
if 'antioch' in ''.join(call[1]):
|
||||
break
|
||||
else:
|
||||
raise AssertionError("no console stdout")
|
||||
|
||||
def test_syntax_error(self):
|
||||
self.infunc.side_effect = ["undefined", EOFError('Finished')]
|
||||
self.console.interact()
|
||||
for call in self.stderr.method_calls:
|
||||
if 'NameError' in ''.join(call[1]):
|
||||
break
|
||||
else:
|
||||
raise AssertionError("No syntax error from console")
|
||||
|
||||
def test_sysexcepthook(self):
|
||||
self.infunc.side_effect = ["raise ValueError('')",
|
||||
EOFError('Finished')]
|
||||
hook = mock.Mock()
|
||||
self.sysmod.excepthook = hook
|
||||
self.console.interact()
|
||||
self.assertTrue(hook.called)
|
||||
|
||||
def test_banner(self):
|
||||
# with banner
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='Foo')
|
||||
self.assertEqual(len(self.stderr.method_calls), 3)
|
||||
banner_call = self.stderr.method_calls[0]
|
||||
self.assertEqual(banner_call, ['write', ('Foo\n',), {}])
|
||||
|
||||
# no banner
|
||||
self.stderr.reset_mock()
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
|
||||
def test_exit_msg(self):
|
||||
# default exit message
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
err_msg = self.stderr.method_calls[1]
|
||||
expected = 'now exiting InteractiveConsole...\n'
|
||||
self.assertEqual(err_msg, ['write', (expected,), {}])
|
||||
|
||||
# no exit message
|
||||
self.stderr.reset_mock()
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='', exitmsg='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 1)
|
||||
|
||||
# custom exit message
|
||||
self.stderr.reset_mock()
|
||||
message = (
|
||||
'bye! \N{GREEK SMALL LETTER ZETA}\N{CYRILLIC SMALL LETTER ZHE}'
|
||||
)
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='', exitmsg=message)
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
err_msg = self.stderr.method_calls[1]
|
||||
expected = message + '\n'
|
||||
self.assertEqual(err_msg, ['write', (expected,), {}])
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_cause_tb(self):
|
||||
self.infunc.side_effect = ["raise ValueError('') from AttributeError",
|
||||
EOFError('Finished')]
|
||||
self.console.interact()
|
||||
output = ''.join(''.join(call[1]) for call in self.stderr.method_calls)
|
||||
expected = dedent("""
|
||||
AttributeError
|
||||
|
||||
The above exception was the direct cause of the following exception:
|
||||
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 1, in <module>
|
||||
ValueError
|
||||
""")
|
||||
self.assertIn(expected, output)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_context_tb(self):
|
||||
self.infunc.side_effect = ["try: ham\nexcept: eggs\n",
|
||||
EOFError('Finished')]
|
||||
self.console.interact()
|
||||
output = ''.join(''.join(call[1]) for call in self.stderr.method_calls)
|
||||
expected = dedent("""
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 1, in <module>
|
||||
NameError: name 'ham' is not defined
|
||||
|
||||
During handling of the above exception, another exception occurred:
|
||||
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 2, in <module>
|
||||
NameError: name 'eggs' is not defined
|
||||
""")
|
||||
self.assertIn(expected, output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
"Test InteractiveConsole and InteractiveInterpreter from code module"
|
||||
import sys
|
||||
import unittest
|
||||
from textwrap import dedent
|
||||
from contextlib import ExitStack
|
||||
from unittest import mock
|
||||
from test.support import import_helper
|
||||
|
||||
code = import_helper.import_module('code')
|
||||
|
||||
|
||||
class TestInteractiveConsole(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.console = code.InteractiveConsole()
|
||||
self.mock_sys()
|
||||
|
||||
def mock_sys(self):
|
||||
"Mock system environment for InteractiveConsole"
|
||||
# use exit stack to match patch context managers to addCleanup
|
||||
stack = ExitStack()
|
||||
self.addCleanup(stack.close)
|
||||
self.infunc = stack.enter_context(mock.patch('code.input',
|
||||
create=True))
|
||||
self.stdout = stack.enter_context(mock.patch('code.sys.stdout'))
|
||||
self.stderr = stack.enter_context(mock.patch('code.sys.stderr'))
|
||||
prepatch = mock.patch('code.sys', wraps=code.sys, spec=code.sys)
|
||||
self.sysmod = stack.enter_context(prepatch)
|
||||
if sys.excepthook is sys.__excepthook__:
|
||||
self.sysmod.excepthook = self.sysmod.__excepthook__
|
||||
del self.sysmod.ps1
|
||||
del self.sysmod.ps2
|
||||
|
||||
def test_ps1(self):
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, '>>> ')
|
||||
self.sysmod.ps1 = 'custom1> '
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, 'custom1> ')
|
||||
|
||||
def test_ps2(self):
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps2, '... ')
|
||||
self.sysmod.ps1 = 'custom2> '
|
||||
self.console.interact()
|
||||
self.assertEqual(self.sysmod.ps1, 'custom2> ')
|
||||
|
||||
def test_console_stderr(self):
|
||||
self.infunc.side_effect = ["'antioch'", "", EOFError('Finished')]
|
||||
self.console.interact()
|
||||
for call in list(self.stdout.method_calls):
|
||||
if 'antioch' in ''.join(call[1]):
|
||||
break
|
||||
else:
|
||||
raise AssertionError("no console stdout")
|
||||
|
||||
def test_syntax_error(self):
|
||||
self.infunc.side_effect = ["undefined", EOFError('Finished')]
|
||||
self.console.interact()
|
||||
for call in self.stderr.method_calls:
|
||||
if 'NameError' in ''.join(call[1]):
|
||||
break
|
||||
else:
|
||||
raise AssertionError("No syntax error from console")
|
||||
|
||||
def test_sysexcepthook(self):
|
||||
self.infunc.side_effect = ["raise ValueError('')",
|
||||
EOFError('Finished')]
|
||||
hook = mock.Mock()
|
||||
self.sysmod.excepthook = hook
|
||||
self.console.interact()
|
||||
self.assertTrue(hook.called)
|
||||
|
||||
def test_banner(self):
|
||||
# with banner
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='Foo')
|
||||
self.assertEqual(len(self.stderr.method_calls), 3)
|
||||
banner_call = self.stderr.method_calls[0]
|
||||
self.assertEqual(banner_call, ['write', ('Foo\n',), {}])
|
||||
|
||||
# no banner
|
||||
self.stderr.reset_mock()
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
|
||||
def test_exit_msg(self):
|
||||
# default exit message
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
err_msg = self.stderr.method_calls[1]
|
||||
expected = 'now exiting InteractiveConsole...\n'
|
||||
self.assertEqual(err_msg, ['write', (expected,), {}])
|
||||
|
||||
# no exit message
|
||||
self.stderr.reset_mock()
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='', exitmsg='')
|
||||
self.assertEqual(len(self.stderr.method_calls), 1)
|
||||
|
||||
# custom exit message
|
||||
self.stderr.reset_mock()
|
||||
message = (
|
||||
'bye! \N{GREEK SMALL LETTER ZETA}\N{CYRILLIC SMALL LETTER ZHE}'
|
||||
)
|
||||
self.infunc.side_effect = EOFError('Finished')
|
||||
self.console.interact(banner='', exitmsg=message)
|
||||
self.assertEqual(len(self.stderr.method_calls), 2)
|
||||
err_msg = self.stderr.method_calls[1]
|
||||
expected = message + '\n'
|
||||
self.assertEqual(err_msg, ['write', (expected,), {}])
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_cause_tb(self):
|
||||
self.infunc.side_effect = ["raise ValueError('') from AttributeError",
|
||||
EOFError('Finished')]
|
||||
self.console.interact()
|
||||
output = ''.join(''.join(call[1]) for call in self.stderr.method_calls)
|
||||
expected = dedent("""
|
||||
AttributeError
|
||||
|
||||
The above exception was the direct cause of the following exception:
|
||||
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 1, in <module>
|
||||
ValueError
|
||||
""")
|
||||
self.assertIn(expected, output)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_context_tb(self):
|
||||
self.infunc.side_effect = ["try: ham\nexcept: eggs\n",
|
||||
EOFError('Finished')]
|
||||
self.console.interact()
|
||||
output = ''.join(''.join(call[1]) for call in self.stderr.method_calls)
|
||||
expected = dedent("""
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 1, in <module>
|
||||
NameError: name 'ham' is not defined
|
||||
|
||||
During handling of the above exception, another exception occurred:
|
||||
|
||||
Traceback (most recent call last):
|
||||
File "<console>", line 2, in <module>
|
||||
NameError: name 'eggs' is not defined
|
||||
""")
|
||||
self.assertIn(expected, output)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
2136
Lib/test/test_contextlib.py
vendored
2136
Lib/test/test_contextlib.py
vendored
File diff suppressed because it is too large
Load Diff
556
Lib/test/test_dummy_thread.py
vendored
556
Lib/test/test_dummy_thread.py
vendored
@@ -1,278 +1,278 @@
|
||||
import _dummy_thread as _thread
|
||||
import time
|
||||
import queue
|
||||
import random
|
||||
import unittest
|
||||
from test import support
|
||||
from unittest import mock
|
||||
|
||||
DELAY = 0
|
||||
|
||||
|
||||
class LockTests(unittest.TestCase):
|
||||
"""Test lock objects."""
|
||||
|
||||
def setUp(self):
|
||||
# Create a lock
|
||||
self.lock = _thread.allocate_lock()
|
||||
|
||||
def test_initlock(self):
|
||||
#Make sure locks start locked
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Lock object is not initialized unlocked.")
|
||||
|
||||
def test_release(self):
|
||||
# Test self.lock.release()
|
||||
self.lock.acquire()
|
||||
self.lock.release()
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Lock object did not release properly.")
|
||||
|
||||
def test_LockType_context_manager(self):
|
||||
with _thread.LockType():
|
||||
pass
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Acquired Lock was not released")
|
||||
|
||||
def test_improper_release(self):
|
||||
#Make sure release of an unlocked thread raises RuntimeError
|
||||
self.assertRaises(RuntimeError, self.lock.release)
|
||||
|
||||
def test_cond_acquire_success(self):
|
||||
#Make sure the conditional acquiring of the lock works.
|
||||
self.assertTrue(self.lock.acquire(0),
|
||||
"Conditional acquiring of the lock failed.")
|
||||
|
||||
def test_cond_acquire_fail(self):
|
||||
#Test acquiring locked lock returns False
|
||||
self.lock.acquire(0)
|
||||
self.assertFalse(self.lock.acquire(0),
|
||||
"Conditional acquiring of a locked lock incorrectly "
|
||||
"succeeded.")
|
||||
|
||||
def test_uncond_acquire_success(self):
|
||||
#Make sure unconditional acquiring of a lock works.
|
||||
self.lock.acquire()
|
||||
self.assertTrue(self.lock.locked(),
|
||||
"Uncondional locking failed.")
|
||||
|
||||
def test_uncond_acquire_return_val(self):
|
||||
#Make sure that an unconditional locking returns True.
|
||||
self.assertIs(self.lock.acquire(1), True,
|
||||
"Unconditional locking did not return True.")
|
||||
self.assertIs(self.lock.acquire(), True)
|
||||
|
||||
def test_uncond_acquire_blocking(self):
|
||||
#Make sure that unconditional acquiring of a locked lock blocks.
|
||||
def delay_unlock(to_unlock, delay):
|
||||
"""Hold on to lock for a set amount of time before unlocking."""
|
||||
time.sleep(delay)
|
||||
to_unlock.release()
|
||||
|
||||
self.lock.acquire()
|
||||
start_time = int(time.monotonic())
|
||||
_thread.start_new_thread(delay_unlock,(self.lock, DELAY))
|
||||
if support.verbose:
|
||||
print()
|
||||
print("*** Waiting for thread to release the lock "\
|
||||
"(approx. %s sec.) ***" % DELAY)
|
||||
self.lock.acquire()
|
||||
end_time = int(time.monotonic())
|
||||
if support.verbose:
|
||||
print("done")
|
||||
self.assertGreaterEqual(end_time - start_time, DELAY,
|
||||
"Blocking by unconditional acquiring failed.")
|
||||
|
||||
@mock.patch('time.sleep')
|
||||
def test_acquire_timeout(self, mock_sleep):
|
||||
"""Test invoking acquire() with a positive timeout when the lock is
|
||||
already acquired. Ensure that time.sleep() is invoked with the given
|
||||
timeout and that False is returned."""
|
||||
|
||||
self.lock.acquire()
|
||||
retval = self.lock.acquire(waitflag=0, timeout=1)
|
||||
self.assertTrue(mock_sleep.called)
|
||||
mock_sleep.assert_called_once_with(1)
|
||||
self.assertEqual(retval, False)
|
||||
|
||||
def test_lock_representation(self):
|
||||
self.lock.acquire()
|
||||
self.assertIn("locked", repr(self.lock))
|
||||
self.lock.release()
|
||||
self.assertIn("unlocked", repr(self.lock))
|
||||
|
||||
|
||||
class RLockTests(unittest.TestCase):
|
||||
"""Test dummy RLock objects."""
|
||||
|
||||
def setUp(self):
|
||||
self.rlock = _thread.RLock()
|
||||
|
||||
def test_multiple_acquire(self):
|
||||
self.assertIn("unlocked", repr(self.rlock))
|
||||
self.rlock.acquire()
|
||||
self.rlock.acquire()
|
||||
self.assertIn("locked", repr(self.rlock))
|
||||
self.rlock.release()
|
||||
self.assertIn("locked", repr(self.rlock))
|
||||
self.rlock.release()
|
||||
self.assertIn("unlocked", repr(self.rlock))
|
||||
self.assertRaises(RuntimeError, self.rlock.release)
|
||||
|
||||
|
||||
class MiscTests(unittest.TestCase):
|
||||
"""Miscellaneous tests."""
|
||||
|
||||
def test_exit(self):
|
||||
self.assertRaises(SystemExit, _thread.exit)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_ident(self):
|
||||
self.assertIsInstance(_thread.get_ident(), int,
|
||||
"_thread.get_ident() returned a non-integer")
|
||||
self.assertGreater(_thread.get_ident(), 0)
|
||||
|
||||
def test_LockType(self):
|
||||
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
|
||||
"_thread.LockType is not an instance of what "
|
||||
"is returned by _thread.allocate_lock()")
|
||||
|
||||
def test_set_sentinel(self):
|
||||
self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
|
||||
"_thread._set_sentinel() did not return a "
|
||||
"LockType instance.")
|
||||
|
||||
def test_interrupt_main(self):
|
||||
#Calling start_new_thread with a function that executes interrupt_main
|
||||
# should raise KeyboardInterrupt upon completion.
|
||||
def call_interrupt():
|
||||
_thread.interrupt_main()
|
||||
|
||||
self.assertRaises(KeyboardInterrupt,
|
||||
_thread.start_new_thread,
|
||||
call_interrupt,
|
||||
tuple())
|
||||
|
||||
def test_interrupt_in_main(self):
|
||||
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
|
||||
|
||||
def test_stack_size_None(self):
|
||||
retval = _thread.stack_size(None)
|
||||
self.assertEqual(retval, 0)
|
||||
|
||||
def test_stack_size_not_None(self):
|
||||
with self.assertRaises(_thread.error) as cm:
|
||||
_thread.stack_size("")
|
||||
self.assertEqual(cm.exception.args[0],
|
||||
"setting thread stack size not supported")
|
||||
|
||||
|
||||
class ThreadTests(unittest.TestCase):
|
||||
"""Test thread creation."""
|
||||
|
||||
def test_arg_passing(self):
|
||||
#Make sure that parameter passing works.
|
||||
def arg_tester(queue, arg1=False, arg2=False):
|
||||
"""Use to test _thread.start_new_thread() passes args properly."""
|
||||
queue.put((arg1, arg2))
|
||||
|
||||
testing_queue = queue.Queue(1)
|
||||
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation "
|
||||
"using tuple failed")
|
||||
|
||||
_thread.start_new_thread(
|
||||
arg_tester,
|
||||
tuple(),
|
||||
{'queue':testing_queue, 'arg1':True, 'arg2':True})
|
||||
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation "
|
||||
"using kwargs failed")
|
||||
|
||||
_thread.start_new_thread(
|
||||
arg_tester,
|
||||
(testing_queue, True),
|
||||
{'arg2':True})
|
||||
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation using both tuple"
|
||||
" and kwargs failed")
|
||||
|
||||
def test_multi_thread_creation(self):
|
||||
def queue_mark(queue, delay):
|
||||
time.sleep(delay)
|
||||
queue.put(_thread.get_ident())
|
||||
|
||||
thread_count = 5
|
||||
testing_queue = queue.Queue(thread_count)
|
||||
|
||||
if support.verbose:
|
||||
print()
|
||||
print("*** Testing multiple thread creation "
|
||||
"(will take approx. %s to %s sec.) ***" % (
|
||||
DELAY, thread_count))
|
||||
|
||||
for count in range(thread_count):
|
||||
if DELAY:
|
||||
local_delay = round(random.random(), 1)
|
||||
else:
|
||||
local_delay = 0
|
||||
_thread.start_new_thread(queue_mark,
|
||||
(testing_queue, local_delay))
|
||||
time.sleep(DELAY)
|
||||
if support.verbose:
|
||||
print('done')
|
||||
self.assertEqual(testing_queue.qsize(), thread_count,
|
||||
"Not all %s threads executed properly "
|
||||
"after %s sec." % (thread_count, DELAY))
|
||||
|
||||
def test_args_not_tuple(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a non-tuple value for "args".
|
||||
Expect TypeError with a meaningful error message to be raised.
|
||||
"""
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
_thread.start_new_thread(mock.Mock(), [])
|
||||
self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
|
||||
|
||||
def test_kwargs_not_dict(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a non-dict value for "kwargs".
|
||||
Expect TypeError with a meaningful error message to be raised.
|
||||
"""
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
_thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
|
||||
self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
|
||||
|
||||
def test_SystemExit(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a function that raises
|
||||
SystemExit.
|
||||
The exception should be discarded.
|
||||
"""
|
||||
func = mock.Mock(side_effect=SystemExit())
|
||||
try:
|
||||
_thread.start_new_thread(func, tuple())
|
||||
except SystemExit:
|
||||
self.fail("start_new_thread raised SystemExit.")
|
||||
|
||||
@mock.patch('traceback.print_exc')
|
||||
def test_RaiseException(self, mock_print_exc):
|
||||
"""
|
||||
Test invoking start_new_thread() with a function that raises exception.
|
||||
|
||||
The exception should be discarded and the traceback should be printed
|
||||
via traceback.print_exc()
|
||||
"""
|
||||
func = mock.Mock(side_effect=Exception)
|
||||
_thread.start_new_thread(func, tuple())
|
||||
self.assertTrue(mock_print_exc.called)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
import _dummy_thread as _thread
|
||||
import time
|
||||
import queue
|
||||
import random
|
||||
import unittest
|
||||
from test import support
|
||||
from unittest import mock
|
||||
|
||||
DELAY = 0
|
||||
|
||||
|
||||
class LockTests(unittest.TestCase):
|
||||
"""Test lock objects."""
|
||||
|
||||
def setUp(self):
|
||||
# Create a lock
|
||||
self.lock = _thread.allocate_lock()
|
||||
|
||||
def test_initlock(self):
|
||||
#Make sure locks start locked
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Lock object is not initialized unlocked.")
|
||||
|
||||
def test_release(self):
|
||||
# Test self.lock.release()
|
||||
self.lock.acquire()
|
||||
self.lock.release()
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Lock object did not release properly.")
|
||||
|
||||
def test_LockType_context_manager(self):
|
||||
with _thread.LockType():
|
||||
pass
|
||||
self.assertFalse(self.lock.locked(),
|
||||
"Acquired Lock was not released")
|
||||
|
||||
def test_improper_release(self):
|
||||
#Make sure release of an unlocked thread raises RuntimeError
|
||||
self.assertRaises(RuntimeError, self.lock.release)
|
||||
|
||||
def test_cond_acquire_success(self):
|
||||
#Make sure the conditional acquiring of the lock works.
|
||||
self.assertTrue(self.lock.acquire(0),
|
||||
"Conditional acquiring of the lock failed.")
|
||||
|
||||
def test_cond_acquire_fail(self):
|
||||
#Test acquiring locked lock returns False
|
||||
self.lock.acquire(0)
|
||||
self.assertFalse(self.lock.acquire(0),
|
||||
"Conditional acquiring of a locked lock incorrectly "
|
||||
"succeeded.")
|
||||
|
||||
def test_uncond_acquire_success(self):
|
||||
#Make sure unconditional acquiring of a lock works.
|
||||
self.lock.acquire()
|
||||
self.assertTrue(self.lock.locked(),
|
||||
"Uncondional locking failed.")
|
||||
|
||||
def test_uncond_acquire_return_val(self):
|
||||
#Make sure that an unconditional locking returns True.
|
||||
self.assertIs(self.lock.acquire(1), True,
|
||||
"Unconditional locking did not return True.")
|
||||
self.assertIs(self.lock.acquire(), True)
|
||||
|
||||
def test_uncond_acquire_blocking(self):
|
||||
#Make sure that unconditional acquiring of a locked lock blocks.
|
||||
def delay_unlock(to_unlock, delay):
|
||||
"""Hold on to lock for a set amount of time before unlocking."""
|
||||
time.sleep(delay)
|
||||
to_unlock.release()
|
||||
|
||||
self.lock.acquire()
|
||||
start_time = int(time.monotonic())
|
||||
_thread.start_new_thread(delay_unlock,(self.lock, DELAY))
|
||||
if support.verbose:
|
||||
print()
|
||||
print("*** Waiting for thread to release the lock "\
|
||||
"(approx. %s sec.) ***" % DELAY)
|
||||
self.lock.acquire()
|
||||
end_time = int(time.monotonic())
|
||||
if support.verbose:
|
||||
print("done")
|
||||
self.assertGreaterEqual(end_time - start_time, DELAY,
|
||||
"Blocking by unconditional acquiring failed.")
|
||||
|
||||
@mock.patch('time.sleep')
|
||||
def test_acquire_timeout(self, mock_sleep):
|
||||
"""Test invoking acquire() with a positive timeout when the lock is
|
||||
already acquired. Ensure that time.sleep() is invoked with the given
|
||||
timeout and that False is returned."""
|
||||
|
||||
self.lock.acquire()
|
||||
retval = self.lock.acquire(waitflag=0, timeout=1)
|
||||
self.assertTrue(mock_sleep.called)
|
||||
mock_sleep.assert_called_once_with(1)
|
||||
self.assertEqual(retval, False)
|
||||
|
||||
def test_lock_representation(self):
|
||||
self.lock.acquire()
|
||||
self.assertIn("locked", repr(self.lock))
|
||||
self.lock.release()
|
||||
self.assertIn("unlocked", repr(self.lock))
|
||||
|
||||
|
||||
class RLockTests(unittest.TestCase):
|
||||
"""Test dummy RLock objects."""
|
||||
|
||||
def setUp(self):
|
||||
self.rlock = _thread.RLock()
|
||||
|
||||
def test_multiple_acquire(self):
|
||||
self.assertIn("unlocked", repr(self.rlock))
|
||||
self.rlock.acquire()
|
||||
self.rlock.acquire()
|
||||
self.assertIn("locked", repr(self.rlock))
|
||||
self.rlock.release()
|
||||
self.assertIn("locked", repr(self.rlock))
|
||||
self.rlock.release()
|
||||
self.assertIn("unlocked", repr(self.rlock))
|
||||
self.assertRaises(RuntimeError, self.rlock.release)
|
||||
|
||||
|
||||
class MiscTests(unittest.TestCase):
|
||||
"""Miscellaneous tests."""
|
||||
|
||||
def test_exit(self):
|
||||
self.assertRaises(SystemExit, _thread.exit)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_ident(self):
|
||||
self.assertIsInstance(_thread.get_ident(), int,
|
||||
"_thread.get_ident() returned a non-integer")
|
||||
self.assertGreater(_thread.get_ident(), 0)
|
||||
|
||||
def test_LockType(self):
|
||||
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
|
||||
"_thread.LockType is not an instance of what "
|
||||
"is returned by _thread.allocate_lock()")
|
||||
|
||||
def test_set_sentinel(self):
|
||||
self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
|
||||
"_thread._set_sentinel() did not return a "
|
||||
"LockType instance.")
|
||||
|
||||
def test_interrupt_main(self):
|
||||
#Calling start_new_thread with a function that executes interrupt_main
|
||||
# should raise KeyboardInterrupt upon completion.
|
||||
def call_interrupt():
|
||||
_thread.interrupt_main()
|
||||
|
||||
self.assertRaises(KeyboardInterrupt,
|
||||
_thread.start_new_thread,
|
||||
call_interrupt,
|
||||
tuple())
|
||||
|
||||
def test_interrupt_in_main(self):
|
||||
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
|
||||
|
||||
def test_stack_size_None(self):
|
||||
retval = _thread.stack_size(None)
|
||||
self.assertEqual(retval, 0)
|
||||
|
||||
def test_stack_size_not_None(self):
|
||||
with self.assertRaises(_thread.error) as cm:
|
||||
_thread.stack_size("")
|
||||
self.assertEqual(cm.exception.args[0],
|
||||
"setting thread stack size not supported")
|
||||
|
||||
|
||||
class ThreadTests(unittest.TestCase):
|
||||
"""Test thread creation."""
|
||||
|
||||
def test_arg_passing(self):
|
||||
#Make sure that parameter passing works.
|
||||
def arg_tester(queue, arg1=False, arg2=False):
|
||||
"""Use to test _thread.start_new_thread() passes args properly."""
|
||||
queue.put((arg1, arg2))
|
||||
|
||||
testing_queue = queue.Queue(1)
|
||||
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation "
|
||||
"using tuple failed")
|
||||
|
||||
_thread.start_new_thread(
|
||||
arg_tester,
|
||||
tuple(),
|
||||
{'queue':testing_queue, 'arg1':True, 'arg2':True})
|
||||
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation "
|
||||
"using kwargs failed")
|
||||
|
||||
_thread.start_new_thread(
|
||||
arg_tester,
|
||||
(testing_queue, True),
|
||||
{'arg2':True})
|
||||
|
||||
result = testing_queue.get()
|
||||
self.assertTrue(result[0] and result[1],
|
||||
"Argument passing for thread creation using both tuple"
|
||||
" and kwargs failed")
|
||||
|
||||
def test_multi_thread_creation(self):
|
||||
def queue_mark(queue, delay):
|
||||
time.sleep(delay)
|
||||
queue.put(_thread.get_ident())
|
||||
|
||||
thread_count = 5
|
||||
testing_queue = queue.Queue(thread_count)
|
||||
|
||||
if support.verbose:
|
||||
print()
|
||||
print("*** Testing multiple thread creation "
|
||||
"(will take approx. %s to %s sec.) ***" % (
|
||||
DELAY, thread_count))
|
||||
|
||||
for count in range(thread_count):
|
||||
if DELAY:
|
||||
local_delay = round(random.random(), 1)
|
||||
else:
|
||||
local_delay = 0
|
||||
_thread.start_new_thread(queue_mark,
|
||||
(testing_queue, local_delay))
|
||||
time.sleep(DELAY)
|
||||
if support.verbose:
|
||||
print('done')
|
||||
self.assertEqual(testing_queue.qsize(), thread_count,
|
||||
"Not all %s threads executed properly "
|
||||
"after %s sec." % (thread_count, DELAY))
|
||||
|
||||
def test_args_not_tuple(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a non-tuple value for "args".
|
||||
Expect TypeError with a meaningful error message to be raised.
|
||||
"""
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
_thread.start_new_thread(mock.Mock(), [])
|
||||
self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
|
||||
|
||||
def test_kwargs_not_dict(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a non-dict value for "kwargs".
|
||||
Expect TypeError with a meaningful error message to be raised.
|
||||
"""
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
_thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
|
||||
self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
|
||||
|
||||
def test_SystemExit(self):
|
||||
"""
|
||||
Test invoking start_new_thread() with a function that raises
|
||||
SystemExit.
|
||||
The exception should be discarded.
|
||||
"""
|
||||
func = mock.Mock(side_effect=SystemExit())
|
||||
try:
|
||||
_thread.start_new_thread(func, tuple())
|
||||
except SystemExit:
|
||||
self.fail("start_new_thread raised SystemExit.")
|
||||
|
||||
@mock.patch('traceback.print_exc')
|
||||
def test_RaiseException(self, mock_print_exc):
|
||||
"""
|
||||
Test invoking start_new_thread() with a function that raises exception.
|
||||
|
||||
The exception should be discarded and the traceback should be printed
|
||||
via traceback.print_exc()
|
||||
"""
|
||||
func = mock.Mock(side_effect=Exception)
|
||||
_thread.start_new_thread(func, tuple())
|
||||
self.assertTrue(mock_print_exc.called)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
120
Lib/test/test_dummy_threading.py
vendored
120
Lib/test/test_dummy_threading.py
vendored
@@ -1,60 +1,60 @@
|
||||
from test import support
|
||||
import unittest
|
||||
import dummy_threading as _threading
|
||||
import time
|
||||
|
||||
class DummyThreadingTestCase(unittest.TestCase):
|
||||
|
||||
class TestThread(_threading.Thread):
|
||||
|
||||
def run(self):
|
||||
global running
|
||||
global sema
|
||||
global mutex
|
||||
# Uncomment if testing another module, such as the real 'threading'
|
||||
# module.
|
||||
#delay = random.random() * 2
|
||||
delay = 0
|
||||
if support.verbose:
|
||||
print('task', self.name, 'will run for', delay, 'sec')
|
||||
sema.acquire()
|
||||
mutex.acquire()
|
||||
running += 1
|
||||
if support.verbose:
|
||||
print(running, 'tasks are running')
|
||||
mutex.release()
|
||||
time.sleep(delay)
|
||||
if support.verbose:
|
||||
print('task', self.name, 'done')
|
||||
mutex.acquire()
|
||||
running -= 1
|
||||
if support.verbose:
|
||||
print(self.name, 'is finished.', running, 'tasks are running')
|
||||
mutex.release()
|
||||
sema.release()
|
||||
|
||||
def setUp(self):
|
||||
self.numtasks = 10
|
||||
global sema
|
||||
sema = _threading.BoundedSemaphore(value=3)
|
||||
global mutex
|
||||
mutex = _threading.RLock()
|
||||
global running
|
||||
running = 0
|
||||
self.threads = []
|
||||
|
||||
def test_tasks(self):
|
||||
for i in range(self.numtasks):
|
||||
t = self.TestThread(name="<thread %d>"%i)
|
||||
self.threads.append(t)
|
||||
t.start()
|
||||
|
||||
if support.verbose:
|
||||
print('waiting for all tasks to complete')
|
||||
for t in self.threads:
|
||||
t.join()
|
||||
if support.verbose:
|
||||
print('all tasks done')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
from test import support
|
||||
import unittest
|
||||
import dummy_threading as _threading
|
||||
import time
|
||||
|
||||
class DummyThreadingTestCase(unittest.TestCase):
|
||||
|
||||
class TestThread(_threading.Thread):
|
||||
|
||||
def run(self):
|
||||
global running
|
||||
global sema
|
||||
global mutex
|
||||
# Uncomment if testing another module, such as the real 'threading'
|
||||
# module.
|
||||
#delay = random.random() * 2
|
||||
delay = 0
|
||||
if support.verbose:
|
||||
print('task', self.name, 'will run for', delay, 'sec')
|
||||
sema.acquire()
|
||||
mutex.acquire()
|
||||
running += 1
|
||||
if support.verbose:
|
||||
print(running, 'tasks are running')
|
||||
mutex.release()
|
||||
time.sleep(delay)
|
||||
if support.verbose:
|
||||
print('task', self.name, 'done')
|
||||
mutex.acquire()
|
||||
running -= 1
|
||||
if support.verbose:
|
||||
print(self.name, 'is finished.', running, 'tasks are running')
|
||||
mutex.release()
|
||||
sema.release()
|
||||
|
||||
def setUp(self):
|
||||
self.numtasks = 10
|
||||
global sema
|
||||
sema = _threading.BoundedSemaphore(value=3)
|
||||
global mutex
|
||||
mutex = _threading.RLock()
|
||||
global running
|
||||
running = 0
|
||||
self.threads = []
|
||||
|
||||
def test_tasks(self):
|
||||
for i in range(self.numtasks):
|
||||
t = self.TestThread(name="<thread %d>"%i)
|
||||
self.threads.append(t)
|
||||
t.start()
|
||||
|
||||
if support.verbose:
|
||||
print('waiting for all tasks to complete')
|
||||
for t in self.threads:
|
||||
t.join()
|
||||
if support.verbose:
|
||||
print('all tasks done')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
8
Lib/test/tracedmodules/__init__.py
vendored
8
Lib/test/tracedmodules/__init__.py
vendored
@@ -1,4 +1,4 @@
|
||||
"""This package contains modules that help testing the trace.py module. Note
|
||||
that the exact location of functions in these modules is important, as trace.py
|
||||
takes the real line numbers into account.
|
||||
"""
|
||||
"""This package contains modules that help testing the trace.py module. Note
|
||||
that the exact location of functions in these modules is important, as trace.py
|
||||
takes the real line numbers into account.
|
||||
"""
|
||||
|
||||
18
Lib/test/tracedmodules/testmod.py
vendored
18
Lib/test/tracedmodules/testmod.py
vendored
@@ -1,9 +1,9 @@
|
||||
def func(x):
|
||||
b = x + 1
|
||||
return b + 2
|
||||
|
||||
def func2():
|
||||
"""Test function for issue 9936 """
|
||||
return (1,
|
||||
2,
|
||||
3)
|
||||
def func(x):
|
||||
b = x + 1
|
||||
return b + 2
|
||||
|
||||
def func2():
|
||||
"""Test function for issue 9936 """
|
||||
return (1,
|
||||
2,
|
||||
3)
|
||||
|
||||
Reference in New Issue
Block a user