Merge pull request #5134 from NakanoMiku39/main

Update libraries and test files from CPython v3.12
This commit is contained in:
Jeong, YunWon
2023-12-28 13:01:13 +09:00
committed by GitHub
14 changed files with 2259 additions and 83 deletions

2
Lib/abc.py vendored
View File

@@ -18,7 +18,7 @@ def abstractmethod(funcobj):
class C(metaclass=ABCMeta):
@abstractmethod
def my_abstract_method(self, ...):
def my_abstract_method(self, arg1, arg2, argN):
...
"""
funcobj.__isabstractmethod__ = True

64
Lib/argparse.py vendored
View File

@@ -345,21 +345,22 @@ class HelpFormatter(object):
def get_lines(parts, indent, prefix=None):
lines = []
line = []
indent_length = len(indent)
if prefix is not None:
line_len = len(prefix) - 1
else:
line_len = len(indent) - 1
line_len = indent_length - 1
for part in parts:
if line_len + 1 + len(part) > text_width and line:
lines.append(indent + ' '.join(line))
line = []
line_len = len(indent) - 1
line_len = indent_length - 1
line.append(part)
line_len += len(part) + 1
if line:
lines.append(indent + ' '.join(line))
if prefix is not None:
lines[0] = lines[0][len(indent):]
lines[0] = lines[0][indent_length:]
return lines
# if prog is short, follow it with optionals or positionals
@@ -403,10 +404,18 @@ class HelpFormatter(object):
except ValueError:
continue
else:
end = start + len(group._group_actions)
group_action_count = len(group._group_actions)
end = start + group_action_count
if actions[start:end] == group._group_actions:
suppressed_actions_count = 0
for action in group._group_actions:
group_actions.add(action)
if action.help is SUPPRESS:
suppressed_actions_count += 1
exposed_actions_count = group_action_count - suppressed_actions_count
if not group.required:
if start in inserts:
inserts[start] += ' ['
@@ -416,7 +425,7 @@ class HelpFormatter(object):
inserts[end] += ']'
else:
inserts[end] = ']'
else:
elif exposed_actions_count > 1:
if start in inserts:
inserts[start] += ' ('
else:
@@ -490,7 +499,6 @@ class HelpFormatter(object):
text = _re.sub(r'(%s) ' % open, r'\1', text)
text = _re.sub(r' (%s)' % close, r'\1', text)
text = _re.sub(r'%s *%s' % (open, close), r'', text)
text = _re.sub(r'\(([^|]*)\)', r'\1', text)
text = text.strip()
# return the text
@@ -875,16 +883,19 @@ class Action(_AttributeHolder):
raise NotImplementedError(_('.__call__() not defined'))
# FIXME: remove together with `BooleanOptionalAction` deprecated arguments.
_deprecated_default = object()
class BooleanOptionalAction(Action):
def __init__(self,
option_strings,
dest,
default=None,
type=None,
choices=None,
type=_deprecated_default,
choices=_deprecated_default,
required=False,
help=None,
metavar=None):
metavar=_deprecated_default):
_option_strings = []
for option_string in option_strings:
@@ -894,6 +905,24 @@ class BooleanOptionalAction(Action):
option_string = '--no-' + option_string[2:]
_option_strings.append(option_string)
# We need `_deprecated` special value to ban explicit arguments that
# match default value. Like:
# parser.add_argument('-f', action=BooleanOptionalAction, type=int)
for field_name in ('type', 'choices', 'metavar'):
if locals()[field_name] is not _deprecated_default:
warnings._deprecated(
field_name,
"{name!r} is deprecated as of Python 3.12 and will be "
"removed in Python {remove}.",
remove=(3, 14))
if type is _deprecated_default:
type = None
if choices is _deprecated_default:
choices = None
if metavar is _deprecated_default:
metavar = None
super().__init__(
option_strings=_option_strings,
dest=dest,
@@ -2165,7 +2194,9 @@ class ArgumentParser(_AttributeHolder, _ActionsContainer):
# replace arguments referencing files with the file content
else:
try:
with open(arg_string[1:]) as args_file:
with open(arg_string[1:],
encoding=_sys.getfilesystemencoding(),
errors=_sys.getfilesystemencodeerrors()) as args_file:
arg_strings = []
for arg_line in args_file.read().splitlines():
for arg in self.convert_arg_line_to_args(arg_line):
@@ -2479,9 +2510,11 @@ class ArgumentParser(_AttributeHolder, _ActionsContainer):
not action.option_strings):
if action.default is not None:
value = action.default
self._check_value(action, value)
else:
# since arg_strings is always [] at this point
# there is no need to use self._check_value(action, value)
value = arg_strings
self._check_value(action, value)
# single argument or optional argument produces a single value
elif len(arg_strings) == 1 and action.nargs in [None, OPTIONAL]:
@@ -2523,7 +2556,6 @@ class ArgumentParser(_AttributeHolder, _ActionsContainer):
# ArgumentTypeErrors indicate errors
except ArgumentTypeError as err:
name = getattr(action.type, '__name__', repr(action.type))
msg = str(err)
raise ArgumentError(action, msg)
@@ -2595,9 +2627,11 @@ class ArgumentParser(_AttributeHolder, _ActionsContainer):
def _print_message(self, message, file=None):
if message:
if file is None:
file = _sys.stderr
file.write(message)
file = file or _sys.stderr
try:
file.write(message)
except (AttributeError, OSError):
pass
# ===============
# Exiting methods

33
Lib/base64.py vendored
View File

@@ -508,14 +508,8 @@ MAXBINSIZE = (MAXLINESIZE//4)*3
def encode(input, output):
"""Encode a file; input and output are binary files."""
while True:
s = input.read(MAXBINSIZE)
if not s:
break
while len(s) < MAXBINSIZE:
ns = input.read(MAXBINSIZE-len(s))
if not ns:
break
while s := input.read(MAXBINSIZE):
while len(s) < MAXBINSIZE and (ns := input.read(MAXBINSIZE-len(s))):
s += ns
line = binascii.b2a_base64(s)
output.write(line)
@@ -523,10 +517,7 @@ def encode(input, output):
def decode(input, output):
"""Decode a file; input and output are binary files."""
while True:
line = input.readline()
if not line:
break
while line := input.readline():
s = binascii.a2b_base64(line)
output.write(s)
@@ -567,13 +558,12 @@ def decodebytes(s):
def main():
"""Small main program"""
import sys, getopt
usage = """usage: %s [-h|-d|-e|-u|-t] [file|-]
usage = f"""usage: {sys.argv[0]} [-h|-d|-e|-u] [file|-]
-h: print this help message and exit
-d, -u: decode
-e: encode (default)
-t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0]
-e: encode (default)"""
try:
opts, args = getopt.getopt(sys.argv[1:], 'hdeut')
opts, args = getopt.getopt(sys.argv[1:], 'hdeu')
except getopt.error as msg:
sys.stdout = sys.stderr
print(msg)
@@ -584,7 +574,6 @@ def main():
if o == '-e': func = encode
if o == '-d': func = decode
if o == '-u': func = decode
if o == '-t': test(); return
if o == '-h': print(usage); return
if args and args[0] != '-':
with open(args[0], 'rb') as f:
@@ -593,15 +582,5 @@ def main():
func(sys.stdin.buffer, sys.stdout.buffer)
def test():
s0 = b"Aladdin:open sesame"
print(repr(s0))
s1 = encodebytes(s0)
print(repr(s1))
s2 = decodebytes(s1)
print(repr(s2))
assert s0 == s2
if __name__ == '__main__':
main()

26
Lib/bdb.py vendored
View File

@@ -570,9 +570,12 @@ class Bdb:
rv = frame.f_locals['__return__']
s += '->'
s += reprlib.repr(rv)
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
s += lprefix + line.strip()
if lineno is not None:
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
s += lprefix + line.strip()
else:
s += f'{lprefix}Warning: lineno is None'
return s
# The following methods can be called by clients to use
@@ -805,15 +808,18 @@ def checkfuncname(b, frame):
return True
# Determines if there is an effective (active) breakpoint at this
# line of code. Returns breakpoint number or 0 if none
def effective(file, line, frame):
"""Determine which breakpoint for this file:line is to be acted upon.
"""Return (active breakpoint, delete temporary flag) or (None, None) as
breakpoint to act upon.
Called only if we know there is a breakpoint at this location. Return
the breakpoint that was triggered and a boolean that indicates if it is
ok to delete a temporary breakpoint. Return (None, None) if there is no
matching breakpoint.
The "active breakpoint" is the first entry in bplist[line, file] (which
must exist) that is enabled, for which checkfuncname is True, and that
has neither a False condition nor a positive ignore count. The flag,
meaning that a temporary breakpoint should be deleted, is False only
when the condiion cannot be evaluated (in which case, ignore count is
ignored).
If no such entry exists, then (None, None) is returned.
"""
possibles = Breakpoint.bplist[file, line]
for b in possibles:

8
Lib/bisect.py vendored
View File

@@ -8,6 +8,8 @@ def insort_right(a, x, lo=0, hi=None, *, key=None):
Optional args lo (default 0) and hi (default len(a)) bound the
slice of a to be searched.
A custom key function can be supplied to customize the sort order.
"""
if key is None:
lo = bisect_right(a, x, lo, hi)
@@ -25,6 +27,8 @@ def bisect_right(a, x, lo=0, hi=None, *, key=None):
Optional args lo (default 0) and hi (default len(a)) bound the
slice of a to be searched.
A custom key function can be supplied to customize the sort order.
"""
if lo < 0:
@@ -57,6 +61,8 @@ def insort_left(a, x, lo=0, hi=None, *, key=None):
Optional args lo (default 0) and hi (default len(a)) bound the
slice of a to be searched.
A custom key function can be supplied to customize the sort order.
"""
if key is None:
@@ -74,6 +80,8 @@ def bisect_left(a, x, lo=0, hi=None, *, key=None):
Optional args lo (default 0) and hi (default len(a)) bound the
slice of a to be searched.
A custom key function can be supplied to customize the sort order.
"""
if lo < 0:

314
Lib/test/support/asynchat.py vendored Normal file
View File

@@ -0,0 +1,314 @@
# TODO: This module was deprecated and removed from CPython 3.12
# Now it is a test-only helper. Any attempts to rewrite exising tests that
# are using this module and remove it completely are appreciated!
# See: https://github.com/python/cpython/issues/72719
# -*- Mode: Python; tab-width: 4 -*-
# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
r"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).
The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.
for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
from collections import deque
from test.support import asyncore
class async_chat(asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
ac_in_buffer_size = 65536
ac_out_buffer_size = 65536
# we don't want to enable the use of encoding by default, because that is a
# sign of an application bug that we don't want to pass silently
use_encoding = 0
encoding = 'latin-1'
def __init__(self, sock=None, map=None):
# for string terminator matching
self.ac_in_buffer = b''
# we use a list here rather than io.BytesIO for a few reasons...
# del lst[:] is faster than bio.truncate(0)
# lst = [] is faster than bio.truncate(0)
self.incoming = []
# we toss the use of the "simple producer" and replace it with
# a pure deque, which the original fifo was a wrapping of
self.producer_fifo = deque()
asyncore.dispatcher.__init__(self, sock, map)
def collect_incoming_data(self, data):
raise NotImplementedError("must be implemented in subclass")
def _collect_incoming_data(self, data):
self.incoming.append(data)
def _get_data(self):
d = b''.join(self.incoming)
del self.incoming[:]
return d
def found_terminator(self):
raise NotImplementedError("must be implemented in subclass")
def set_terminator(self, term):
"""Set the input delimiter.
Can be a fixed string of any length, an integer, or None.
"""
if isinstance(term, str) and self.use_encoding:
term = bytes(term, self.encoding)
elif isinstance(term, int) and term < 0:
raise ValueError('the number of received bytes must be positive')
self.terminator = term
def get_terminator(self):
return self.terminator
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
def handle_read(self):
try:
data = self.recv(self.ac_in_buffer_size)
except BlockingIOError:
return
except OSError:
self.handle_error()
return
if isinstance(data, str) and self.use_encoding:
data = bytes(str, self.encoding)
self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(4096).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
if not terminator:
# no terminator, collect it all
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
elif isinstance(terminator, int):
# numeric terminator
n = terminator
if lb < n:
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data(self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
index = self.ac_in_buffer.find(terminator)
if index != -1:
# we found the terminator
if index > 0:
# don't bother reporting the empty string
# (source of subtle bugs)
self.collect_incoming_data(self.ac_in_buffer[:index])
self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
# This does the Right Thing if the terminator
# is changed here.
self.found_terminator()
else:
# check for a prefix of the terminator
index = find_prefix_at_end(self.ac_in_buffer, terminator)
if index:
if index != lb:
# we found a prefix, collect up to the prefix
self.collect_incoming_data(self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
# no prefix, collect it all
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
def handle_write(self):
self.initiate_send()
def handle_close(self):
self.close()
def push(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
sabs = self.ac_out_buffer_size
if len(data) > sabs:
for i in range(0, len(data), sabs):
self.producer_fifo.append(data[i:i+sabs])
else:
self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer(self, producer):
self.producer_fifo.append(producer)
self.initiate_send()
def readable(self):
"predicate for inclusion in the readable for select()"
# cannot use the old predicate, it violates the claim of the
# set_terminator method.
# return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
return 1
def writable(self):
"predicate for inclusion in the writable for select()"
return self.producer_fifo or (not self.connected)
def close_when_done(self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.append(None)
def initiate_send(self):
while self.producer_fifo and self.connected:
first = self.producer_fifo[0]
# handle empty string/buffer or None entry
if not first:
del self.producer_fifo[0]
if first is None:
self.handle_close()
return
# handle classic producer behavior
obs = self.ac_out_buffer_size
try:
data = first[:obs]
except TypeError:
data = first.more()
if data:
self.producer_fifo.appendleft(data)
else:
del self.producer_fifo[0]
continue
if isinstance(data, str) and self.use_encoding:
data = bytes(data, self.encoding)
# send the data
try:
num_sent = self.send(data)
except OSError:
self.handle_error()
return
if num_sent:
if num_sent < len(data) or obs < len(first):
self.producer_fifo[0] = first[num_sent:]
else:
del self.producer_fifo[0]
# we tried to send some actual data
return
def discard_buffers(self):
# Emergencies only!
self.ac_in_buffer = b''
del self.incoming[:]
self.producer_fifo.clear()
class simple_producer:
def __init__(self, data, buffer_size=512):
self.data = data
self.buffer_size = buffer_size
def more(self):
if len(self.data) > self.buffer_size:
result = self.data[:self.buffer_size]
self.data = self.data[self.buffer_size:]
return result
else:
result = self.data
self.data = b''
return result
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e("qwerty\r", "\r\n") => 1
# f_p_a_e("qwertydkjf", "\r\n") => 0
# f_p_a_e("qwerty\r\n", "\r\n") => <undefined>
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s
def find_prefix_at_end(haystack, needle):
l = len(needle) - 1
while l and not haystack.endswith(needle[:l]):
l -= 1
return l

649
Lib/test/support/asyncore.py vendored Normal file
View File

@@ -0,0 +1,649 @@
# TODO: This module was deprecated and removed from CPython 3.12
# Now it is a test-only helper. Any attempts to rewrite exising tests that
# are using this module and remove it completely are appreciated!
# See: https://github.com/python/cpython/issues/72719
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
import select
import socket
import sys
import time
import warnings
import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
errorcode
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
EBADF})
try:
socket_map
except NameError:
socket_map = {}
def _strerror(err):
try:
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
return errorcode[err]
return "Unknown error %s" %err
class ExitNow(Exception):
pass
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
def read(obj):
try:
obj.handle_read_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def write(obj):
try:
obj.handle_write_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def _exception(obj):
try:
obj.handle_expt_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def readwrite(obj, flags):
try:
if flags & select.POLLIN:
obj.handle_read_event()
if flags & select.POLLOUT:
obj.handle_write_event()
if flags & select.POLLPRI:
obj.handle_expt_event()
if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
obj.handle_close()
except OSError as e:
if e.errno not in _DISCONNECTED:
obj.handle_error()
else:
obj.handle_close()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def poll(timeout=0.0, map=None):
if map is None:
map = socket_map
if map:
r = []; w = []; e = []
for fd, obj in list(map.items()):
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
# accepting sockets should not be writable
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
r, w, e = select.select(r, w, e, timeout)
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
def poll2(timeout=0.0, map=None):
# Use the poll() support added to the select module in Python 2.0
if map is None:
map = socket_map
if timeout is not None:
# timeout is in milliseconds
timeout = int(timeout*1000)
pollster = select.poll()
if map:
for fd, obj in list(map.items()):
flags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
# accepting sockets should not be writable
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
if flags:
pollster.register(fd, flags)
r = pollster.poll(timeout)
for fd, flags in r:
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
poll3 = poll2 # Alias for backward compatibility
def loop(timeout=30.0, use_poll=False, map=None, count=None):
if map is None:
map = socket_map
if use_poll and hasattr(select, 'poll'):
poll_fun = poll2
else:
poll_fun = poll
if count is None:
while map:
poll_fun(timeout, map)
else:
while map and count > 0:
poll_fun(timeout, map)
count = count - 1
class dispatcher:
debug = False
connected = False
accepting = False
connecting = False
closing = False
addr = None
ignore_log_types = frozenset({'warning'})
def __init__(self, sock=None, map=None):
if map is None:
self._map = socket_map
else:
self._map = map
self._fileno = None
if sock:
# Set to nonblocking just to make sure for cases where we
# get a socket from a blocking source.
sock.setblocking(False)
self.set_socket(sock, map)
self.connected = True
# The constructor no longer requires that the socket
# passed be connected.
try:
self.addr = sock.getpeername()
except OSError as err:
if err.errno in (ENOTCONN, EINVAL):
# To handle the case where we got an unconnected
# socket.
self.connected = False
else:
# The socket is broken in some unknown way, alert
# the user and remove it from the map (to prevent
# polling of broken sockets).
self.del_channel(map)
raise
else:
self.socket = None
def __repr__(self):
status = [self.__class__.__module__+"."+self.__class__.__qualname__]
if self.accepting and self.addr:
status.append('listening')
elif self.connected:
status.append('connected')
if self.addr is not None:
try:
status.append('%s:%d' % self.addr)
except TypeError:
status.append(repr(self.addr))
return '<%s at %#x>' % (' '.join(status), id(self))
def add_channel(self, map=None):
#self.log_info('adding channel %s' % self)
if map is None:
map = self._map
map[self._fileno] = self
def del_channel(self, map=None):
fd = self._fileno
if map is None:
map = self._map
if fd in map:
#self.log_info('closing channel %d:%s' % (fd, self))
del map[fd]
self._fileno = None
def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
self.family_and_type = family, type
sock = socket.socket(family, type)
sock.setblocking(False)
self.set_socket(sock)
def set_socket(self, sock, map=None):
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
# try to re-use a server port if possible
try:
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR,
self.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR) | 1
)
except OSError:
pass
# ==================================================
# predicates for select()
# these are used as filters for the lists of sockets
# to pass to select().
# ==================================================
def readable(self):
return True
def writable(self):
return True
# ==================================================
# socket object methods.
# ==================================================
def listen(self, num):
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)
def bind(self, addr):
self.addr = addr
return self.socket.bind(addr)
def connect(self, address):
self.connected = False
self.connecting = True
err = self.socket.connect_ex(address)
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
or err == EINVAL and os.name == 'nt':
self.addr = address
return
if err in (0, EISCONN):
self.addr = address
self.handle_connect_event()
else:
raise OSError(err, errorcode[err])
def accept(self):
# XXX can return either an address pair or None
try:
conn, addr = self.socket.accept()
except TypeError:
return None
except OSError as why:
if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
return None
else:
raise
else:
return conn, addr
def send(self, data):
try:
result = self.socket.send(data)
return result
except OSError as why:
if why.errno == EWOULDBLOCK:
return 0
elif why.errno in _DISCONNECTED:
self.handle_close()
return 0
else:
raise
def recv(self, buffer_size):
try:
data = self.socket.recv(buffer_size)
if not data:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self.handle_close()
return b''
else:
return data
except OSError as why:
# winsock sometimes raises ENOTCONN
if why.errno in _DISCONNECTED:
self.handle_close()
return b''
else:
raise
def close(self):
self.connected = False
self.accepting = False
self.connecting = False
self.del_channel()
if self.socket is not None:
try:
self.socket.close()
except OSError as why:
if why.errno not in (ENOTCONN, EBADF):
raise
# log and log_info may be overridden to provide more sophisticated
# logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging.
def log(self, message):
sys.stderr.write('log: %s\n' % str(message))
def log_info(self, message, type='info'):
if type not in self.ignore_log_types:
print('%s: %s' % (type, message))
def handle_read_event(self):
if self.accepting:
# accepting sockets are never connected, they "spawn" new
# sockets that are connected
self.handle_accept()
elif not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_read()
else:
self.handle_read()
def handle_connect_event(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
raise OSError(err, _strerror(err))
self.handle_connect()
self.connected = True
self.connecting = False
def handle_write_event(self):
if self.accepting:
# Accepting sockets shouldn't get a write event.
# We will pretend it didn't happen.
return
if not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_write()
def handle_expt_event(self):
# handle_expt_event() is called if there might be an error on the
# socket, or if there is OOB data
# check for the error condition first
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# we can get here when select.select() says that there is an
# exceptional condition on the socket
# since there is an error, we'll go ahead and close the socket
# like we would in a subclassed handle_read() that received no
# data
self.handle_close()
else:
self.handle_expt()
def handle_error(self):
nil, t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
self_repr = repr(self)
except:
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
self.log_info(
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)
self.handle_close()
def handle_expt(self):
self.log_info('unhandled incoming priority event', 'warning')
def handle_read(self):
self.log_info('unhandled read event', 'warning')
def handle_write(self):
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
self.log_info('unhandled connect event', 'warning')
def handle_accept(self):
pair = self.accept()
if pair is not None:
self.handle_accepted(*pair)
def handle_accepted(self, sock, addr):
sock.close()
self.log_info('unhandled accepted event', 'warning')
def handle_close(self):
self.log_info('unhandled close event', 'warning')
self.close()
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
def __init__(self, sock=None, map=None):
dispatcher.__init__(self, sock, map)
self.out_buffer = b''
def initiate_send(self):
num_sent = 0
num_sent = dispatcher.send(self, self.out_buffer[:65536])
self.out_buffer = self.out_buffer[num_sent:]
def handle_write(self):
self.initiate_send()
def writable(self):
return (not self.connected) or len(self.out_buffer)
def send(self, data):
if self.debug:
self.log_info('sending %s' % repr(data))
self.out_buffer = self.out_buffer + data
self.initiate_send()
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def compact_traceback():
exc = sys.exception()
tb = exc.__traceback__
if not tb: # Must have a traceback
raise AssertionError("traceback does not exist")
tbinfo = []
while tb:
tbinfo.append((
tb.tb_frame.f_code.co_filename,
tb.tb_frame.f_code.co_name,
str(tb.tb_lineno)
))
tb = tb.tb_next
# just to be safe
del tb
file, function, line = tbinfo[-1]
info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
return (file, function, line), type(exc), exc, info
def close_all(map=None, ignore_all=False):
if map is None:
map = socket_map
for x in list(map.values()):
try:
x.close()
except OSError as x:
if x.errno == EBADF:
pass
elif not ignore_all:
raise
except _reraised_exceptions:
raise
except:
if not ignore_all:
raise
map.clear()
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead. So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o? [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
class file_wrapper:
# Here we override just enough to make a file
# look like a socket for the purposes of asyncore.
# The passed fd is automatically os.dup()'d
def __init__(self, fd):
self.fd = os.dup(fd)
def __del__(self):
if self.fd >= 0:
warnings.warn("unclosed file %r" % self, ResourceWarning,
source=self)
self.close()
def recv(self, *args):
return os.read(self.fd, *args)
def send(self, *args):
return os.write(self.fd, *args)
def getsockopt(self, level, optname, buflen=None):
if (level == socket.SOL_SOCKET and
optname == socket.SO_ERROR and
not buflen):
return 0
raise NotImplementedError("Only asyncore specific behaviour "
"implemented.")
read = recv
write = send
def close(self):
if self.fd < 0:
return
fd = self.fd
self.fd = -1
os.close(fd)
def fileno(self):
return self.fd
class file_dispatcher(dispatcher):
def __init__(self, fd, map=None):
dispatcher.__init__(self, None, map)
self.connected = True
try:
fd = fd.fileno()
except AttributeError:
pass
self.set_file(fd)
# set it to non-blocking mode
os.set_blocking(fd, False)
def set_file(self, fd):
self.socket = file_wrapper(fd)
self._fileno = self.socket.fileno()
self.add_channel()

29
Lib/test/test_abc.py vendored
View File

@@ -149,14 +149,18 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
self.assertEqual(D.foo(), 4)
self.assertEqual(D().foo(), 4)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_object_new_with_one_abstractmethod(self):
class C(metaclass=abc_ABCMeta):
@abc.abstractmethod
def method_one(self):
pass
msg = r"class C with abstract method method_one"
msg = r"class C without an implementation for abstract method 'method_one'"
self.assertRaisesRegex(TypeError, msg, C)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_object_new_with_many_abstractmethods(self):
class C(metaclass=abc_ABCMeta):
@abc.abstractmethod
@@ -165,7 +169,7 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
@abc.abstractmethod
def method_two(self):
pass
msg = r"class C with abstract methods method_one, method_two"
msg = r"class C without an implementation for abstract methods 'method_one', 'method_two'"
self.assertRaisesRegex(TypeError, msg, C)
def test_abstractmethod_integration(self):
@@ -448,15 +452,16 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
# Also check that issubclass() propagates exceptions raised by
# __subclasses__.
class CustomError(Exception): ...
exc_msg = "exception from __subclasses__"
def raise_exc():
raise Exception(exc_msg)
raise CustomError(exc_msg)
class S(metaclass=abc_ABCMeta):
__subclasses__ = raise_exc
with self.assertRaisesRegex(Exception, exc_msg):
with self.assertRaisesRegex(CustomError, exc_msg):
issubclass(int, S)
def test_subclasshook(self):
@@ -521,6 +526,8 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
self.assertEqual(A.__abstractmethods__, set())
A()
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_update_new_abstractmethods(self):
class A(metaclass=abc_ABCMeta):
@abc.abstractmethod
@@ -534,9 +541,11 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
A.foo = updated_foo
abc.update_abstractmethods(A)
self.assertEqual(A.__abstractmethods__, {'foo', 'bar'})
msg = "class A with abstract methods bar, foo"
msg = "class A without an implementation for abstract methods 'bar', 'foo'"
self.assertRaisesRegex(TypeError, msg, A)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_update_implementation(self):
class A(metaclass=abc_ABCMeta):
@abc.abstractmethod
@@ -546,7 +555,7 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
class B(A):
pass
msg = "class B with abstract method foo"
msg = "class B without an implementation for abstract method 'foo'"
self.assertRaisesRegex(TypeError, msg, B)
self.assertEqual(B.__abstractmethods__, {'foo'})
@@ -588,6 +597,8 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
A()
self.assertFalse(hasattr(A, '__abstractmethods__'))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_update_del_implementation(self):
class A(metaclass=abc_ABCMeta):
@abc.abstractmethod
@@ -604,9 +615,11 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
abc.update_abstractmethods(B)
msg = "class B with abstract method foo"
msg = "class B without an implementation for abstract method 'foo'"
self.assertRaisesRegex(TypeError, msg, B)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_update_layered_implementation(self):
class A(metaclass=abc_ABCMeta):
@abc.abstractmethod
@@ -626,7 +639,7 @@ def test_factory(abc_ABCMeta, abc_get_cache_token):
abc.update_abstractmethods(C)
msg = "class C with abstract method foo"
msg = "class C without an implementation for abstract method 'foo'"
self.assertRaisesRegex(TypeError, msg, C)
def test_update_multi_inheritance(self):

View File

@@ -1,5 +1,7 @@
# Author: Steven J. Bethard <steven.bethard@gmail.com>.
import contextlib
import functools
import inspect
import io
import operator
@@ -35,6 +37,35 @@ class StdIOBuffer(io.TextIOWrapper):
return self.buffer.raw.getvalue().decode('utf-8')
class StdStreamTest(unittest.TestCase):
def test_skip_invalid_stderr(self):
parser = argparse.ArgumentParser()
with (
contextlib.redirect_stderr(None),
mock.patch('argparse._sys.exit')
):
parser.exit(status=0, message='foo')
def test_skip_invalid_stdout(self):
parser = argparse.ArgumentParser()
for func in (
parser.print_usage,
parser.print_help,
functools.partial(parser.parse_args, ['-h'])
):
with (
self.subTest(func=func),
contextlib.redirect_stdout(None),
# argparse uses stderr as a fallback
StdIOBuffer() as mocked_stderr,
contextlib.redirect_stderr(mocked_stderr),
mock.patch('argparse._sys.exit'),
):
func()
self.assertRegex(mocked_stderr.getvalue(), r'usage:')
class TestCase(unittest.TestCase):
def setUp(self):
@@ -734,6 +765,49 @@ class TestBooleanOptionalAction(ParserTestCase):
self.assertIn("got an unexpected keyword argument 'const'", str(cm.exception))
def test_deprecated_init_kw(self):
# See gh-92248
parser = argparse.ArgumentParser()
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-a',
action=argparse.BooleanOptionalAction,
type=None,
)
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-b',
action=argparse.BooleanOptionalAction,
type=bool,
)
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-c',
action=argparse.BooleanOptionalAction,
metavar=None,
)
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-d',
action=argparse.BooleanOptionalAction,
metavar='d',
)
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-e',
action=argparse.BooleanOptionalAction,
choices=None,
)
with self.assertWarns(DeprecationWarning):
parser.add_argument(
'-f',
action=argparse.BooleanOptionalAction,
choices=(),
)
class TestBooleanOptionalActionRequired(ParserTestCase):
"""Tests BooleanOptionalAction required"""
@@ -1505,14 +1579,15 @@ class TestArgumentsFromFile(TempDirMixin, ParserTestCase):
def setUp(self):
super(TestArgumentsFromFile, self).setUp()
file_texts = [
('hello', 'hello world!\n'),
('recursive', '-a\n'
'A\n'
'@hello'),
('invalid', '@no-such-path\n'),
('hello', os.fsencode(self.hello) + b'\n'),
('recursive', b'-a\n'
b'A\n'
b'@hello'),
('invalid', b'@no-such-path\n'),
('undecodable', self.undecodable + b'\n'),
]
for path, text in file_texts:
with open(path, 'w', encoding="utf-8") as file:
with open(path, 'wb') as file:
file.write(text)
parser_signature = Sig(fromfile_prefix_chars='@')
@@ -1522,15 +1597,25 @@ class TestArgumentsFromFile(TempDirMixin, ParserTestCase):
Sig('y', nargs='+'),
]
failures = ['', '-b', 'X', '@invalid', '@missing']
hello = 'hello world!' + os_helper.FS_NONASCII
successes = [
('X Y', NS(a=None, x='X', y=['Y'])),
('X -a A Y Z', NS(a='A', x='X', y=['Y', 'Z'])),
('@hello X', NS(a=None, x='hello world!', y=['X'])),
('X @hello', NS(a=None, x='X', y=['hello world!'])),
('-a B @recursive Y Z', NS(a='A', x='hello world!', y=['Y', 'Z'])),
('X @recursive Z -a B', NS(a='B', x='X', y=['hello world!', 'Z'])),
('@hello X', NS(a=None, x=hello, y=['X'])),
('X @hello', NS(a=None, x='X', y=[hello])),
('-a B @recursive Y Z', NS(a='A', x=hello, y=['Y', 'Z'])),
('X @recursive Z -a B', NS(a='B', x='X', y=[hello, 'Z'])),
(["-a", "", "X", "Y"], NS(a='', x='X', y=['Y'])),
]
if os_helper.TESTFN_UNDECODABLE:
undecodable = os_helper.TESTFN_UNDECODABLE.lstrip(b'@')
decoded_undecodable = os.fsdecode(undecodable)
successes += [
('@undecodable X', NS(a=None, x=decoded_undecodable, y=['X'])),
('X @undecodable', NS(a=None, x='X', y=[decoded_undecodable])),
]
else:
undecodable = b''
class TestArgumentsFromFileConverter(TempDirMixin, ParserTestCase):
@@ -1539,10 +1624,10 @@ class TestArgumentsFromFileConverter(TempDirMixin, ParserTestCase):
def setUp(self):
super(TestArgumentsFromFileConverter, self).setUp()
file_texts = [
('hello', 'hello world!\n'),
('hello', b'hello world!\n'),
]
for path, text in file_texts:
with open(path, 'w', encoding="utf-8") as file:
with open(path, 'wb') as file:
file.write(text)
class FromFileConverterArgumentParser(ErrorRaisingArgumentParser):
@@ -3753,6 +3838,28 @@ class TestHelpUsage(HelpTestCase):
version = ''
class TestHelpUsageWithParentheses(HelpTestCase):
parser_signature = Sig(prog='PROG')
argument_signatures = [
Sig('positional', metavar='(example) positional'),
Sig('-p', '--optional', metavar='{1 (option A), 2 (option B)}'),
]
usage = '''\
usage: PROG [-h] [-p {1 (option A), 2 (option B)}] (example) positional
'''
help = usage + '''\
positional arguments:
(example) positional
options:
-h, --help show this help message and exit
-p {1 (option A), 2 (option B)}, --optional {1 (option A), 2 (option B)}
'''
version = ''
class TestHelpOnlyUserGroups(HelpTestCase):
"""Test basic usage messages"""
@@ -5219,6 +5326,13 @@ class TestParseKnownArgs(TestCase):
self.assertEqual(NS(v=3, spam=True, badger="B"), args)
self.assertEqual(["C", "--foo", "4"], extras)
def test_zero_or_more_optional(self):
parser = argparse.ArgumentParser()
parser.add_argument('x', nargs='*', choices=('x', 'y'))
args = parser.parse_args([])
self.assertEqual(NS(x=[]), args)
# ===========================
# parse_intermixed_args tests
# ===========================

View File

@@ -31,6 +31,8 @@ class LegacyBase64TestCase(unittest.TestCase):
b"YWJjZGVmZ2hpamtsbW5vcHFyc3R1dnd4eXpBQkNE"
b"RUZHSElKS0xNTk9QUVJTVFVWV1hZWjAxMjM0\nNT"
b"Y3ODkhQCMwXiYqKCk7Ojw+LC4gW117fQ==\n")
eq(base64.encodebytes(b"Aladdin:open sesame"),
b"QWxhZGRpbjpvcGVuIHNlc2FtZQ==\n")
# Non-bytes
eq(base64.encodebytes(bytearray(b'abc')), b'YWJj\n')
eq(base64.encodebytes(memoryview(b'abc')), b'YWJj\n')
@@ -50,6 +52,8 @@ class LegacyBase64TestCase(unittest.TestCase):
b"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
b"0123456789!@#0^&*();:<>,. []{}")
eq(base64.decodebytes(b''), b'')
eq(base64.decodebytes(b"QWxhZGRpbjpvcGVuIHNlc2FtZQ==\n"),
b"Aladdin:open sesame")
# Non-bytes
eq(base64.decodebytes(bytearray(b'YWJj\n')), b'abc')
eq(base64.decodebytes(memoryview(b'YWJj\n')), b'abc')
@@ -762,14 +766,6 @@ class TestMain(unittest.TestCase):
def get_output(self, *args):
return script_helper.assert_python_ok('-m', 'base64', *args).out
def test_encode_decode(self):
output = self.get_output('-t')
self.assertSequenceEqual(output.splitlines(), (
b"b'Aladdin:open sesame'",
br"b'QWxhZGRpbjpvcGVuIHNlc2FtZQ==\n'",
b"b'Aladdin:open sesame'",
))
def test_encode_file(self):
with open(os_helper.TESTFN, 'wb') as fp:
fp.write(b'a\xffb\n')

19
Lib/test/test_bdb.py vendored
View File

@@ -59,6 +59,7 @@ from contextlib import contextmanager
from itertools import islice, repeat
from test.support import import_helper
from test.support import os_helper
from test.support import patch_list
class BdbException(Exception): pass
@@ -432,8 +433,9 @@ class TracerRun():
not_empty = ''
if self.tracer.set_list:
not_empty += 'All paired tuples have not been processed, '
not_empty += ('the last one was number %d' %
not_empty += ('the last one was number %d\n' %
self.tracer.expect_set_no)
not_empty += repr(self.tracer.set_list)
# Make a BdbNotExpectedError a unittest failure.
if type_ is not None and issubclass(BdbNotExpectedError, type_):
@@ -728,6 +730,14 @@ class StateTestCase(BaseTestCase):
def test_skip(self):
# Check that tracing is skipped over the import statement in
# 'tfunc_import()'.
# Remove all but the standard importers.
sys.meta_path[:] = (
item
for item in sys.meta_path
if item.__module__.startswith('_frozen_importlib')
)
code = """
def main():
lno = 3
@@ -1224,5 +1234,12 @@ class IssuesTestCase(BaseTestCase):
tracer.runcall(tfunc_import)
class TestRegressions(unittest.TestCase):
def test_format_stack_entry_no_lineno(self):
# See gh-101517
self.assertIn('Warning: lineno is None',
Bdb().format_stack_entry((sys._getframe(), None)))
if __name__ == "__main__":
unittest.main()

View File

@@ -263,6 +263,34 @@ class TestBisect:
for f in (self.module.insort_left, self.module.insort_right):
self.assertRaises(TypeError, f, x, y, key = "b")
def test_lt_returns_non_bool(self):
class A:
def __init__(self, val):
self.val = val
def __lt__(self, other):
return "nonempty" if self.val < other.val else ""
data = [A(i) for i in range(100)]
i1 = self.module.bisect_left(data, A(33))
i2 = self.module.bisect_right(data, A(33))
self.assertEqual(i1, 33)
self.assertEqual(i2, 34)
def test_lt_returns_notimplemented(self):
class A:
def __init__(self, val):
self.val = val
def __lt__(self, other):
return NotImplemented
def __gt__(self, other):
return self.val > other.val
data = [A(i) for i in range(100)]
i1 = self.module.bisect_left(data, A(40))
i2 = self.module.bisect_right(data, A(40))
self.assertEqual(i1, 40)
self.assertEqual(i2, 41)
class TestBisectPython(TestBisect, unittest.TestCase):
module = py_bisect

1018
Lib/test/test_bz2.py vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -217,7 +217,7 @@ class OtherFileTests:
self._checkBufferSize(1)
def testTruncateOnWindows(self):
# SF bug <http://www.python.org/sf/801631>
# SF bug <https://bugs.python.org/issue801631>
# "file.truncate fault on windows"
f = self.open(TESTFN, 'wb')