Update test_itertools.py from Cpython v3.11.2 (#4836)

* Update test_itertools.py from Cpython v3.11.2
This commit is contained in:
Andrey Maltsev
2023-04-04 17:40:23 +03:00
committed by GitHub
parent ca44da8bdf
commit 30718a3be5

View File

@@ -1,5 +1,7 @@
import doctest
import unittest
from test import support
from test.support import threading_helper
from itertools import *
import weakref
from decimal import Decimal
@@ -206,6 +208,7 @@ class TestBasicOps(unittest.TestCase):
it = chain()
it.__setstate__((iter(['abc', 'def']), iter(['ghi'])))
self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f'])
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_combinations(self):
@@ -636,8 +639,6 @@ class TestBasicOps(unittest.TestCase):
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, count(i, j))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_cycle(self):
self.assertEqual(take(10, cycle('abc')), list('abcabcabca'))
self.assertEqual(list(cycle('')), [])
@@ -645,6 +646,9 @@ class TestBasicOps(unittest.TestCase):
self.assertRaises(TypeError, cycle, 5)
self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0])
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_cycle_copy_pickle(self):
# check copy, deepcopy, pickle
c = cycle('abc')
self.assertEqual(next(c), 'a')
@@ -680,6 +684,39 @@ class TestBasicOps(unittest.TestCase):
d = pickle.loads(p) # rebuild the cycle object
self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_cycle_unpickle_compat(self):
testcases = [
b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI0\ntb.',
b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aK\x00tb.',
b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
b'\x80\x04\x95=\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.',
b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI1\ntb.',
b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00K\x01tb.',
b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.',
b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI00\ntb.',
b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aI00\ntb.',
b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.',
b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI01\ntb.',
b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00I01\ntb.',
b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
b'\x80\x04\x95;\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.',
]
assert len(testcases) == 20
for t in testcases:
it = pickle.loads(t)
self.assertEqual(take(10, it), [2, 3, 1, 2, 3, 1, 2, 3, 1, 2])
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_cycle_setstate(self):
@@ -1562,6 +1599,7 @@ class TestBasicOps(unittest.TestCase):
next(a)
@unittest.skip("TODO: RUSTPYTHON, hangs")
@threading_helper.requires_working_threading()
def test_tee_concurrent(self):
start = threading.Event()
finish = threading.Event()
@@ -2261,18 +2299,62 @@ class RegressionTests(unittest.TestCase):
class SubclassWithKwargsTest(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_keywords_in_subclass(self):
# count is not subclassable...
for cls in (repeat, zip, filter, filterfalse, chain, map,
starmap, islice, takewhile, dropwhile, cycle, compress):
class Subclass(cls):
def __init__(self, newarg=None, *args):
cls.__init__(self, *args)
try:
Subclass(newarg=1)
except TypeError as err:
# we expect type errors because of wrong argument count
self.assertNotIn("keyword arguments", err.args[0])
testcases = [
(repeat, (1, 2), [1, 1]),
(zip, ([1, 2], 'ab'), [(1, 'a'), (2, 'b')]),
(filter, (None, [0, 1]), [1]),
(filterfalse, (None, [0, 1]), [0]),
(chain, ([1, 2], [3, 4]), [1, 2, 3]),
(map, (str, [1, 2]), ['1', '2']),
(starmap, (operator.pow, ((2, 3), (3, 2))), [8, 9]),
(islice, ([1, 2, 3, 4], 1, 3), [2, 3]),
(takewhile, (isEven, [2, 3, 4]), [2]),
(dropwhile, (isEven, [2, 3, 4]), [3, 4]),
(cycle, ([1, 2],), [1, 2, 1]),
(compress, ('ABC', [1, 0, 1]), ['A', 'C']),
]
for cls, args, result in testcases:
with self.subTest(cls):
class subclass(cls):
pass
u = subclass(*args)
self.assertIs(type(u), subclass)
self.assertEqual(list(islice(u, 0, 3)), result)
with self.assertRaises(TypeError):
subclass(*args, newarg=3)
for cls, args, result in testcases:
# Constructors of repeat, zip, compress accept keyword arguments.
# Their subclasses need overriding __new__ to support new
# keyword arguments.
if cls in [repeat, zip, compress]:
continue
with self.subTest(cls):
class subclass_with_init(cls):
def __init__(self, *args, newarg=None):
self.newarg = newarg
u = subclass_with_init(*args, newarg=3)
self.assertIs(type(u), subclass_with_init)
self.assertEqual(list(islice(u, 0, 3)), result)
self.assertEqual(u.newarg, 3)
for cls, args, result in testcases:
with self.subTest(cls):
class subclass_with_new(cls):
def __new__(cls, *args, newarg=None):
self = super().__new__(cls, *args)
self.newarg = newarg
return self
u = subclass_with_new(*args, newarg=3)
self.assertIs(type(u), subclass_with_new)
self.assertEqual(list(islice(u, 0, 3)), result)
self.assertEqual(u.newarg, 3)
@support.cpython_only
class SizeofTest(unittest.TestCase):
@@ -2313,419 +2395,10 @@ class SizeofTest(unittest.TestCase):
basesize + 10 * self.ssize_t + 4 * self.ssize_t)
libreftest = """ Doctest for examples in the library reference: libitertools.tex
def load_tests(loader, tests, pattern):
tests.addTest(doctest.DocTestSuite())
return tests
>>> amounts = [120.15, 764.05, 823.14]
>>> for checknum, amount in zip(count(1200), amounts):
... print('Check %d is for $%.2f' % (checknum, amount))
...
Check 1200 is for $120.15
Check 1201 is for $764.05
Check 1202 is for $823.14
>>> import operator
>>> for cube in map(operator.pow, range(1,4), repeat(3)):
... print(cube)
...
1
8
27
>>> reportlines = ['EuroPython', 'Roster', '', 'alex', '', 'laura', '', 'martin', '', 'walter', '', 'samuele']
>>> for name in islice(reportlines, 3, None, 2):
... print(name.title())
...
Alex
Laura
Martin
Walter
Samuele
>>> from operator import itemgetter
>>> d = dict(a=1, b=2, c=1, d=2, e=1, f=2, g=3)
>>> di = sorted(sorted(d.items()), key=itemgetter(1))
>>> for k, g in groupby(di, itemgetter(1)):
... print(k, list(map(itemgetter(0), g)))
...
1 ['a', 'c', 'e']
2 ['b', 'd', 'f']
3 ['g']
# Find runs of consecutive numbers using groupby. The key to the solution
# is differencing with a range so that consecutive numbers all appear in
# same group.
>>> data = [ 1, 4,5,6, 10, 15,16,17,18, 22, 25,26,27,28]
>>> for k, g in groupby(enumerate(data), lambda t:t[0]-t[1]):
... print(list(map(operator.itemgetter(1), g)))
...
[1]
[4, 5, 6]
[10]
[15, 16, 17, 18]
[22]
[25, 26, 27, 28]
>>> def take(n, iterable):
... "Return first n items of the iterable as a list"
... return list(islice(iterable, n))
>>> def prepend(value, iterator):
... "Prepend a single value in front of an iterator"
... # prepend(1, [2, 3, 4]) -> 1 2 3 4
... return chain([value], iterator)
>>> def enumerate(iterable, start=0):
... return zip(count(start), iterable)
>>> def tabulate(function, start=0):
... "Return function(0), function(1), ..."
... return map(function, count(start))
>>> import collections
>>> def consume(iterator, n=None):
... "Advance the iterator n-steps ahead. If n is None, consume entirely."
... # Use functions that consume iterators at C speed.
... if n is None:
... # feed the entire iterator into a zero-length deque
... collections.deque(iterator, maxlen=0)
... else:
... # advance to the empty slice starting at position n
... next(islice(iterator, n, n), None)
>>> def nth(iterable, n, default=None):
... "Returns the nth item or a default value"
... return next(islice(iterable, n, None), default)
>>> def all_equal(iterable):
... "Returns True if all the elements are equal to each other"
... g = groupby(iterable)
... return next(g, True) and not next(g, False)
>>> def quantify(iterable, pred=bool):
... "Count how many times the predicate is true"
... return sum(map(pred, iterable))
>>> def pad_none(iterable):
... "Returns the sequence elements and then returns None indefinitely"
... return chain(iterable, repeat(None))
>>> def ncycles(iterable, n):
... "Returns the sequence elements n times"
... return chain(*repeat(iterable, n))
>>> def dotproduct(vec1, vec2):
... return sum(map(operator.mul, vec1, vec2))
>>> def flatten(listOfLists):
... return list(chain.from_iterable(listOfLists))
>>> def repeatfunc(func, times=None, *args):
... "Repeat calls to func with specified arguments."
... " Example: repeatfunc(random.random)"
... if times is None:
... return starmap(func, repeat(args))
... else:
... return starmap(func, repeat(args, times))
>>> def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
... "Collect data into non-overlapping fixed-length chunks or blocks"
... # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
... # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
... # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
... args = [iter(iterable)] * n
... if incomplete == 'fill':
... return zip_longest(*args, fillvalue=fillvalue)
... if incomplete == 'strict':
... return zip(*args, strict=True)
... if incomplete == 'ignore':
... return zip(*args)
... else:
... raise ValueError('Expected fill, strict, or ignore')
>>> def triplewise(iterable):
... "Return overlapping triplets from an iterable"
... # pairwise('ABCDEFG') -> ABC BCD CDE DEF EFG
... for (a, _), (b, c) in pairwise(pairwise(iterable)):
... yield a, b, c
>>> import collections
>>> def sliding_window(iterable, n):
... # sliding_window('ABCDEFG', 4) -> ABCD BCDE CDEF DEFG
... it = iter(iterable)
... window = collections.deque(islice(it, n), maxlen=n)
... if len(window) == n:
... yield tuple(window)
... for x in it:
... window.append(x)
... yield tuple(window)
>>> def roundrobin(*iterables):
... "roundrobin('ABC', 'D', 'EF') --> A D E B F C"
... # Recipe credited to George Sakkis
... pending = len(iterables)
... nexts = cycle(iter(it).__next__ for it in iterables)
... while pending:
... try:
... for next in nexts:
... yield next()
... except StopIteration:
... pending -= 1
... nexts = cycle(islice(nexts, pending))
>>> def partition(pred, iterable):
... "Use a predicate to partition entries into false entries and true entries"
... # partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
... t1, t2 = tee(iterable)
... return filterfalse(pred, t1), filter(pred, t2)
>>> def before_and_after(predicate, it):
... ''' Variant of takewhile() that allows complete
... access to the remainder of the iterator.
...
... >>> all_upper, remainder = before_and_after(str.isupper, 'ABCdEfGhI')
... >>> str.join('', all_upper)
... 'ABC'
... >>> str.join('', remainder)
... 'dEfGhI'
...
... Note that the first iterator must be fully
... consumed before the second iterator can
... generate valid results.
... '''
... it = iter(it)
... transition = []
... def true_iterator():
... for elem in it:
... if predicate(elem):
... yield elem
... else:
... transition.append(elem)
... return
... def remainder_iterator():
... yield from transition
... yield from it
... return true_iterator(), remainder_iterator()
>>> def powerset(iterable):
... "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
... s = list(iterable)
... return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
>>> def unique_everseen(iterable, key=None):
... "List unique elements, preserving order. Remember all elements ever seen."
... # unique_everseen('AAAABBBCCDAABBB') --> A B C D
... # unique_everseen('ABBCcAD', str.lower) --> A B C D
... seen = set()
... seen_add = seen.add
... if key is None:
... for element in iterable:
... if element not in seen:
... seen_add(element)
... yield element
... else:
... for element in iterable:
... k = key(element)
... if k not in seen:
... seen_add(k)
... yield element
>>> def unique_justseen(iterable, key=None):
... "List unique elements, preserving order. Remember only the element just seen."
... # unique_justseen('AAAABBBCCDAABBB') --> A B C D A B
... # unique_justseen('ABBCcAD', str.lower) --> A B C A D
... return map(next, map(itemgetter(1), groupby(iterable, key)))
>>> def first_true(iterable, default=False, pred=None):
... '''Returns the first true value in the iterable.
...
... If no true value is found, returns *default*
...
... If *pred* is not None, returns the first item
... for which pred(item) is true.
...
... '''
... # first_true([a,b,c], x) --> a or b or c or x
... # first_true([a,b], x, f) --> a if f(a) else b if f(b) else x
... return next(filter(pred, iterable), default)
>>> def nth_combination(iterable, r, index):
... 'Equivalent to list(combinations(iterable, r))[index]'
... pool = tuple(iterable)
... n = len(pool)
... if r < 0 or r > n:
... raise ValueError
... c = 1
... k = min(r, n-r)
... for i in range(1, k+1):
... c = c * (n - k + i) // i
... if index < 0:
... index += c
... if index < 0 or index >= c:
... raise IndexError
... result = []
... while r:
... c, n, r = c*r//n, n-1, r-1
... while index >= c:
... index -= c
... c, n = c*(n-r)//n, n-1
... result.append(pool[-1-n])
... return tuple(result)
This is not part of the examples but it tests to make sure the definitions
perform as purported.
>>> take(10, count())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(prepend(1, [2, 3, 4]))
[1, 2, 3, 4]
>>> list(enumerate('abc'))
[(0, 'a'), (1, 'b'), (2, 'c')]
>>> list(islice(tabulate(lambda x: 2*x), 4))
[0, 2, 4, 6]
>>> it = iter(range(10))
>>> consume(it, 3)
>>> next(it)
3
>>> consume(it)
>>> next(it, 'Done')
'Done'
>>> nth('abcde', 3)
'd'
>>> nth('abcde', 9) is None
True
>>> [all_equal(s) for s in ('', 'A', 'AAAA', 'AAAB', 'AAABA')]
[True, True, True, False, False]
>>> quantify(range(99), lambda x: x%2==0)
50
>>> a = [[1, 2, 3], [4, 5, 6]]
>>> flatten(a)
[1, 2, 3, 4, 5, 6]
>>> list(repeatfunc(pow, 5, 2, 3))
[8, 8, 8, 8, 8]
>>> import random
>>> take(5, map(int, repeatfunc(random.random)))
[0, 0, 0, 0, 0]
>>> list(islice(pad_none('abc'), 0, 6))
['a', 'b', 'c', None, None, None]
>>> list(ncycles('abc', 3))
['a', 'b', 'c', 'a', 'b', 'c', 'a', 'b', 'c']
>>> dotproduct([1,2,3], [4,5,6])
32
>>> list(grouper('abcdefg', 3, fillvalue='x'))
[('a', 'b', 'c'), ('d', 'e', 'f'), ('g', 'x', 'x')]
>>> it = grouper('abcdefg', 3, incomplete='strict')
>>> next(it)
('a', 'b', 'c')
>>> next(it)
('d', 'e', 'f')
>>> next(it)
Traceback (most recent call last):
...
ValueError: zip() argument 2 is shorter than argument 1
>>> list(grouper('abcdefg', n=3, incomplete='ignore'))
[('a', 'b', 'c'), ('d', 'e', 'f')]
>>> list(triplewise('ABCDEFG'))
[('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E'), ('D', 'E', 'F'), ('E', 'F', 'G')]
>>> list(sliding_window('ABCDEFG', 4))
[('A', 'B', 'C', 'D'), ('B', 'C', 'D', 'E'), ('C', 'D', 'E', 'F'), ('D', 'E', 'F', 'G')]
>>> list(roundrobin('abc', 'd', 'ef'))
['a', 'd', 'e', 'b', 'f', 'c']
>>> def is_odd(x):
... return x % 2 == 1
>>> evens, odds = partition(is_odd, range(10))
>>> list(evens)
[0, 2, 4, 6, 8]
>>> list(odds)
[1, 3, 5, 7, 9]
>>> it = iter('ABCdEfGhI')
>>> all_upper, remainder = before_and_after(str.isupper, it)
>>> ''.join(all_upper)
'ABC'
>>> ''.join(remainder)
'dEfGhI'
>>> list(powerset([1,2,3]))
[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]
>>> all(len(list(powerset(range(n)))) == 2**n for n in range(18))
True
>>> list(powerset('abcde')) == sorted(sorted(set(powerset('abcde'))), key=len)
True
>>> list(unique_everseen('AAAABBBCCDAABBB'))
['A', 'B', 'C', 'D']
>>> list(unique_everseen('ABBCcAD', str.lower))
['A', 'B', 'C', 'D']
>>> list(unique_justseen('AAAABBBCCDAABBB'))
['A', 'B', 'C', 'D', 'A', 'B']
>>> list(unique_justseen('ABBCcAD', str.lower))
['A', 'B', 'C', 'A', 'D']
>>> first_true('ABC0DEF1', '9', str.isdigit)
'0'
>>> population = 'ABCDEFGH'
>>> for r in range(len(population) + 1):
... seq = list(combinations(population, r))
... for i in range(len(seq)):
... assert nth_combination(population, r, i) == seq[i]
... for i in range(-len(seq), 0):
... assert nth_combination(population, r, i) == seq[i]
"""
__test__ = {'libreftest' : libreftest}
def test_main(verbose=None):
test_classes = (TestBasicOps, TestVariousIteratorArgs, TestGC,
RegressionTests, LengthTransparency,
SubclassWithKwargsTest, TestExamples,
TestPurePythonRoughEquivalents,
SizeofTest)
support.run_unittest(*test_classes)
# verify reference counting
if verbose and hasattr(sys, "gettotalrefcount"):
import gc
counts = [None] * 5
for i in range(len(counts)):
support.run_unittest(*test_classes)
gc.collect()
counts[i] = sys.gettotalrefcount()
print(counts)
# doctest the examples in the library reference
support.run_doctest(sys.modules[__name__], verbose)
if __name__ == "__main__":
test_main(verbose=True)
unittest.main()