mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
Merge pull request #4841 from Masorubka1/test_mmap.py
Update test_mmap.py from Cpython v3.11.2
This commit is contained in:
108
Lib/test/test_mmap.py
vendored
108
Lib/test/test_mmap.py
vendored
@@ -1,11 +1,15 @@
|
||||
from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
|
||||
from test.support import (
|
||||
requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
|
||||
)
|
||||
from test.support.import_helper import import_module
|
||||
from test.support.os_helper import TESTFN, unlink
|
||||
import unittest
|
||||
import os
|
||||
import re
|
||||
import itertools
|
||||
import random
|
||||
import socket
|
||||
import string
|
||||
import sys
|
||||
import weakref
|
||||
|
||||
@@ -14,6 +18,16 @@ mmap = import_module('mmap')
|
||||
|
||||
PAGESIZE = mmap.PAGESIZE
|
||||
|
||||
tagname_prefix = f'python_{os.getpid()}_test_mmap'
|
||||
def random_tagname(length=10):
|
||||
suffix = ''.join(random.choices(string.ascii_uppercase, k=length))
|
||||
return f'{tagname_prefix}_{suffix}'
|
||||
|
||||
# Python's mmap module dup()s the file descriptor. Emscripten's FS layer
|
||||
# does not materialize file changes through a dupped fd to a new mmap.
|
||||
if is_emscripten:
|
||||
raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
|
||||
|
||||
|
||||
class MmapTests(unittest.TestCase):
|
||||
|
||||
@@ -609,11 +623,13 @@ class MmapTests(unittest.TestCase):
|
||||
data1 = b"0123456789"
|
||||
data2 = b"abcdefghij"
|
||||
assert len(data1) == len(data2)
|
||||
tagname1 = random_tagname()
|
||||
tagname2 = random_tagname()
|
||||
|
||||
# Test same tag
|
||||
m1 = mmap.mmap(-1, len(data1), tagname="foo")
|
||||
m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
|
||||
m1[:] = data1
|
||||
m2 = mmap.mmap(-1, len(data2), tagname="foo")
|
||||
m2 = mmap.mmap(-1, len(data2), tagname=tagname1)
|
||||
m2[:] = data2
|
||||
self.assertEqual(m1[:], data2)
|
||||
self.assertEqual(m2[:], data2)
|
||||
@@ -621,9 +637,9 @@ class MmapTests(unittest.TestCase):
|
||||
m1.close()
|
||||
|
||||
# Test different tag
|
||||
m1 = mmap.mmap(-1, len(data1), tagname="foo")
|
||||
m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
|
||||
m1[:] = data1
|
||||
m2 = mmap.mmap(-1, len(data2), tagname="boo")
|
||||
m2 = mmap.mmap(-1, len(data2), tagname=tagname2)
|
||||
m2[:] = data2
|
||||
self.assertEqual(m1[:], data1)
|
||||
self.assertEqual(m2[:], data2)
|
||||
@@ -634,7 +650,7 @@ class MmapTests(unittest.TestCase):
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_sizeof(self):
|
||||
m1 = mmap.mmap(-1, 100)
|
||||
tagname = "foo"
|
||||
tagname = random_tagname()
|
||||
m2 = mmap.mmap(-1, 100, tagname=tagname)
|
||||
self.assertEqual(sys.getsizeof(m2),
|
||||
sys.getsizeof(m1) + len(tagname) + 1)
|
||||
@@ -642,9 +658,10 @@ class MmapTests(unittest.TestCase):
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_crasher_on_windows(self):
|
||||
# Should not crash (Issue 1733986)
|
||||
m = mmap.mmap(-1, 1000, tagname="foo")
|
||||
tagname = random_tagname()
|
||||
m = mmap.mmap(-1, 1000, tagname=tagname)
|
||||
try:
|
||||
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
|
||||
mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size
|
||||
except:
|
||||
pass
|
||||
m.close()
|
||||
@@ -707,7 +724,6 @@ class MmapTests(unittest.TestCase):
|
||||
self.assertEqual(mm.write(b"yz"), 2)
|
||||
self.assertEqual(mm.write(b"python"), 6)
|
||||
|
||||
@unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
|
||||
def test_resize_past_pos(self):
|
||||
m = mmap.mmap(-1, 8192)
|
||||
self.addCleanup(m.close)
|
||||
@@ -797,6 +813,80 @@ class MmapTests(unittest.TestCase):
|
||||
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
|
||||
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
|
||||
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_resize_up_when_mapped_to_pagefile(self):
|
||||
"""If the mmap is backed by the pagefile ensure a resize up can happen
|
||||
and that the original data is still in place
|
||||
"""
|
||||
start_size = PAGESIZE
|
||||
new_size = 2 * start_size
|
||||
data = bytes(random.getrandbits(8) for _ in range(start_size))
|
||||
|
||||
m = mmap.mmap(-1, start_size)
|
||||
m[:] = data
|
||||
m.resize(new_size)
|
||||
self.assertEqual(len(m), new_size)
|
||||
self.assertEqual(m[:start_size], data[:start_size])
|
||||
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_resize_down_when_mapped_to_pagefile(self):
|
||||
"""If the mmap is backed by the pagefile ensure a resize down up can happen
|
||||
and that a truncated form of the original data is still in place
|
||||
"""
|
||||
start_size = PAGESIZE
|
||||
new_size = start_size // 2
|
||||
data = bytes(random.getrandbits(8) for _ in range(start_size))
|
||||
|
||||
m = mmap.mmap(-1, start_size)
|
||||
m[:] = data
|
||||
m.resize(new_size)
|
||||
self.assertEqual(len(m), new_size)
|
||||
self.assertEqual(m[:new_size], data[:new_size])
|
||||
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_resize_fails_if_mapping_held_elsewhere(self):
|
||||
"""If more than one mapping is held against a named file on Windows, neither
|
||||
mapping can be resized
|
||||
"""
|
||||
start_size = 2 * PAGESIZE
|
||||
reduced_size = PAGESIZE
|
||||
|
||||
f = open(TESTFN, 'wb+')
|
||||
f.truncate(start_size)
|
||||
try:
|
||||
m1 = mmap.mmap(f.fileno(), start_size)
|
||||
m2 = mmap.mmap(f.fileno(), start_size)
|
||||
with self.assertRaises(OSError):
|
||||
m1.resize(reduced_size)
|
||||
with self.assertRaises(OSError):
|
||||
m2.resize(reduced_size)
|
||||
m2.close()
|
||||
m1.resize(reduced_size)
|
||||
self.assertEqual(m1.size(), reduced_size)
|
||||
self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
|
||||
def test_resize_succeeds_with_error_for_second_named_mapping(self):
|
||||
"""If a more than one mapping exists of the same name, none of them can
|
||||
be resized: they'll raise an Exception and leave the original mapping intact
|
||||
"""
|
||||
start_size = 2 * PAGESIZE
|
||||
reduced_size = PAGESIZE
|
||||
tagname = random_tagname()
|
||||
data_length = 8
|
||||
data = bytes(random.getrandbits(8) for _ in range(data_length))
|
||||
|
||||
m1 = mmap.mmap(-1, start_size, tagname=tagname)
|
||||
m2 = mmap.mmap(-1, start_size, tagname=tagname)
|
||||
m1[:data_length] = data
|
||||
self.assertEqual(m2[:data_length], data)
|
||||
with self.assertRaises(OSError):
|
||||
m1.resize(reduced_size)
|
||||
self.assertEqual(m1.size(), start_size)
|
||||
self.assertEqual(m1[:data_length], data)
|
||||
self.assertEqual(m2[:data_length], data)
|
||||
|
||||
class LargeMmapTests(unittest.TestCase):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user