Update pickle tests from 3.13.5

This commit is contained in:
ShaharNaveh
2025-07-25 11:39:52 +02:00
parent 4fb5736694
commit 68cd33f37e
4 changed files with 1544 additions and 305 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -16,6 +16,7 @@ from test.support import import_helper
from test.pickletester import AbstractHookTests
from test.pickletester import AbstractUnpickleTests
from test.pickletester import AbstractPicklingErrorTests
from test.pickletester import AbstractPickleTests
from test.pickletester import AbstractPickleModuleTests
from test.pickletester import AbstractPersistentPicklerTests
@@ -40,16 +41,6 @@ class PyPickleTests(AbstractPickleModuleTests, unittest.TestCase):
Pickler = pickle._Pickler
Unpickler = pickle._Unpickler
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_dump_load_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_dump_load_oob_buffers() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_dumps_loads_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_dumps_loads_oob_buffers() # TODO: RUSTPYTHON, remove when this passes
class PyUnpicklerTests(AbstractUnpickleTests, unittest.TestCase):
@@ -59,72 +50,29 @@ class PyUnpicklerTests(AbstractUnpickleTests, unittest.TestCase):
AttributeError, ValueError,
struct.error, IndexError, ImportError)
# TODO: RUSTPYTHON, AssertionError: ValueError not raised
@unittest.expectedFailure
def test_badly_escaped_string(self): # TODO: RUSTPYTHON, remove when this passes
super().test_badly_escaped_string() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_correctly_quoted_string(self): # TODO: RUSTPYTHON, remove when this passes
super().test_correctly_quoted_string() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_load_python2_str_as_bytes(self): # TODO: RUSTPYTHON, remove when this passes
super().test_load_python2_str_as_bytes() # TODO: RUSTPYTHON, remove when this passes
def loads(self, buf, **kwds):
f = io.BytesIO(buf)
u = self.unpickler(f, **kwds)
return u.load()
class PyPicklingErrorTests(AbstractPicklingErrorTests, unittest.TestCase):
pickler = pickle._Pickler
def dumps(self, arg, proto=None, **kwargs):
f = io.BytesIO()
p = self.pickler(f, proto, **kwargs)
p.dump(arg)
f.seek(0)
return bytes(f.read())
class PyPicklerTests(AbstractPickleTests, unittest.TestCase):
pickler = pickle._Pickler
unpickler = pickle._Unpickler
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffer_callback_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffer_callback_error() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffers_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffers_error() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_complex_newobj_ex(self): # TODO: RUSTPYTHON, remove when this passes
super().test_complex_newobj_ex() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, TypeError: cannot pickle 'method' object
@unittest.expectedFailure
def test_in_band_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_in_band_buffers() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers_writable_to_readonly(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers_writable_to_readonly() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, TypeError: Expected type 'bytes', not 'bytearray'
@unittest.expectedFailure
def test_optional_frames(self): # TODO: RUSTPYTHON, remove when this passes
super().test_optional_frames() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_picklebuffer_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_picklebuffer_error() # TODO: RUSTPYTHON, remove when this passes
def dumps(self, arg, proto=None, **kwargs):
f = io.BytesIO()
p = self.pickler(f, proto, **kwargs)
@@ -146,61 +94,6 @@ class InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests,
AttributeError, ValueError,
struct.error, IndexError, ImportError)
# TODO: RUSTPYTHON, AssertionError: ValueError not raised
@unittest.expectedFailure
def test_badly_escaped_string(self): # TODO: RUSTPYTHON, remove when this passes
super().test_badly_escaped_string() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffer_callback_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffer_callback_error() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffers_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffers_error() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_complex_newobj_ex(self): # TODO: RUSTPYTHON, remove when this passes
super().test_complex_newobj_ex() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_correctly_quoted_string(self): # TODO: RUSTPYTHON, remove when this passes
super().test_correctly_quoted_string() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, TypeError: cannot pickle 'method' object
@unittest.expectedFailure
def test_in_band_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_in_band_buffers() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AssertionError
@unittest.expectedFailure
def test_load_python2_str_as_bytes(self): # TODO: RUSTPYTHON, remove when this passes
super().test_load_python2_str_as_bytes() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers_writable_to_readonly(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers_writable_to_readonly() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, TypeError: Expected type 'bytes', not 'bytearray'
@unittest.expectedFailure
def test_optional_frames(self): # TODO: RUSTPYTHON, remove when this passes
super().test_optional_frames() # TODO: RUSTPYTHON, remove when this passes
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_picklebuffer_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_picklebuffer_error() # TODO: RUSTPYTHON, remove when this passes
def dumps(self, arg, protocol=None, **kwargs):
return pickle.dumps(arg, protocol, **kwargs)
@@ -208,6 +101,8 @@ class InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests,
return pickle.loads(buf, **kwds)
test_framed_write_sizes_with_delayed_writer = None
test_find_class = None
test_custom_find_class = None
class PersistentPicklerUnpicklerMixin(object):
@@ -242,6 +137,7 @@ class PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests,
pickler = pickle._Pickler
unpickler = pickle._Unpickler
persistent_load_error = pickle.UnpicklingError
@support.cpython_only
def test_pickler_reference_cycle(self):
@@ -296,7 +192,6 @@ class PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests,
support.gc_collect()
self.assertIsNone(table_ref())
@support.cpython_only
def test_unpickler_reference_cycle(self):
def check(Unpickler):
@@ -326,6 +221,112 @@ class PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests,
return pid
check(PersUnpickler)
def test_pickler_super(self):
class PersPickler(self.pickler):
def persistent_id(subself, obj):
called.append(obj)
self.assertIsNone(super().persistent_id(obj))
return obj
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
f = io.BytesIO()
pickler = PersPickler(f, proto)
called = []
pickler.dump('abc')
self.assertEqual(called, ['abc'])
self.assertEqual(self.loads(f.getvalue()), 'abc')
def test_unpickler_super(self):
class PersUnpickler(self.unpickler):
def persistent_load(subself, pid):
called.append(pid)
with self.assertRaises(self.persistent_load_error):
super().persistent_load(pid)
return pid
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
unpickler = PersUnpickler(io.BytesIO(self.dumps('abc', proto)))
called = []
self.assertEqual(unpickler.load(), 'abc')
self.assertEqual(called, ['abc'])
def test_pickler_instance_attribute(self):
def persistent_id(obj):
called.append(obj)
return obj
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
f = io.BytesIO()
pickler = self.pickler(f, proto)
called = []
old_persistent_id = pickler.persistent_id
pickler.persistent_id = persistent_id
self.assertEqual(pickler.persistent_id, persistent_id)
pickler.dump('abc')
self.assertEqual(called, ['abc'])
self.assertEqual(self.loads(f.getvalue()), 'abc')
del pickler.persistent_id
self.assertEqual(pickler.persistent_id, old_persistent_id)
def test_unpickler_instance_attribute(self):
def persistent_load(pid):
called.append(pid)
return pid
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
unpickler = self.unpickler(io.BytesIO(self.dumps('abc', proto)))
called = []
old_persistent_load = unpickler.persistent_load
unpickler.persistent_load = persistent_load
self.assertEqual(unpickler.persistent_load, persistent_load)
self.assertEqual(unpickler.load(), 'abc')
self.assertEqual(called, ['abc'])
del unpickler.persistent_load
self.assertEqual(unpickler.persistent_load, old_persistent_load)
def test_pickler_super_instance_attribute(self):
class PersPickler(self.pickler):
def persistent_id(subself, obj):
raise AssertionError('should never be called')
def _persistent_id(subself, obj):
called.append(obj)
self.assertIsNone(super().persistent_id(obj))
return obj
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
f = io.BytesIO()
pickler = PersPickler(f, proto)
called = []
old_persistent_id = pickler.persistent_id
pickler.persistent_id = pickler._persistent_id
self.assertEqual(pickler.persistent_id, pickler._persistent_id)
pickler.dump('abc')
self.assertEqual(called, ['abc'])
self.assertEqual(self.loads(f.getvalue()), 'abc')
del pickler.persistent_id
self.assertEqual(pickler.persistent_id, old_persistent_id)
def test_unpickler_super_instance_attribute(self):
class PersUnpickler(self.unpickler):
def persistent_load(subself, pid):
raise AssertionError('should never be called')
def _persistent_load(subself, pid):
called.append(pid)
with self.assertRaises(self.persistent_load_error):
super().persistent_load(pid)
return pid
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
unpickler = PersUnpickler(io.BytesIO(self.dumps('abc', proto)))
called = []
old_persistent_load = unpickler.persistent_load
unpickler.persistent_load = unpickler._persistent_load
self.assertEqual(unpickler.persistent_load, unpickler._persistent_load)
self.assertEqual(unpickler.load(), 'abc')
self.assertEqual(called, ['abc'])
del unpickler.persistent_load
self.assertEqual(unpickler.persistent_load, old_persistent_load)
class PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests, unittest.TestCase):
@@ -365,6 +366,9 @@ if has_c_implementation:
bad_stack_errors = (pickle.UnpicklingError,)
truncated_errors = (pickle.UnpicklingError,)
class CPicklingErrorTests(PyPicklingErrorTests):
pickler = _pickle.Pickler
class CPicklerTests(PyPicklerTests):
pickler = _pickle.Pickler
unpickler = _pickle.Unpickler
@@ -376,6 +380,7 @@ if has_c_implementation:
class CIdPersPicklerTests(PyIdPersPicklerTests):
pickler = _pickle.Pickler
unpickler = _pickle.Unpickler
persistent_load_error = _pickle.UnpicklingError
class CDumpPickle_LoadPickle(PyPicklerTests):
pickler = _pickle.Pickler
@@ -499,7 +504,9 @@ if has_c_implementation:
check_unpickler(recurse(1), 32, 20)
check_unpickler(recurse(20), 32, 20)
check_unpickler(recurse(50), 64, 60)
check_unpickler(recurse(100), 128, 140)
if not (support.is_wasi and support.Py_DEBUG):
# stack depth too shallow in pydebug WASI.
check_unpickler(recurse(100), 128, 140)
u = unpickler(io.BytesIO(pickle.dumps('a', 0)),
encoding='ASCII', errors='strict')
@@ -659,13 +666,12 @@ class CompatPickleTests(unittest.TestCase):
if exc in (BlockingIOError,
ResourceWarning,
StopAsyncIteration,
PythonFinalizationError,
RecursionError,
EncodingWarning,
BaseExceptionGroup,
ExceptionGroup):
continue
# TODO: RUSTPYTHON: fix name mapping for _IncompleteInputError
if exc is _IncompleteInputError:
ExceptionGroup,
_IncompleteInputError):
continue
if exc is not OSError and issubclass(exc, OSError):
self.assertEqual(reverse_mapping('builtins', name),
@@ -694,7 +700,7 @@ class CompatPickleTests(unittest.TestCase):
def load_tests(loader, tests, pattern):
tests.addTest(doctest.DocTestSuite())
tests.addTest(doctest.DocTestSuite(pickle))
return tests

153
Lib/test/test_picklebuffer.py vendored Normal file
View File

@@ -0,0 +1,153 @@
"""Unit tests for the PickleBuffer object.
Pickling tests themselves are in pickletester.py.
"""
import gc
from pickle import PickleBuffer
import weakref
import unittest
from test.support import import_helper
class B(bytes):
pass
class PickleBufferTest(unittest.TestCase):
def check_memoryview(self, pb, equiv):
with memoryview(pb) as m:
with memoryview(equiv) as expected:
self.assertEqual(m.nbytes, expected.nbytes)
self.assertEqual(m.readonly, expected.readonly)
self.assertEqual(m.itemsize, expected.itemsize)
self.assertEqual(m.shape, expected.shape)
self.assertEqual(m.strides, expected.strides)
self.assertEqual(m.c_contiguous, expected.c_contiguous)
self.assertEqual(m.f_contiguous, expected.f_contiguous)
self.assertEqual(m.format, expected.format)
self.assertEqual(m.tobytes(), expected.tobytes())
def test_constructor_failure(self):
with self.assertRaises(TypeError):
PickleBuffer()
with self.assertRaises(TypeError):
PickleBuffer("foo")
# Released memoryview fails taking a buffer
m = memoryview(b"foo")
m.release()
with self.assertRaises(ValueError):
PickleBuffer(m)
def test_basics(self):
pb = PickleBuffer(b"foo")
self.assertEqual(b"foo", bytes(pb))
with memoryview(pb) as m:
self.assertTrue(m.readonly)
pb = PickleBuffer(bytearray(b"foo"))
self.assertEqual(b"foo", bytes(pb))
with memoryview(pb) as m:
self.assertFalse(m.readonly)
m[0] = 48
self.assertEqual(b"0oo", bytes(pb))
def test_release(self):
pb = PickleBuffer(b"foo")
pb.release()
with self.assertRaises(ValueError) as raises:
memoryview(pb)
self.assertIn("operation forbidden on released PickleBuffer object",
str(raises.exception))
# Idempotency
pb.release()
def test_cycle(self):
b = B(b"foo")
pb = PickleBuffer(b)
b.cycle = pb
wpb = weakref.ref(pb)
del b, pb
gc.collect()
self.assertIsNone(wpb())
def test_ndarray_2d(self):
# C-contiguous
ndarray = import_helper.import_module("_testbuffer").ndarray
arr = ndarray(list(range(12)), shape=(4, 3), format='<i')
self.assertTrue(arr.c_contiguous)
self.assertFalse(arr.f_contiguous)
pb = PickleBuffer(arr)
self.check_memoryview(pb, arr)
# Non-contiguous
arr = arr[::2]
self.assertFalse(arr.c_contiguous)
self.assertFalse(arr.f_contiguous)
pb = PickleBuffer(arr)
self.check_memoryview(pb, arr)
# F-contiguous
arr = ndarray(list(range(12)), shape=(3, 4), strides=(4, 12), format='<i')
self.assertTrue(arr.f_contiguous)
self.assertFalse(arr.c_contiguous)
pb = PickleBuffer(arr)
self.check_memoryview(pb, arr)
# Tests for PickleBuffer.raw()
def check_raw(self, obj, equiv):
pb = PickleBuffer(obj)
with pb.raw() as m:
self.assertIsInstance(m, memoryview)
self.check_memoryview(m, equiv)
def test_raw(self):
for obj in (b"foo", bytearray(b"foo")):
with self.subTest(obj=obj):
self.check_raw(obj, obj)
def test_raw_ndarray(self):
# 1-D, contiguous
ndarray = import_helper.import_module("_testbuffer").ndarray
arr = ndarray(list(range(3)), shape=(3,), format='<h')
equiv = b"\x00\x00\x01\x00\x02\x00"
self.check_raw(arr, equiv)
# 2-D, C-contiguous
arr = ndarray(list(range(6)), shape=(2, 3), format='<h')
equiv = b"\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00"
self.check_raw(arr, equiv)
# 2-D, F-contiguous
arr = ndarray(list(range(6)), shape=(2, 3), strides=(2, 4),
format='<h')
# Note this is different from arr.tobytes()
equiv = b"\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00"
self.check_raw(arr, equiv)
# 0-D
arr = ndarray(456, shape=(), format='<i')
equiv = b'\xc8\x01\x00\x00'
self.check_raw(arr, equiv)
def check_raw_non_contiguous(self, obj):
pb = PickleBuffer(obj)
with self.assertRaisesRegex(BufferError, "non-contiguous"):
pb.raw()
def test_raw_non_contiguous(self):
# 1-D
ndarray = import_helper.import_module("_testbuffer").ndarray
arr = ndarray(list(range(6)), shape=(6,), format='<i')[::2]
self.check_raw_non_contiguous(arr)
# 2-D
arr = ndarray(list(range(12)), shape=(4, 3), format='<i')[::2]
self.check_raw_non_contiguous(arr)
def test_raw_released(self):
pb = PickleBuffer(b"foo")
pb.release()
with self.assertRaises(ValueError) as raises:
pb.raw()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,3 +1,4 @@
import io
import pickle
import pickletools
from test import support
@@ -7,52 +8,6 @@ import unittest
class OptimizedPickleTests(AbstractPickleTests, unittest.TestCase):
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffer_callback_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffer_callback_error()
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_buffers_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_buffers_error()
def test_compat_pickle(self): # TODO: RUSTPYTHON, remove when this passes
super().test_compat_pickle()
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_complex_newobj_ex(self): # TODO: RUSTPYTHON, remove when this passes
super().test_complex_newobj_ex()
# TODO: RUSTPYTHON, TypeError: cannot pickle 'method' object
@unittest.expectedFailure
def test_in_band_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_in_band_buffers()
def test_notimplemented(self): # TODO: RUSTPYTHON, remove when this passes
super().test_notimplemented()
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers()
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_oob_buffers_writable_to_readonly(self): # TODO: RUSTPYTHON, remove when this passes
super().test_oob_buffers_writable_to_readonly()
# TODO: RUSTPYTHON, TypeError: Expected type 'bytes', not 'bytearray'
@unittest.expectedFailure
def test_optional_frames(self): # TODO: RUSTPYTHON, remove when this passes
super().test_optional_frames()
# TODO: RUSTPYTHON, AttributeError: module 'pickle' has no attribute 'PickleBuffer'
@unittest.expectedFailure
def test_picklebuffer_error(self): # TODO: RUSTPYTHON, remove when this passes
super().test_picklebuffer_error()
def dumps(self, arg, proto=None, **kwargs):
return pickletools.optimize(pickle.dumps(arg, proto, **kwargs))
@@ -108,6 +63,397 @@ class OptimizedPickleTests(AbstractPickleTests, unittest.TestCase):
self.assertNotIn(pickle.BINPUT, pickled2)
class SimpleReader:
def __init__(self, data):
self.data = data
self.pos = 0
def read(self, n):
data = self.data[self.pos: self.pos + n]
self.pos += n
return data
def readline(self):
nl = self.data.find(b'\n', self.pos) + 1
if not nl:
nl = len(self.data)
data = self.data[self.pos: nl]
self.pos = nl
return data
class GenopsTests(unittest.TestCase):
def test_genops(self):
it = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.')
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, 0),
('INT', 123, 1),
('BININT1', 0x12, 6),
('BININT', 0x78563412, 8),
('TUPLE', None, 13),
('STOP', None, 14),
])
def test_from_file(self):
f = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix')
self.assertEqual(f.read(6), b'prefix')
it = pickletools.genops(f)
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, 6),
('INT', 123, 7),
('BININT1', 0x12, 12),
('BININT', 0x78563412, 14),
('TUPLE', None, 19),
('STOP', None, 20),
])
self.assertEqual(f.read(), b'suffix')
def test_without_pos(self):
f = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.')
it = pickletools.genops(f)
self.assertEqual([(item[0].name,) + item[1:] for item in it], [
('MARK', None, None),
('INT', 123, None),
('BININT1', 0x12, None),
('BININT', 0x78563412, None),
('TUPLE', None, None),
('STOP', None, None),
])
def test_no_stop(self):
it = pickletools.genops(b'N')
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
'pickle exhausted before seeing STOP'):
next(it)
def test_truncated_data(self):
it = pickletools.genops(b'I123')
with self.assertRaisesRegex(ValueError,
'no newline found when trying to read stringnl'):
next(it)
it = pickletools.genops(b'J\x12\x34')
with self.assertRaisesRegex(ValueError,
'not enough data in stream to read int4'):
next(it)
def test_unknown_opcode(self):
it = pickletools.genops(b'N\xff')
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
r"at position 1, opcode b'\\xff' unknown"):
next(it)
def test_unknown_opcode_without_pos(self):
f = SimpleReader(b'N\xff')
it = pickletools.genops(f)
item = next(it)
self.assertEqual(item[0].name, 'NONE')
with self.assertRaisesRegex(ValueError,
r"at position <unknown>, opcode b'\\xff' unknown"):
next(it)
class DisTests(unittest.TestCase):
maxDiff = None
def check_dis(self, data, expected, **kwargs):
out = io.StringIO()
pickletools.dis(data, out=out, **kwargs)
self.assertEqual(out.getvalue(), expected)
def check_dis_error(self, data, expected, expected_error, **kwargs):
out = io.StringIO()
with self.assertRaisesRegex(ValueError, expected_error):
pickletools.dis(data, out=out, **kwargs)
self.assertEqual(out.getvalue(), expected)
def test_mark(self):
self.check_dis(b'(N(tl.', '''\
0: ( MARK
1: N NONE
2: ( MARK
3: t TUPLE (MARK at 2)
4: l LIST (MARK at 0)
5: . STOP
highest protocol among opcodes = 0
''')
def test_indentlevel(self):
self.check_dis(b'(N(tl.', '''\
0: ( MARK
1: N NONE
2: ( MARK
3: t TUPLE (MARK at 2)
4: l LIST (MARK at 0)
5: . STOP
highest protocol among opcodes = 0
''', indentlevel=2)
def test_mark_without_pos(self):
self.check_dis(SimpleReader(b'(N(tl.'), '''\
( MARK
N NONE
( MARK
t TUPLE (MARK at unknown opcode offset)
l LIST (MARK at unknown opcode offset)
. STOP
highest protocol among opcodes = 0
''')
def test_no_mark(self):
self.check_dis_error(b'Nt.', '''\
0: N NONE
1: t TUPLE no MARK exists on stack
''', 'no MARK exists on stack')
def test_put(self):
self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
0: N NONE
1: p PUT 0
4: q BINPUT 1
6: r LONG_BINPUT 2
11: \\x94 MEMOIZE (as 3)
12: . STOP
highest protocol among opcodes = 4
''')
def test_put_redefined(self):
self.check_dis_error(b'Np1\np1\n.', '''\
0: N NONE
1: p PUT 1
4: p PUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\nq\x01.', '''\
0: N NONE
1: p PUT 1
4: q BINPUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\
0: N NONE
1: p PUT 1
4: r LONG_BINPUT 1
''', 'memo key 1 already defined')
self.check_dis_error(b'Np1\n\x94.', '''\
0: N NONE
1: p PUT 1
4: \\x94 MEMOIZE (as 1)
''', 'memo key None already defined')
def test_put_empty_stack(self):
self.check_dis_error(b'p0\n', '''\
0: p PUT 0
''', "stack is empty -- can't store into memo")
def test_put_markobject(self):
self.check_dis_error(b'(p0\n', '''\
0: ( MARK
1: p PUT 0
''', "can't store markobject in the memo")
def test_get(self):
self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
0: ( MARK
1: N NONE
2: p PUT 1
5: g GET 1
8: h BINGET 1
10: j LONG_BINGET 1
15: t TUPLE (MARK at 0)
16: . STOP
highest protocol among opcodes = 1
''')
def test_get_without_put(self):
self.check_dis_error(b'g1\n.', '''\
0: g GET 1
''', 'memo key 1 has never been stored into')
self.check_dis_error(b'h\x01.', '''\
0: h BINGET 1
''', 'memo key 1 has never been stored into')
self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
0: j LONG_BINGET 1
''', 'memo key 1 has never been stored into')
def test_memo(self):
memo = {}
self.check_dis(b'Np1\n.', '''\
0: N NONE
1: p PUT 1
4: . STOP
highest protocol among opcodes = 0
''', memo=memo)
self.check_dis(b'g1\n.', '''\
0: g GET 1
3: . STOP
highest protocol among opcodes = 0
''', memo=memo)
def test_mark_pop(self):
self.check_dis(b'(N00N.', '''\
0: ( MARK
1: N NONE
2: 0 POP
3: 0 POP (MARK at 0)
4: N NONE
5: . STOP
highest protocol among opcodes = 0
''')
def test_too_small_stack(self):
self.check_dis_error(b'a', '''\
0: a APPEND
''', 'tries to pop 2 items from stack with only 0 items')
self.check_dis_error(b']a', '''\
0: ] EMPTY_LIST
1: a APPEND
''', 'tries to pop 2 items from stack with only 1 items')
def test_no_stop(self):
self.check_dis_error(b'N', '''\
0: N NONE
''', 'pickle exhausted before seeing STOP')
def test_truncated_data(self):
self.check_dis_error(b'NI123', '''\
0: N NONE
''', 'no newline found when trying to read stringnl')
self.check_dis_error(b'NJ\x12\x34', '''\
0: N NONE
''', 'not enough data in stream to read int4')
def test_unknown_opcode(self):
self.check_dis_error(b'N\xff', '''\
0: N NONE
''', r"at position 1, opcode b'\\xff' unknown")
def test_stop_not_empty_stack(self):
self.check_dis_error(b']N.', '''\
0: ] EMPTY_LIST
1: N NONE
2: . STOP
highest protocol among opcodes = 1
''', r'stack not empty after STOP: \[list\]')
def test_annotate(self):
self.check_dis(b'(Nt.', '''\
0: ( MARK Push markobject onto the stack.
1: N NONE Push None on the stack.
2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
3: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=1)
self.check_dis(b'(Nt.', '''\
0: ( MARK Push markobject onto the stack.
1: N NONE Push None on the stack.
2: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
3: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)
self.check_dis(b'(((((((ttttttt.', '''\
0: ( MARK Push markobject onto the stack.
1: ( MARK Push markobject onto the stack.
2: ( MARK Push markobject onto the stack.
3: ( MARK Push markobject onto the stack.
4: ( MARK Push markobject onto the stack.
5: ( MARK Push markobject onto the stack.
6: ( MARK Push markobject onto the stack.
7: t TUPLE (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
8: t TUPLE (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
9: t TUPLE (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
10: t TUPLE (MARK at 3) Build a tuple out of the topmost stack slice, after markobject.
11: t TUPLE (MARK at 2) Build a tuple out of the topmost stack slice, after markobject.
12: t TUPLE (MARK at 1) Build a tuple out of the topmost stack slice, after markobject.
13: t TUPLE (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
14: . STOP Stop the unpickling machine.
highest protocol among opcodes = 0
''', annotate=20)
def test_string(self):
self.check_dis(b"S'abc'\n.", '''\
0: S STRING 'abc'
7: . STOP
highest protocol among opcodes = 0
''')
self.check_dis(b'S"abc"\n.', '''\
0: S STRING 'abc'
7: . STOP
highest protocol among opcodes = 0
''')
self.check_dis(b"S'\xc3\xb5'\n.", '''\
0: S STRING '\\xc3\\xb5'
6: . STOP
highest protocol among opcodes = 0
''')
def test_string_without_quotes(self):
self.check_dis_error(b"Sabc'\n.", '',
'no string quotes around b"abc\'"')
self.check_dis_error(b'Sabc"\n.', '',
"no string quotes around b'abc\"'")
self.check_dis_error(b"S'abc\n.", '',
'''strinq quote b"'" not found at both ends of b"'abc"''')
self.check_dis_error(b'S"abc\n.', '',
r"""strinq quote b'"' not found at both ends of b'"abc'""")
self.check_dis_error(b"S'abc\"\n.", '',
r"""strinq quote b"'" not found at both ends of b'\\'abc"'""")
self.check_dis_error(b"S\"abc'\n.", '',
r"""strinq quote b'"' not found at both ends of b'"abc\\''""")
def test_binstring(self):
self.check_dis(b"T\x03\x00\x00\x00abc.", '''\
0: T BINSTRING 'abc'
8: . STOP
highest protocol among opcodes = 1
''')
self.check_dis(b"T\x02\x00\x00\x00\xc3\xb5.", '''\
0: T BINSTRING '\\xc3\\xb5'
7: . STOP
highest protocol among opcodes = 1
''')
def test_short_binstring(self):
self.check_dis(b"U\x03abc.", '''\
0: U SHORT_BINSTRING 'abc'
5: . STOP
highest protocol among opcodes = 1
''')
self.check_dis(b"U\x02\xc3\xb5.", '''\
0: U SHORT_BINSTRING '\\xc3\\xb5'
4: . STOP
highest protocol among opcodes = 1
''')
def test_global(self):
self.check_dis(b"cmodule\nname\n.", '''\
0: c GLOBAL 'module name'
13: . STOP
highest protocol among opcodes = 0
''')
self.check_dis(b"cm\xc3\xb6dule\nn\xc3\xa4me\n.", '''\
0: c GLOBAL 'm\xf6dule n\xe4me'
15: . STOP
highest protocol among opcodes = 0
''')
def test_inst(self):
self.check_dis(b"(imodule\nname\n.", '''\
0: ( MARK
1: i INST 'module name' (MARK at 0)
14: . STOP
highest protocol among opcodes = 0
''')
def test_persid(self):
self.check_dis(b"Pabc\n.", '''\
0: P PERSID 'abc'
5: . STOP
highest protocol among opcodes = 0
''')
class MiscTestCase(unittest.TestCase):
def test__all__(self):
not_exported = {
@@ -142,8 +488,7 @@ class MiscTestCase(unittest.TestCase):
def load_tests(loader, tests, pattern):
# TODO: RUSTPYTHON
# tests.addTest(doctest.DocTestSuite(pickletools))
tests.addTest(doctest.DocTestSuite(pickletools))
return tests