Update the zipfile + zipimport libraries + tests - v3.13.11 (#6639)

* Updated zipimport library + test

* Updated zipfile library + test

* Annotated failing/erroring tests in test_zipfile and test_zipimport

* Changed all skips in `test_core.py` to expectedFailures

* skip EncodedMetadataTests

---------

Co-authored-by: Jeong YunWon <jeong@youknowone.org>
This commit is contained in:
Terry Tianlin Luan
2026-01-06 07:35:21 -05:00
committed by GitHub
parent e367145a4a
commit cd613edc71
8 changed files with 675 additions and 409 deletions

View File

@@ -275,7 +275,8 @@ class TestPath(unittest.TestCase):
"""
zipfile_ondisk = self.zipfile_ondisk(alpharep)
pathlike = FakePath(str(zipfile_ondisk))
zipfile.Path(pathlike)
root = zipfile.Path(pathlike)
root.root.close()
@pass_alpharep
def test_traverse_pathlike(self, alpharep):
@@ -374,6 +375,7 @@ class TestPath(unittest.TestCase):
root = zipfile.Path(self.zipfile_ondisk(alpharep))
assert root.name == 'alpharep.zip' == root.filename.name
assert root.stem == 'alpharep' == root.filename.stem
root.root.close()
@pass_alpharep
def test_suffix(self, alpharep):
@@ -565,7 +567,7 @@ class TestPath(unittest.TestCase):
file = cls(alpharep).joinpath('some dir').parent
assert isinstance(file, cls)
@unittest.skipIf(sys.platform == 'win32', "TODO: RUSTPYTHON, fails on Windows")
@unittest.skipIf(sys.platform == 'win32', 'TODO: RUSTPYTHON; fails on Windows')
@parameterize(
['alpharep', 'path_type', 'subpath'],
itertools.product(
@@ -576,11 +578,13 @@ class TestPath(unittest.TestCase):
)
def test_pickle(self, alpharep, path_type, subpath):
zipfile_ondisk = path_type(str(self.zipfile_ondisk(alpharep)))
saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
root = zipfile.Path(zipfile_ondisk, at=subpath)
saved_1 = pickle.dumps(root)
root.root.close()
restored_1 = pickle.loads(saved_1)
first, *rest = restored_1.iterdir()
assert first.read_text(encoding='utf-8').startswith('content of ')
restored_1.root.close()
@pass_alpharep
def test_extract_orig_with_implied_dirs(self, alpharep):
@@ -592,6 +596,7 @@ class TestPath(unittest.TestCase):
# wrap the zipfile for its side effect
zipfile.Path(zf)
zf.extractall(source_path.parent)
zf.close()
@pass_alpharep
def test_getinfo_missing(self, alpharep):

View File

@@ -302,26 +302,26 @@ class AbstractTestsWithSourceFile:
self.assertEqual(openobj.read(1), b'2')
def test_writestr_compression(self):
zipfp = zipfile.ZipFile(TESTFN2, "w")
zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, self.compression)
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
info = zipfp.getinfo('b.txt')
self.assertEqual(info.compress_type, self.compression)
def test_writestr_compresslevel(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
compresslevel=2)
with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
compresslevel=2)
# Compression level follows the constructor.
a_info = zipfp.getinfo('a.txt')
self.assertEqual(a_info.compress_type, self.compression)
self.assertEqual(a_info.compress_level, 1)
# Compression level follows the constructor.
a_info = zipfp.getinfo('a.txt')
self.assertEqual(a_info.compress_type, self.compression)
self.assertEqual(a_info.compress_level, 1)
# Compression level is overridden.
b_info = zipfp.getinfo('b.txt')
self.assertEqual(b_info.compress_type, self.compression)
self.assertEqual(b_info._compresslevel, 2)
# Compression level is overridden.
b_info = zipfp.getinfo('b.txt')
self.assertEqual(b_info.compress_type, self.compression)
self.assertEqual(b_info._compresslevel, 2)
def test_read_return_size(self):
# Issue #9837: ZipExtFile.read() shouldn't return more bytes
@@ -884,6 +884,8 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
self, file_size_64_set=False, file_size_extra=False,
compress_size_64_set=False, compress_size_extra=False,
header_offset_64_set=False, header_offset_extra=False,
extensible_data=b'',
end_of_central_dir_size=None, offset_to_end_of_central_dir=None,
):
"""Generate bytes sequence for a zip with (incomplete) zip64 data.
@@ -937,6 +939,12 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
if end_of_central_dir_size is None:
end_of_central_dir_size = 44 + len(extensible_data)
if offset_to_end_of_central_dir is None:
offset_to_end_of_central_dir = (108
+ 8 * len(local_zip64_fields)
+ 8 * len(central_zip64_fields))
local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
@@ -965,14 +973,17 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
+ filename
+ central_extra
# Zip64 end of central directory
+ b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
+ b"PK\x06\x06"
+ struct.pack('<Q', end_of_central_dir_size)
+ b"-\x00-\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
+ b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
+ central_dir_size
+ offset_to_central_dir
+ extensible_data
# Zip64 end of central directory locator
+ b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
+ b"\x00\x00\x00"
+ b"PK\x06\x07\x00\x00\x00\x00"
+ struct.pack('<Q', offset_to_end_of_central_dir)
+ b"\x01\x00\x00\x00"
# end of central directory
+ b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
+ b"\x00\x00\x00\x00"
@@ -1003,6 +1014,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
self.assertIn('file size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_file_size_extra)))
# zip64 file size present, zip64 compress size present, one field in
# extra, expecting two, equals missing compress size.
@@ -1014,6 +1026,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
self.assertIn('compress size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))
# zip64 compress size present, no fields in extra, expecting one,
# equals missing compress size.
@@ -1023,6 +1036,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
self.assertIn('compress size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))
# zip64 file size present, zip64 compress size present, zip64 header
# offset present, two fields in extra, expecting three, equals missing
@@ -1037,6 +1051,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
# zip64 compress size present, zip64 header offset present, one field
# in extra, expecting two, equals missing header offset
@@ -1049,6 +1064,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
# zip64 file size present, zip64 header offset present, one field in
# extra, expecting two, equals missing header offset
@@ -1061,6 +1077,7 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
# zip64 header offset present, no fields in extra, expecting one,
# equals missing header offset
@@ -1072,6 +1089,63 @@ class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
def test_bad_zip64_end_of_central_dir(self):
zipdata = self.make_zip64_file(end_of_central_dir_size=0)
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
zipdata = self.make_zip64_file(end_of_central_dir_size=100)
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
zipdata = self.make_zip64_file(offset_to_end_of_central_dir=0)
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
zipdata = self.make_zip64_file(offset_to_end_of_central_dir=1000)
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*locator'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
def test_zip64_end_of_central_dir_record_not_found(self):
zipdata = self.make_zip64_file()
zipdata = zipdata.replace(b"PK\x06\x06", b'\x00'*4)
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
zipdata = self.make_zip64_file(
extensible_data=b'\xca\xfe\x04\x00\x00\x00data')
zipdata = zipdata.replace(b"PK\x06\x06", b'\x00'*4)
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
zipfile.ZipFile(io.BytesIO(zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
def test_zip64_extensible_data(self):
# These values are what is set in the make_zip64_file method.
expected_file_size = 8
expected_compress_size = 8
expected_header_offset = 0
expected_content = b"test1234"
zipdata = self.make_zip64_file(
extensible_data=b'\xca\xfe\x04\x00\x00\x00data')
with zipfile.ZipFile(io.BytesIO(zipdata)) as zf:
zinfo = zf.infolist()[0]
self.assertEqual(zinfo.file_size, expected_file_size)
self.assertEqual(zinfo.compress_size, expected_compress_size)
self.assertEqual(zinfo.header_offset, expected_header_offset)
self.assertEqual(zf.read(zinfo), expected_content)
self.assertTrue(zipfile.is_zipfile(io.BytesIO(zipdata)))
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
zipfile.ZipFile(io.BytesIO(b'prepended' + zipdata))
self.assertFalse(zipfile.is_zipfile(io.BytesIO(b'prepended' + zipdata)))
def test_generated_valid_zip64_extra(self):
# These values are what is set in the make_zip64_file method.
@@ -1357,8 +1431,7 @@ class PyZipFileTests(unittest.TestCase):
self.skipTest('requires write access to the installed location')
unlink(filename)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_write_pyfile(self):
self.requiresWriteAccess(os.path.dirname(__file__))
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
@@ -1389,8 +1462,7 @@ class PyZipFileTests(unittest.TestCase):
self.assertNotIn(bn, zipfp.namelist())
self.assertCompiledIn(bn, zipfp.namelist())
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_write_python_package(self):
import email
packagedir = os.path.dirname(email.__file__)
@@ -1405,8 +1477,7 @@ class PyZipFileTests(unittest.TestCase):
self.assertCompiledIn('email/__init__.py', names)
self.assertCompiledIn('email/mime/text.py', names)
# TODO: RUSTPYTHON - AttributeError: module 'os' has no attribute 'supports_effective_ids'
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; - AttributeError: module 'os' has no attribute 'supports_effective_ids'
def test_write_filtered_python_package(self):
import test
packagedir = os.path.dirname(test.__file__)
@@ -1437,8 +1508,7 @@ class PyZipFileTests(unittest.TestCase):
print(reportStr)
self.assertTrue('SyntaxError' not in reportStr)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_write_with_optimization(self):
import email
packagedir = os.path.dirname(email.__file__)
@@ -2189,6 +2259,7 @@ class OtherTests(unittest.TestCase):
zipf = zipfile.ZipFile(TESTFN, mode="r")
except zipfile.BadZipFile:
self.fail("Unable to create empty ZIP file in 'w' mode")
zipf.close()
zipf = zipfile.ZipFile(TESTFN, mode="a")
zipf.close()
@@ -2196,6 +2267,7 @@ class OtherTests(unittest.TestCase):
zipf = zipfile.ZipFile(TESTFN, mode="r")
except:
self.fail("Unable to create empty ZIP file in 'a' mode")
zipf.close()
def test_open_empty_file(self):
# Issue 1710703: Check that opening a file with less than 22 bytes
@@ -3020,11 +3092,6 @@ class TestsWithMultipleOpens(unittest.TestCase):
self.assertEqual(data1, self.data1)
self.assertEqual(data2, self.data2)
# TODO: RUSTPYTHON other tests can impact the file descriptor incrementor
# by leaving file handles unclosed. If there are more than 100 files in
# TESTFN and references to them are left unclosed and ungarbage collected
# in another test, then fileno() will always be too high for this test to
# pass. The solution is to increase the number of files from 100 to 200
def test_many_opens(self):
# Verify that read() and open() promptly close the file descriptor,
# and don't rely on the garbage collector to free resources.
@@ -3348,8 +3415,7 @@ class TestExecutablePrependedZip(unittest.TestCase):
def test_read_zip64_with_exe_prepended(self):
self._test_zip_works(self.exe_zip64)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
@@ -3358,8 +3424,7 @@ class TestExecutablePrependedZip(unittest.TestCase):
output = subprocess.check_output([self.exe_zip, sys.executable])
self.assertIn(b'number in executable: 5', output)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.skipUnless(sys.executable, 'sys.executable required.')
@unittest.skipUnless(os.access('/bin/bash', os.X_OK),
'Test relies on #!/bin/bash working.')
@@ -3369,7 +3434,6 @@ class TestExecutablePrependedZip(unittest.TestCase):
self.assertIn(b'number in executable: 5', output)
# TODO: RUSTPYTHON
@unittest.skip("TODO: RUSTPYTHON shift_jis encoding unsupported")
class EncodedMetadataTests(unittest.TestCase):
file_names = ['\u4e00', '\u4e8c', '\u4e09'] # Han 'one', 'two', 'three'
@@ -3377,8 +3441,10 @@ class EncodedMetadataTests(unittest.TestCase):
"This is pure ASCII.\n".encode('ascii'),
# This is modern Japanese. (UTF-8)
"\u3053\u308c\u306f\u73fe\u4ee3\u7684\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('utf-8'),
# TODO RUSTPYTHON
# Uncomment when Shift JIS is supported
# This is obsolete Japanese. (Shift JIS)
# "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'), # TODO: RUSTPYTHON
# "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'),
]
def setUp(self):
@@ -3418,11 +3484,13 @@ class EncodedMetadataTests(unittest.TestCase):
self.assertEqual(info.file_size, len(content))
self.assertEqual(zipfp.read(name), content)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_read_with_metadata_encoding(self):
# Read the ZIP archive with correct metadata_encoding
with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp:
self._test_read(zipfp, self.file_names, self.file_content)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_read_without_metadata_encoding(self):
# Read the ZIP archive without metadata_encoding
expected_names = [name.encode('shift_jis').decode('cp437')
@@ -3430,6 +3498,7 @@ class EncodedMetadataTests(unittest.TestCase):
with zipfile.ZipFile(TESTFN, "r") as zipfp:
self._test_read(zipfp, expected_names, self.file_content)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_read_with_incorrect_metadata_encoding(self):
# Read the ZIP archive with incorrect metadata_encoding
expected_names = [name.encode('shift_jis').decode('koi8-u')
@@ -3437,6 +3506,7 @@ class EncodedMetadataTests(unittest.TestCase):
with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp:
self._test_read(zipfp, expected_names, self.file_content)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_read_with_unsuitable_metadata_encoding(self):
# Read the ZIP archive with metadata_encoding unsuitable for
# decoding metadata
@@ -3445,6 +3515,7 @@ class EncodedMetadataTests(unittest.TestCase):
with self.assertRaises(UnicodeDecodeError):
zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8')
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_read_after_append(self):
newname = '\u56db' # Han 'four'
expected_names = [name.encode('shift_jis').decode('cp437')
@@ -3471,6 +3542,7 @@ class EncodedMetadataTests(unittest.TestCase):
else:
self.assertEqual(zipfp.read(name), content)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_write_with_metadata_encoding(self):
ZF = zipfile.ZipFile
for mode in ("w", "x", "a"):
@@ -3478,6 +3550,7 @@ class EncodedMetadataTests(unittest.TestCase):
"^metadata_encoding is only"):
ZF("nonesuch.zip", mode, metadata_encoding="shift_jis")
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_cli_with_metadata_encoding(self):
errmsg = "Non-conforming encodings not supported with -c."
args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"]
@@ -3497,6 +3570,7 @@ class EncodedMetadataTests(unittest.TestCase):
for name in self.file_names:
self.assertIn(name, listing)
@unittest.expectedFailure # TODO: RUSTPYTHON;
def test_cli_with_metadata_encoding_extract(self):
os.mkdir(TESTFN2)
self.addCleanup(rmtree, TESTFN2)

View File

@@ -1,8 +1,10 @@
import sys
import os
import marshal
import glob
import importlib
import importlib.util
import re
import struct
import time
import unittest
@@ -50,10 +52,14 @@ test_pyc = make_pyc(test_co, NOW, len(test_src))
TESTMOD = "ziptestmodule"
TESTMOD2 = "ziptestmodule2"
TESTMOD3 = "ziptestmodule3"
TESTPACK = "ziptestpackage"
TESTPACK2 = "ziptestpackage2"
TESTPACK3 = "ziptestpackage3"
TEMP_DIR = os.path.abspath("junk95142")
TEMP_ZIP = os.path.abspath("junk95142.zip")
TEST_DATA_DIR = os.path.join(os.path.dirname(__file__), "zipimport_data")
pyc_file = importlib.util.cache_from_source(TESTMOD + '.py')
pyc_ext = '.pyc'
@@ -92,8 +98,10 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# defined by files under the directory dirName.
self.addCleanup(os_helper.rmtree, dirName)
for name, (mtime, data) in files.items():
path = os.path.join(dirName, name)
for name, data in files.items():
if isinstance(data, tuple):
mtime, data = data
path = os.path.join(dirName, *name.split('/'))
if path[-1] == os.sep:
if not os.path.isdir(path):
os.makedirs(path)
@@ -104,22 +112,18 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
with open(path, 'wb') as fp:
fp.write(data)
def makeZip(self, files, zipName=TEMP_ZIP, **kw):
def makeZip(self, files, zipName=TEMP_ZIP, *,
comment=None, file_comment=None, stuff=None, prefix='', **kw):
# Create a zip archive based set of modules/packages
# defined by files in the zip file zipName. If the
# key 'stuff' exists in kw it is prepended to the archive.
# defined by files in the zip file zipName.
# If stuff is not None, it is prepended to the archive.
self.addCleanup(os_helper.unlink, zipName)
with ZipFile(zipName, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
z.writestr(zinfo, data)
comment = kw.get("comment", None)
with ZipFile(zipName, "w", compression=self.compression) as z:
self.writeZip(z, files, file_comment=file_comment, prefix=prefix)
if comment is not None:
z.comment = comment
stuff = kw.get("stuff", None)
if stuff is not None:
# Prepend 'stuff' to the start of the zipfile
with open(zipName, "rb") as f:
@@ -128,20 +132,47 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
f.write(stuff)
f.write(data)
def doTest(self, expected_ext, files, *modules, **kw):
self.makeZip(files, **kw)
def writeZip(self, z, files, *, file_comment=None, prefix=''):
for name, data in files.items():
if isinstance(data, tuple):
mtime, data = data
else:
mtime = NOW
name = name.replace(os.sep, '/')
zinfo = ZipInfo(prefix + name, time.localtime(mtime))
zinfo.compress_type = self.compression
if file_comment is not None:
zinfo.comment = file_comment
if data is None:
zinfo.CRC = 0
z.mkdir(zinfo)
else:
assert name[-1] != '/'
z.writestr(zinfo, data)
sys.path.insert(0, TEMP_ZIP)
def getZip64Files(self):
# This is the simplest way to make zipfile generate the zip64 EOCD block
return {f"f{n}.py": test_src for n in range(65537)}
def doTest(self, expected_ext, files, *modules, **kw):
if 'prefix' not in kw:
kw['prefix'] = 'pre/fix/'
self.makeZip(files, **kw)
self.doTestWithPreBuiltZip(expected_ext, *modules, **kw)
def doTestWithPreBuiltZip(self, expected_ext, *modules,
call=None, prefix='', **kw):
zip_path = os.path.join(TEMP_ZIP, *prefix.split('/')[:-1])
sys.path.insert(0, zip_path)
mod = importlib.import_module(".".join(modules))
call = kw.get('call')
if call is not None:
call(mod)
if expected_ext:
file = mod.get_file()
self.assertEqual(file, os.path.join(TEMP_ZIP,
self.assertEqual(file, os.path.join(zip_path,
*modules) + expected_ext)
def testAFakeZlib(self):
@@ -155,7 +186,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# zlib.decompress function object, after which the problem being
# tested here wouldn't be a problem anymore...
# (Hence the 'A' in the test method name: to make it the first
# item in a list sorted by name, like unittest.makeSuite() does.)
# item in a list sorted by name, like
# unittest.TestLoader.getTestCaseNames() does.)
#
# This test fails on platforms on which the zlib module is
# statically linked, but the problem it tests for can't
@@ -166,7 +198,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.skipTest('zlib is a builtin module')
if "zlib" in sys.modules:
del sys.modules["zlib"]
files = {"zlib.py": (NOW, test_src)}
files = {"zlib.py": test_src}
try:
self.doTest(".py", files, "zlib")
except ImportError:
@@ -177,16 +209,16 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.fail("expected test to raise ImportError")
def testPy(self):
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD)
def testPyc(self):
files = {TESTMOD + pyc_ext: (NOW, test_pyc)}
files = {TESTMOD + pyc_ext: test_pyc}
self.doTest(pyc_ext, files, TESTMOD)
def testBoth(self):
files = {TESTMOD + ".py": (NOW, test_src),
TESTMOD + pyc_ext: (NOW, test_pyc)}
files = {TESTMOD + ".py": test_src,
TESTMOD + pyc_ext: test_pyc}
self.doTest(pyc_ext, files, TESTMOD)
def testUncheckedHashBasedPyc(self):
@@ -219,22 +251,22 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.doTest(None, files, TESTMOD, call=check)
def testEmptyPy(self):
files = {TESTMOD + ".py": (NOW, "")}
files = {TESTMOD + ".py": ""}
self.doTest(None, files, TESTMOD)
def testBadMagic(self):
# make pyc magic word invalid, forcing loading from .py
badmagic_pyc = bytearray(test_pyc)
badmagic_pyc[0] ^= 0x04 # flip an arbitrary bit
files = {TESTMOD + ".py": (NOW, test_src),
TESTMOD + pyc_ext: (NOW, badmagic_pyc)}
files = {TESTMOD + ".py": test_src,
TESTMOD + pyc_ext: badmagic_pyc}
self.doTest(".py", files, TESTMOD)
def testBadMagic2(self):
# make pyc magic word invalid, causing an ImportError
badmagic_pyc = bytearray(test_pyc)
badmagic_pyc[0] ^= 0x04 # flip an arbitrary bit
files = {TESTMOD + pyc_ext: (NOW, badmagic_pyc)}
files = {TESTMOD + pyc_ext: badmagic_pyc}
try:
self.doTest(".py", files, TESTMOD)
self.fail("This should not be reached")
@@ -247,22 +279,22 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# flip the second bit -- not the first as that one isn't stored in the
# .py's mtime in the zip archive.
badtime_pyc[11] ^= 0x02
files = {TESTMOD + ".py": (NOW, test_src),
TESTMOD + pyc_ext: (NOW, badtime_pyc)}
files = {TESTMOD + ".py": test_src,
TESTMOD + pyc_ext: badtime_pyc}
self.doTest(".py", files, TESTMOD)
def test2038MTime(self):
# Make sure we can handle mtimes larger than what a 32-bit signed number
# can hold.
twenty_thirty_eight_pyc = make_pyc(test_co, 2**32 - 1, len(test_src))
files = {TESTMOD + ".py": (NOW, test_src),
TESTMOD + pyc_ext: (NOW, twenty_thirty_eight_pyc)}
files = {TESTMOD + ".py": test_src,
TESTMOD + pyc_ext: twenty_thirty_eight_pyc}
self.doTest(".py", files, TESTMOD)
def testPackage(self):
packdir = TESTPACK + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir + TESTMOD + pyc_ext: (NOW, test_pyc)}
files = {packdir + "__init__" + pyc_ext: test_pyc,
packdir + TESTMOD + pyc_ext: test_pyc}
self.doTest(pyc_ext, files, TESTPACK, TESTMOD)
def testSubPackage(self):
@@ -270,9 +302,9 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# archives.
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
files = {packdir + "__init__" + pyc_ext: test_pyc,
packdir2 + "__init__" + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)
def testSubNamespacePackage(self):
@@ -281,9 +313,9 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
# The first two files are just directory entries (so have no data).
files = {packdir: (NOW, ""),
packdir2: (NOW, ""),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
files = {packdir: None,
packdir2: None,
packdir2 + TESTMOD + pyc_ext: test_pyc}
self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)
def testMixedNamespacePackage(self):
@@ -291,19 +323,19 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# real filesystem and a zip archive.
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
packdir3 = packdir2 + TESTPACK + '3' + os.sep
files1 = {packdir: (NOW, ""),
packdir + TESTMOD + pyc_ext: (NOW, test_pyc),
packdir2: (NOW, ""),
packdir3: (NOW, ""),
packdir3 + TESTMOD + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + '3' + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
files2 = {packdir: (NOW, ""),
packdir + TESTMOD + '2' + pyc_ext: (NOW, test_pyc),
packdir2: (NOW, ""),
packdir2 + TESTMOD + '2' + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
packdir3 = packdir2 + TESTPACK3 + os.sep
files1 = {packdir: None,
packdir + TESTMOD + pyc_ext: test_pyc,
packdir2: None,
packdir3: None,
packdir3 + TESTMOD + pyc_ext: test_pyc,
packdir2 + TESTMOD3 + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
files2 = {packdir: None,
packdir + TESTMOD2 + pyc_ext: test_pyc,
packdir2: None,
packdir2 + TESTMOD2 + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
zip1 = os.path.abspath("path1.zip")
self.makeZip(files1, zip1)
@@ -336,8 +368,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD)))
self.assertEqual("path1.zip", mod.__file__.split(os.sep)[-3])
# And TESTPACK/(TESTMOD + '2') only exists in path2.
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD + '2')))
# And TESTPACK/(TESTMOD2) only exists in path2.
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD2)))
self.assertEqual(os.path.basename(TEMP_DIR),
mod.__file__.split(os.sep)[-3])
@@ -354,13 +386,13 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.assertEqual(os.path.basename(TEMP_DIR),
mod.__file__.split(os.sep)[-4])
# subpkg.TESTMOD + '2' only exists in zip2.
mod = importlib.import_module('.'.join((subpkg, TESTMOD + '2')))
# subpkg.TESTMOD2 only exists in zip2.
mod = importlib.import_module('.'.join((subpkg, TESTMOD2)))
self.assertEqual(os.path.basename(TEMP_DIR),
mod.__file__.split(os.sep)[-4])
# Finally subpkg.TESTMOD + '3' only exists in zip1.
mod = importlib.import_module('.'.join((subpkg, TESTMOD + '3')))
# Finally subpkg.TESTMOD3 only exists in zip1.
mod = importlib.import_module('.'.join((subpkg, TESTMOD3)))
self.assertEqual('path1.zip', mod.__file__.split(os.sep)[-4])
def testNamespacePackage(self):
@@ -368,22 +400,22 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# archives.
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
packdir3 = packdir2 + TESTPACK + '3' + os.sep
files1 = {packdir: (NOW, ""),
packdir + TESTMOD + pyc_ext: (NOW, test_pyc),
packdir2: (NOW, ""),
packdir3: (NOW, ""),
packdir3 + TESTMOD + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + '3' + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
packdir3 = packdir2 + TESTPACK3 + os.sep
files1 = {packdir: None,
packdir + TESTMOD + pyc_ext: test_pyc,
packdir2: None,
packdir3: None,
packdir3 + TESTMOD + pyc_ext: test_pyc,
packdir2 + TESTMOD3 + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
zip1 = os.path.abspath("path1.zip")
self.makeZip(files1, zip1)
files2 = {packdir: (NOW, ""),
packdir + TESTMOD + '2' + pyc_ext: (NOW, test_pyc),
packdir2: (NOW, ""),
packdir2 + TESTMOD + '2' + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
files2 = {packdir: None,
packdir + TESTMOD2 + pyc_ext: test_pyc,
packdir2: None,
packdir2 + TESTMOD2 + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
zip2 = os.path.abspath("path2.zip")
self.makeZip(files2, zip2)
@@ -412,8 +444,8 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD)))
self.assertEqual("path1.zip", mod.__file__.split(os.sep)[-3])
# And TESTPACK/(TESTMOD + '2') only exists in path2.
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD + '2')))
# And TESTPACK/(TESTMOD2) only exists in path2.
mod = importlib.import_module('.'.join((TESTPACK, TESTMOD2)))
self.assertEqual("path2.zip", mod.__file__.split(os.sep)[-3])
# One level deeper...
@@ -428,29 +460,22 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
mod = importlib.import_module('.'.join((subpkg, TESTMOD)))
self.assertEqual('path2.zip', mod.__file__.split(os.sep)[-4])
# subpkg.TESTMOD + '2' only exists in zip2.
mod = importlib.import_module('.'.join((subpkg, TESTMOD + '2')))
# subpkg.TESTMOD2 only exists in zip2.
mod = importlib.import_module('.'.join((subpkg, TESTMOD2)))
self.assertEqual('path2.zip', mod.__file__.split(os.sep)[-4])
# Finally subpkg.TESTMOD + '3' only exists in zip1.
mod = importlib.import_module('.'.join((subpkg, TESTMOD + '3')))
# Finally subpkg.TESTMOD3 only exists in zip1.
mod = importlib.import_module('.'.join((subpkg, TESTMOD3)))
self.assertEqual('path1.zip', mod.__file__.split(os.sep)[-4])
def testZipImporterMethods(self):
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
"spam" + pyc_ext: (NOW, test_pyc)}
self.addCleanup(os_helper.unlink, TEMP_ZIP)
with ZipFile(TEMP_ZIP, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"spam"
z.writestr(zinfo, data)
files = {packdir + "__init__" + pyc_ext: test_pyc,
packdir2 + "__init__" + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc,
"spam" + pyc_ext: test_pyc}
self.makeZip(files, file_comment=b"spam")
zi = zipimport.zipimporter(TEMP_ZIP)
self.assertEqual(zi.archive, TEMP_ZIP)
@@ -459,12 +484,6 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
# PEP 302
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
find_mod = zi.find_module('spam')
self.assertIsNotNone(find_mod)
self.assertIsInstance(find_mod, zipimport.zipimporter)
self.assertFalse(find_mod.is_package('spam'))
load_mod = find_mod.load_module('spam')
self.assertEqual(find_mod.get_filename('spam'), load_mod.__file__)
mod = zi.load_module(TESTPACK)
self.assertEqual(zi.get_filename(TESTPACK), mod.__file__)
@@ -512,58 +531,68 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
def testInvalidateCaches(self):
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc),
"spam" + pyc_ext: (NOW, test_pyc)}
self.addCleanup(os_helper.unlink, TEMP_ZIP)
with ZipFile(TEMP_ZIP, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"spam"
z.writestr(zinfo, data)
files = {packdir + "__init__" + pyc_ext: test_pyc,
packdir2 + "__init__" + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc,
"spam" + pyc_ext: test_pyc}
self.makeZip(files, file_comment=b"spam")
zi = zipimport.zipimporter(TEMP_ZIP)
self.assertEqual(zi._files.keys(), files.keys())
self.assertEqual(zi._get_files().keys(), files.keys())
# Check that the file information remains accurate after reloading
zi.invalidate_caches()
self.assertEqual(zi._files.keys(), files.keys())
self.assertEqual(zi._get_files().keys(), files.keys())
# Add a new file to the ZIP archive
newfile = {"spam2" + pyc_ext: (NOW, test_pyc)}
newfile = {"spam2" + pyc_ext: test_pyc}
files.update(newfile)
with ZipFile(TEMP_ZIP, "a") as z:
for name, (mtime, data) in newfile.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"spam"
z.writestr(zinfo, data)
with ZipFile(TEMP_ZIP, "a", compression=self.compression) as z:
self.writeZip(z, newfile, file_comment=b"spam")
# Check that we can detect the new file after invalidating the cache
zi.invalidate_caches()
self.assertEqual(zi._files.keys(), files.keys())
self.assertEqual(zi._get_files().keys(), files.keys())
spec = zi.find_spec('spam2')
self.assertIsNotNone(spec)
self.assertIsInstance(spec.loader, zipimport.zipimporter)
# Check that the cached data is removed if the file is deleted
os.remove(TEMP_ZIP)
zi.invalidate_caches()
self.assertFalse(zi._files)
self.assertFalse(zi._get_files())
self.assertIsNone(zipimport._zip_directory_cache.get(zi.archive))
self.assertIsNone(zi.find_spec("name_does_not_matter"))
def testInvalidateCachesWithMultipleZipimports(self):
    # Two zipimporter objects opened on the same archive must both see a
    # member appended later, even though invalidate_caches() is only
    # called on the first of them.
    pkg_prefix = TESTPACK + os.sep
    subpkg_prefix = pkg_prefix + TESTPACK2 + os.sep
    stems = (
        pkg_prefix + "__init__",
        subpkg_prefix + "__init__",
        subpkg_prefix + TESTMOD,
        "spam",
    )
    expected = {stem + pyc_ext: test_pyc for stem in stems}
    self.makeZip(expected, file_comment=b"spam")

    first = zipimport.zipimporter(TEMP_ZIP)
    self.assertEqual(first._get_files().keys(), expected.keys())

    # A second importer bound to the very same archive path.
    second = zipimport.zipimporter(TEMP_ZIP)
    self.assertEqual(second._get_files().keys(), expected.keys())

    # Append one more member so that any cached directory listing is stale.
    appended = {"spam2" + pyc_ext: test_pyc}
    expected.update(appended)
    with ZipFile(TEMP_ZIP, "a", compression=self.compression) as archive:
        self.writeZip(archive, appended, file_comment=b"spam")

    # Only the first importer is asked to drop its cache ...
    first.invalidate_caches()

    # ... yet the second one must report the new member as well.
    self.assertEqual(second._get_files().keys(), expected.keys())
    found = second.find_spec('spam2')
    self.assertIsNotNone(found)
    self.assertIsInstance(found.loader, zipimport.zipimporter)
def testZipImporterMethodsInSubDirectory(self):
packdir = TESTPACK + os.sep
packdir2 = packdir + TESTPACK2 + os.sep
files = {packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
self.addCleanup(os_helper.unlink, TEMP_ZIP)
with ZipFile(TEMP_ZIP, "w") as z:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
zinfo.comment = b"eggs"
z.writestr(zinfo, data)
files = {packdir2 + "__init__" + pyc_ext: test_pyc,
packdir2 + TESTMOD + pyc_ext: test_pyc}
self.makeZip(files, file_comment=b"eggs")
zi = zipimport.zipimporter(TEMP_ZIP + os.sep + packdir)
self.assertEqual(zi.archive, TEMP_ZIP)
@@ -585,16 +614,6 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
pkg_path = TEMP_ZIP + os.sep + packdir + TESTPACK2
zi2 = zipimport.zipimporter(pkg_path)
# PEP 302
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
find_mod_dotted = zi2.find_module(TESTMOD)
self.assertIsNotNone(find_mod_dotted)
self.assertIsInstance(find_mod_dotted, zipimport.zipimporter)
self.assertFalse(zi2.is_package(TESTMOD))
load_mod = find_mod_dotted.load_module(TESTMOD)
self.assertEqual(
find_mod_dotted.get_filename(TESTMOD), load_mod.__file__)
# PEP 451
spec = zi2.find_spec(TESTMOD)
@@ -638,9 +657,9 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
if __loader__.get_data("some.data") != b"some data":
raise AssertionError("bad data")\n"""
pyc = make_pyc(compile(src, "<???>", "exec"), NOW, len(src))
files = {TESTMOD + pyc_ext: (NOW, pyc),
"some.data": (NOW, "some data")}
self.doTest(pyc_ext, files, TESTMOD)
files = {TESTMOD + pyc_ext: pyc,
"some.data": "some data"}
self.doTest(pyc_ext, files, TESTMOD, prefix='')
def testDefaultOptimizationLevel(self):
# zipimport should use the default optimization level (#28131)
@@ -648,17 +667,20 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
def test(val):
assert(val)
return val\n"""
files = {TESTMOD + '.py': (NOW, src)}
files = {TESTMOD + '.py': src}
self.makeZip(files)
sys.path.insert(0, TEMP_ZIP)
mod = importlib.import_module(TESTMOD)
self.assertEqual(mod.test(1), 1)
self.assertRaises(AssertionError, mod.test, False)
if __debug__:
self.assertRaises(AssertionError, mod.test, False)
else:
self.assertEqual(mod.test(0), 0)
def testImport_WithStuff(self):
# try importing from a zipfile which contains additional
# stuff at the beginning of the file
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD,
stuff=b"Some Stuff"*31)
@@ -666,18 +688,18 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.assertEqual(inspect.getsource(module), test_src)
def testGetSource(self):
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD, call=self.assertModuleSource)
def testGetCompiledSource(self):
pyc = make_pyc(compile(test_src, "<???>", "exec"), NOW, len(test_src))
files = {TESTMOD + ".py": (NOW, test_src),
TESTMOD + pyc_ext: (NOW, pyc)}
files = {TESTMOD + ".py": test_src,
TESTMOD + pyc_ext: pyc}
self.doTest(pyc_ext, files, TESTMOD, call=self.assertModuleSource)
def runDoctest(self, callback):
files = {TESTMOD + ".py": (NOW, test_src),
"xyz.txt": (NOW, ">>> log.append(True)\n")}
files = {TESTMOD + ".py": test_src,
"xyz.txt": ">>> log.append(True)\n"}
self.doTest(".py", files, TESTMOD, call=callback)
def doDoctestFile(self, module):
@@ -720,56 +742,177 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
s = io.StringIO()
print_tb(tb, 1, s)
self.assertTrue(s.getvalue().endswith(raise_src))
self.assertTrue(s.getvalue().endswith(
' def do_raise(): raise TypeError\n'
'' if support.has_no_debug_ranges() else
' ^^^^^^^^^^^^^^^\n'
))
else:
raise AssertionError("This ought to be impossible")
# TODO: RUSTPYTHON; empty caret lines from equal col/end_col
@unittest.expectedFailure
@unittest.expectedFailure # TODO: RUSTPYTHON; empty caret lines from equal col/end_col
def testTraceback(self):
files = {TESTMOD + ".py": (NOW, raise_src)}
files = {TESTMOD + ".py": raise_src}
self.doTest(None, files, TESTMOD, call=self.doTraceback)
@unittest.skipIf(os_helper.TESTFN_UNENCODABLE is None,
"need an unencodable filename")
def testUnencodable(self):
filename = os_helper.TESTFN_UNENCODABLE + ".zip"
self.addCleanup(os_helper.unlink, filename)
with ZipFile(filename, "w") as z:
zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
zinfo.compress_type = self.compression
z.writestr(zinfo, test_src)
self.makeZip({TESTMOD + ".py": test_src}, filename)
spec = zipimport.zipimporter(filename).find_spec(TESTMOD)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
def testBytesPath(self):
filename = os_helper.TESTFN + ".zip"
self.addCleanup(os_helper.unlink, filename)
with ZipFile(filename, "w") as z:
zinfo = ZipInfo(TESTMOD + ".py", time.localtime(NOW))
zinfo.compress_type = self.compression
z.writestr(zinfo, test_src)
self.makeZip({TESTMOD + ".py": test_src}, filename)
zipimport.zipimporter(filename)
zipimport.zipimporter(os.fsencode(filename))
with self.assertRaises(TypeError):
zipimport.zipimporter(os.fsencode(filename))
with self.assertRaises(TypeError):
zipimport.zipimporter(bytearray(os.fsencode(filename)))
with self.assertRaises(TypeError):
zipimport.zipimporter(memoryview(os.fsencode(filename)))
def testComment(self):
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD, comment=b"comment")
def testBeginningCruftAndComment(self):
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD, stuff=b"cruft" * 64, comment=b"hi")
def testLargestPossibleComment(self):
files = {TESTMOD + ".py": (NOW, test_src)}
files = {TESTMOD + ".py": test_src}
self.doTest(".py", files, TESTMOD, comment=b"c" * ((1 << 16) - 1))
def testZip64(self):
files = self.getZip64Files()
self.doTest(".py", files, "f6")
def testZip64CruftAndComment(self):
files = self.getZip64Files()
self.doTest(".py", files, "f65536", comment=b"c" * ((1 << 16) - 1))
@unittest.skip('TODO: RUSTPYTHON; (intermittent success/failures); ValueError: name="RustPython/crates/pylib/Lib/test/zipimport_data/sparse-zip64-c0-0x000000000.part" does not fit expected pattern.')
def testZip64LargeFile(self):
    # Import "module" from a zip archive whose "data1"/"data2" members are
    # each larger than 0xffff_ffff bytes.  The archive at TEMP_ZIP is either
    # built from scratch or reassembled from pre-built ".part" files found
    # in TEST_DATA_DIR (see the N.B. comment below).
    support.requires(
        "largefile",
        f"test generates files >{0xFFFFFFFF} bytes and takes a long time "
        "to run"
    )

    # N.B.: We do a lot of gymnastics below in the ZIP_STORED case to save
    # and reconstruct a sparse zip on systems that support sparse files.
    # Instead of creating a ~8GB zip file mainly consisting of null bytes
    # for every run of the test, we create the zip once and save off the
    # non-null portions of the resulting file as data blobs with offsets
    # that allow re-creating the zip file sparsely. This drops disk space
    # usage to ~9KB for the ZIP_STORED case and drops that test time by ~2
    # orders of magnitude. For the ZIP_DEFLATED case, however, we bite the
    # bullet. The resulting zip file is ~8MB of non-null data; so the sparse
    # trick doesn't work and would result in that full ~8MB zip data file
    # being checked in to source control.
    #
    # Part files are named "sparse-zip64-c<compression>-0x<offset>.part";
    # the glob below selects the ones matching this test's compression.
    parts_glob = f"sparse-zip64-c{self.compression:d}-0x*.part"
    full_parts_glob = os.path.join(TEST_DATA_DIR, parts_glob)
    pre_built_zip_parts = glob.glob(full_parts_glob)

    self.addCleanup(os_helper.unlink, TEMP_ZIP)
    if not pre_built_zip_parts:
        # No pre-built parts: build the archive for real.
        if self.compression != ZIP_STORED:
            support.requires(
                "cpu",
                "test requires a lot of CPU for compression."
            )
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        # Source file: b"data", a line separator, a forward seek of
        # 0xffff_ffff bytes, then one more line separator -- so the file
        # is just over 0xffff_ffff bytes long with a large unwritten gap.
        with open(os_helper.TESTFN, "wb") as f:
            f.write(b"data")
            f.write(os.linesep.encode())
            f.seek(0xffff_ffff, os.SEEK_CUR)
            f.write(os.linesep.encode())
        # Set atime/mtime to 0.0 so the stored timestamp does not depend on
        # when the test runs.  NOTE(review): together with
        # strict_timestamps=False this presumably yields the 1980-01-01
        # date asserted for pre-built archives below -- confirm against
        # the zipfile documentation.
        os.utime(os_helper.TESTFN, (0.0, 0.0))
        with ZipFile(
            TEMP_ZIP,
            "w",
            compression=self.compression,
            strict_timestamps=False
        ) as z:
            # Member order: big file, the importable module, big file again.
            z.write(os_helper.TESTFN, "data1")
            z.writestr(
                ZipInfo("module.py", (1980, 1, 1, 0, 0, 0)), test_src
            )
            z.write(os_helper.TESTFN, "data2")

        # This "works" but relies on the zip format having a non-empty
        # final page due to the trailing central directory to wind up with
        # the correct length file.
        def make_sparse_zip_parts(name):
            # Scan `name` in 4096-byte pages.  Each run of consecutive
            # pages that are not entirely zero is written to its own part
            # file, named after the offset of the first page of the run;
            # an all-zero page ends the current run.
            empty_page = b"\0" * 4096
            with open(name, "rb") as f:
                part = None
                try:
                    while True:
                        offset = f.tell()
                        data = f.read(len(empty_page))
                        if not data:
                            break
                        if data != empty_page:
                            if not part:
                                # "{offset:#011x}" renders as "0x" plus nine
                                # hex digits, the form extract_offset()
                                # below parses back.
                                part_fullname = os.path.join(
                                    TEST_DATA_DIR,
                                    f"sparse-zip64-c{self.compression:d}-"
                                    f"{offset:#011x}.part",
                                )
                                os.makedirs(
                                    os.path.dirname(part_fullname),
                                    exist_ok=True
                                )
                                part = open(part_fullname, "wb")
                                print("Created", part_fullname)
                            part.write(data)
                        else:
                            if part:
                                part.close()
                            part = None
                finally:
                    # Close a part left open by EOF or by an exception.
                    if part:
                        part.close()

        # Parts are only saved for the stored (uncompressed) archive; see
        # the N.B. comment above for why.
        if self.compression == ZIP_STORED:
            print(f"Creating sparse parts to check in into {TEST_DATA_DIR}:")
            make_sparse_zip_parts(TEMP_ZIP)

    else:
        # Pre-built parts exist: reassemble TEMP_ZIP from them.
        def extract_offset(name):
            # Recover the byte offset encoded in a part file name.
            if m := re.search(r"-(0x[0-9a-f]{9})\.part$", name):
                return int(m.group(1), base=16)
            raise ValueError(f"{name=} does not fit expected pattern.")
        offset_parts = [(extract_offset(n), n) for n in pre_built_zip_parts]
        with open(TEMP_ZIP, "wb") as f:
            # Write each part at its recorded offset, in ascending order;
            # the ranges in between are never written.
            for offset, part_fn in sorted(offset_parts):
                with open(part_fn, "rb") as part:
                    f.seek(offset, os.SEEK_SET)
                    f.write(part.read())
        # Confirm that the reconstructed zip file works and looks right.
        with ZipFile(TEMP_ZIP, "r") as z:
            self.assertEqual(
                z.getinfo("module.py").date_time, (1980, 1, 1, 0, 0, 0)
            )
            self.assertEqual(
                z.read("module.py"), test_src.encode(),
                msg=f"Recreate {full_parts_glob}, unexpected contents."
            )
            def assertDataEntry(name):
                # Each data member must carry the fixed 1980 date and be
                # larger than 0xffff_ffff bytes.
                zinfo = z.getinfo(name)
                self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
                self.assertGreater(zinfo.file_size, 0xffff_ffff)
            assertDataEntry("data1")
            assertDataEntry("data2")

    # NOTE(review): doTestWithPreBuiltZip is defined outside this view;
    # presumably it runs the import check against the archive already at
    # TEMP_ZIP -- confirm.
    self.doTestWithPreBuiltZip(".py", "module")
@support.requires_zlib()
class CompressedZipImportTestCase(UncompressedZipImportTestCase):
@@ -801,6 +944,7 @@ class BadFileZipImportTestCase(unittest.TestCase):
os_helper.create_empty_file(TESTMOD)
self.assertZipFailure(TESTMOD)
@unittest.skipIf(support.is_wasi, "mode 000 not supported.")
def testFileUnreadable(self):
os_helper.unlink(TESTMOD)
fd = os.open(TESTMOD, os.O_CREAT, 000)
@@ -844,7 +988,6 @@ class BadFileZipImportTestCase(unittest.TestCase):
self.assertRaises(TypeError, z.get_source, None)
error = zipimport.ZipImportError
self.assertIsNone(z.find_module('abc'))
self.assertIsNone(z.find_spec('abc'))
with warnings.catch_warnings():

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -245,7 +245,7 @@ def is_zipfile(filename):
else:
with open(filename, "rb") as fp:
result = _check_zipfile(fp)
except OSError:
except (OSError, BadZipFile):
pass
return result
@@ -253,16 +253,15 @@ def _EndRecData64(fpin, offset, endrec):
"""
Read the ZIP64 end-of-archive records and use that to update endrec
"""
try:
fpin.seek(offset - sizeEndCentDir64Locator, 2)
except OSError:
# If the seek fails, the file is not large enough to contain a ZIP64
offset -= sizeEndCentDir64Locator
if offset < 0:
# The file is not large enough to contain a ZIP64
# end-of-archive record, so just return the end record we were given.
return endrec
fpin.seek(offset)
data = fpin.read(sizeEndCentDir64Locator)
if len(data) != sizeEndCentDir64Locator:
return endrec
raise OSError("Unknown I/O error")
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
if sig != stringEndArchive64Locator:
return endrec
@@ -270,16 +269,33 @@ def _EndRecData64(fpin, offset, endrec):
if diskno != 0 or disks > 1:
raise BadZipFile("zipfiles that span multiple disks are not supported")
# Assume no 'zip64 extensible data'
fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
offset -= sizeEndCentDir64
if reloff > offset:
raise BadZipFile("Corrupt zip64 end of central directory locator")
# First, check the assumption that there is no prepended data.
fpin.seek(reloff)
extrasz = offset - reloff
data = fpin.read(sizeEndCentDir64)
if len(data) != sizeEndCentDir64:
return endrec
raise OSError("Unknown I/O error")
if not data.startswith(stringEndArchive64) and reloff != offset:
# Since we already have seen the Zip64 EOCD Locator, it's
# possible we got here because there is prepended data.
# Assume no 'zip64 extensible data'
fpin.seek(offset)
extrasz = 0
data = fpin.read(sizeEndCentDir64)
if len(data) != sizeEndCentDir64:
raise OSError("Unknown I/O error")
if not data.startswith(stringEndArchive64):
raise BadZipFile("Zip64 end of central directory record not found")
sig, sz, create_version, read_version, disk_num, disk_dir, \
dircount, dircount2, dirsize, diroffset = \
struct.unpack(structEndArchive64, data)
if sig != stringEndArchive64:
return endrec
if (diroffset + dirsize != reloff or
sz + 12 != sizeEndCentDir64 + extrasz):
raise BadZipFile("Corrupt zip64 end of central directory record")
# Update the original endrec using data from the ZIP64 record
endrec[_ECD_SIGNATURE] = sig
@@ -289,6 +305,7 @@ def _EndRecData64(fpin, offset, endrec):
endrec[_ECD_ENTRIES_TOTAL] = dircount2
endrec[_ECD_SIZE] = dirsize
endrec[_ECD_OFFSET] = diroffset
endrec[_ECD_LOCATION] = offset - extrasz
return endrec
@@ -322,7 +339,7 @@ def _EndRecData(fpin):
endrec.append(filesize - sizeEndCentDir)
# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, -sizeEndCentDir, endrec)
return _EndRecData64(fpin, filesize - sizeEndCentDir, endrec)
# Either this is not a ZIP file, or it is a ZIP file with an archive
# comment. Search the end of the file for the "end of central directory"
@@ -346,8 +363,7 @@ def _EndRecData(fpin):
endrec.append(maxCommentStart + start)
# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, maxCommentStart + start - filesize,
endrec)
return _EndRecData64(fpin, maxCommentStart + start, endrec)
# Unable to find a valid end of central directory structure
return None
@@ -1458,9 +1474,6 @@ class ZipFile:
# "concat" is zero, unless zip was concatenated to another file
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)
if self.debug > 2:
inferred = concat + offset_cd
@@ -2082,7 +2095,7 @@ class ZipFile:
" would require ZIP64 extensions")
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
sizeEndCentDir64 - 12, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
self.fp.write(zip64endrec)

391
Lib/zipimport.py vendored
View File

@@ -1,11 +1,9 @@
"""zipimport provides support for importing Python modules from Zip archives.
This module exports three objects:
This module exports two objects:
- zipimporter: a class; its constructor takes a path to a Zip archive.
- ZipImportError: exception raised by zipimporter objects. It's a
subclass of ImportError, so it can be caught as ImportError, too.
- _zip_directory_cache: a dict, mapping archive paths to zip directory
info dicts, as used in zipimporter._files.
It is usually not needed to use the zipimport module explicitly; it is
used by the builtin import mechanism for sys.path items that are paths
@@ -15,7 +13,7 @@ to Zip archives.
#from importlib import _bootstrap_external
#from importlib import _bootstrap # for _verbose_message
import _frozen_importlib_external as _bootstrap_external
from _frozen_importlib_external import _unpack_uint16, _unpack_uint32
from _frozen_importlib_external import _unpack_uint16, _unpack_uint32, _unpack_uint64
import _frozen_importlib as _bootstrap # for _verbose_message
import _imp # for check_hash_based_pycs
import _io # for open
@@ -40,8 +38,14 @@ _zip_directory_cache = {}
_module_type = type(sys)
END_CENTRAL_DIR_SIZE = 22
STRING_END_ARCHIVE = b'PK\x05\x06'
END_CENTRAL_DIR_SIZE_64 = 56
END_CENTRAL_DIR_LOCATOR_SIZE_64 = 20
STRING_END_ARCHIVE = b'PK\x05\x06' # standard EOCD signature
STRING_END_LOCATOR_64 = b'PK\x06\x07' # Zip64 EOCD Locator signature
STRING_END_ZIP_64 = b'PK\x06\x06' # Zip64 EOCD signature
MAX_COMMENT_LEN = (1 << 16) - 1
MAX_UINT32 = 0xffffffff
ZIP64_EXTRA_TAG = 0x1
class zipimporter(_bootstrap_external._LoaderBasics):
"""zipimporter(archivepath) -> zipimporter object
@@ -63,8 +67,7 @@ class zipimporter(_bootstrap_external._LoaderBasics):
# if found, or else read it from the archive.
def __init__(self, path):
if not isinstance(path, str):
import os
path = os.fsdecode(path)
raise TypeError(f"expected str, not {type(path)!r}")
if not path:
raise ZipImportError('archive path is empty', path=path)
if alt_path_sep:
@@ -89,12 +92,8 @@ class zipimporter(_bootstrap_external._LoaderBasics):
raise ZipImportError('not a Zip file', path=path)
break
try:
files = _zip_directory_cache[path]
except KeyError:
files = _read_directory(path)
_zip_directory_cache[path] = files
self._files = files
if path not in _zip_directory_cache:
_zip_directory_cache[path] = _read_directory(path)
self.archive = path
# a prefix directory following the ZIP file path.
self.prefix = _bootstrap_external._path_join(*prefix[::-1])
@@ -102,64 +101,6 @@ class zipimporter(_bootstrap_external._LoaderBasics):
self.prefix += path_sep
# Check whether we can satisfy the import of the module named by
# 'fullname', or whether it could be a portion of a namespace
# package. Return self if we can load it, a string containing the
# full path if it's a possible namespace portion, None if we
# can't load it.
def find_loader(self, fullname, path=None):
    """Locate 'fullname' in this archive (deprecated importer protocol).

    'fullname' must be the fully qualified (dotted) module name; 'path'
    is accepted only for protocol compatibility and is ignored.

    Returns a 2-tuple:
      - (self, []) when the name is a module or regular package here;
      - (None, [dir_path]) when only a matching directory exists, i.e.
        the name may be a portion of a namespace package;
      - (None, []) when nothing matches.

    Deprecated since Python 3.10. Use find_spec() instead.
    """
    _warnings.warn("zipimporter.find_loader() is deprecated and slated for "
                   "removal in Python 3.12; use find_spec() instead",
                   DeprecationWarning)

    # A hit in the module table means a real module or package.
    if _get_module_info(self, fullname) is not None:
        return self, []

    # Otherwise check for a bare directory.  Only the last component of
    # fullname is looked at; earlier components are recorded in
    # self.prefix.  The reported path has no trailing separator.
    candidate = _get_module_path(self, fullname)
    if _is_dir(self, candidate):
        portions = [f'{self.archive}{path_sep}{candidate}']
    else:
        portions = []
    return None, portions
# Check whether we can satisfy the import of the module named by
# 'fullname'. Return self if we can, None if we can't.
def find_module(self, fullname, path=None):
    """Return self if 'fullname' can be loaded from this archive, else None.

    'fullname' must be the fully qualified (dotted) module name. The
    optional 'path' argument is ignored by the lookup; it exists only for
    compatibility with the importer protocol. The answer is the first
    element of whatever find_loader() returns.

    Deprecated since Python 3.10. Use find_spec() instead.
    """
    _warnings.warn("zipimporter.find_module() is deprecated and slated for "
                   "removal in Python 3.12; use find_spec() instead",
                   DeprecationWarning)
    lookup = self.find_loader(fullname, path)
    return lookup[0]
def find_spec(self, fullname, target=None):
"""Create a ModuleSpec for the specified module.
@@ -211,7 +152,7 @@ class zipimporter(_bootstrap_external._LoaderBasics):
key = pathname[len(self.archive + path_sep):]
try:
toc_entry = self._files[key]
toc_entry = self._get_files()[key]
except KeyError:
raise OSError(0, '', key)
return _get_data(self.archive, toc_entry)
@@ -248,7 +189,7 @@ class zipimporter(_bootstrap_external._LoaderBasics):
fullpath = f'{path}.py'
try:
toc_entry = self._files[fullpath]
toc_entry = self._get_files()[fullpath]
except KeyError:
# we have the module, but no source
return None
@@ -313,28 +254,28 @@ class zipimporter(_bootstrap_external._LoaderBasics):
def get_resource_reader(self, fullname):
"""Return the ResourceReader for a package in a zip file.
If 'fullname' is a package within the zip file, return the
'ResourceReader' object for the package. Otherwise return None.
"""
try:
if not self.is_package(fullname):
return None
except ZipImportError:
return None
"""Return the ResourceReader for a module in a zip file."""
from importlib.readers import ZipReader
return ZipReader(self, fullname)
def invalidate_caches(self):
"""Reload the file data of the archive path."""
def _get_files(self):
"""Return the files within the archive path."""
try:
self._files = _read_directory(self.archive)
_zip_directory_cache[self.archive] = self._files
except ZipImportError:
_zip_directory_cache.pop(self.archive, None)
self._files = {}
files = _zip_directory_cache[self.archive]
except KeyError:
try:
files = _zip_directory_cache[self.archive] = _read_directory(self.archive)
except ZipImportError:
files = {}
return files
def invalidate_caches(self):
"""Invalidates the cache of file data of the archive path."""
_zip_directory_cache.pop(self.archive, None)
def __repr__(self):
@@ -364,15 +305,15 @@ def _is_dir(self, path):
# of a namespace package. We test by seeing if the name, with an
# appended path separator, exists.
dirpath = path + path_sep
# If dirpath is present in self._files, we have a directory.
return dirpath in self._files
# If dirpath is present in self._get_files(), we have a directory.
return dirpath in self._get_files()
# Return some information about a module.
def _get_module_info(self, fullname):
path = _get_module_path(self, fullname)
for suffix, isbytecode, ispackage in _zip_searchorder:
fullpath = path + suffix
if fullpath in self._files:
if fullpath in self._get_files():
return ispackage
return None
@@ -406,16 +347,11 @@ def _read_directory(archive):
raise ZipImportError(f"can't open Zip file: {archive!r}", path=archive)
with fp:
# GH-87235: On macOS all file descriptors for /dev/fd/N share the same
# file offset, reset the file offset after scanning the zipfile directory
# to not cause problems when some runs 'python3 /dev/fd/9 9<some_script'
start_offset = fp.tell()
try:
fp.seek(-END_CENTRAL_DIR_SIZE, 2)
header_position = fp.tell()
buffer = fp.read(END_CENTRAL_DIR_SIZE)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
if len(buffer) != END_CENTRAL_DIR_SIZE:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
if buffer[:4] != STRING_END_ARCHIVE:
# Bad: End of Central Dir signature
# Check if there's a comment.
try:
fp.seek(0, 2)
@@ -423,97 +359,192 @@ def _read_directory(archive):
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}",
path=archive)
max_comment_start = max(file_size - MAX_COMMENT_LEN -
END_CENTRAL_DIR_SIZE, 0)
max_comment_plus_dirs_size = (
MAX_COMMENT_LEN + END_CENTRAL_DIR_SIZE +
END_CENTRAL_DIR_SIZE_64 + END_CENTRAL_DIR_LOCATOR_SIZE_64)
max_comment_start = max(file_size - max_comment_plus_dirs_size, 0)
try:
fp.seek(max_comment_start)
data = fp.read()
data = fp.read(max_comment_plus_dirs_size)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}",
path=archive)
pos = data.rfind(STRING_END_ARCHIVE)
if pos < 0:
pos64 = data.rfind(STRING_END_ZIP_64)
if (pos64 >= 0 and pos64+END_CENTRAL_DIR_SIZE_64+END_CENTRAL_DIR_LOCATOR_SIZE_64==pos):
# Zip64 at "correct" offset from standard EOCD
buffer = data[pos64:pos64 + END_CENTRAL_DIR_SIZE_64]
if len(buffer) != END_CENTRAL_DIR_SIZE_64:
raise ZipImportError(
f"corrupt Zip64 file: Expected {END_CENTRAL_DIR_SIZE_64} byte "
f"zip64 central directory, but read {len(buffer)} bytes.",
path=archive)
header_position = file_size - len(data) + pos64
central_directory_size = _unpack_uint64(buffer[40:48])
central_directory_position = _unpack_uint64(buffer[48:56])
num_entries = _unpack_uint64(buffer[24:32])
elif pos >= 0:
buffer = data[pos:pos+END_CENTRAL_DIR_SIZE]
if len(buffer) != END_CENTRAL_DIR_SIZE:
raise ZipImportError(f"corrupt Zip file: {archive!r}",
path=archive)
header_position = file_size - len(data) + pos
# Buffer now contains a valid EOCD, and header_position gives the
# starting position of it.
central_directory_size = _unpack_uint32(buffer[12:16])
central_directory_position = _unpack_uint32(buffer[16:20])
num_entries = _unpack_uint16(buffer[8:10])
# N.b. if someday you want to prefer the standard (non-zip64) EOCD,
# you need to adjust position by 76 for arc to be 0.
else:
raise ZipImportError(f'not a Zip file: {archive!r}',
path=archive)
buffer = data[pos:pos+END_CENTRAL_DIR_SIZE]
if len(buffer) != END_CENTRAL_DIR_SIZE:
raise ZipImportError(f"corrupt Zip file: {archive!r}",
path=archive)
header_position = file_size - len(data) + pos
header_size = _unpack_uint32(buffer[12:16])
header_offset = _unpack_uint32(buffer[16:20])
if header_position < header_size:
raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
if header_position < header_offset:
raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
header_position -= header_size
arc_offset = header_position - header_offset
if arc_offset < 0:
raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)
files = {}
# Start of Central Directory
count = 0
try:
fp.seek(header_position)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
while True:
buffer = fp.read(46)
if len(buffer) < 4:
raise EOFError('EOF read where not expected')
# Start of file header
if buffer[:4] != b'PK\x01\x02':
break # Bad: Central Dir File Header
if len(buffer) != 46:
raise EOFError('EOF read where not expected')
flags = _unpack_uint16(buffer[8:10])
compress = _unpack_uint16(buffer[10:12])
time = _unpack_uint16(buffer[12:14])
date = _unpack_uint16(buffer[14:16])
crc = _unpack_uint32(buffer[16:20])
data_size = _unpack_uint32(buffer[20:24])
file_size = _unpack_uint32(buffer[24:28])
name_size = _unpack_uint16(buffer[28:30])
extra_size = _unpack_uint16(buffer[30:32])
comment_size = _unpack_uint16(buffer[32:34])
file_offset = _unpack_uint32(buffer[42:46])
header_size = name_size + extra_size + comment_size
if file_offset > header_offset:
raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
file_offset += arc_offset
# Buffer now contains a valid EOCD, and header_position gives the
# starting position of it.
# XXX: These are cursory checks but are not as exact or strict as they
# could be. Checking the arc-adjusted value is probably good too.
if header_position < central_directory_size:
raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
if header_position < central_directory_position:
raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
header_position -= central_directory_size
# On just-a-zipfile these values are the same and arc_offset is zero; if
# the file has some bytes prepended, `arc_offset` is the number of such
# bytes. This is used for pex as well as self-extracting .exe.
arc_offset = header_position - central_directory_position
if arc_offset < 0:
raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)
files = {}
# Start of Central Directory
count = 0
try:
name = fp.read(name_size)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
if len(name) != name_size:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
# On Windows, calling fseek to skip over the fields we don't use is
# slower than reading the data because fseek flushes stdio's
# internal buffers. See issue #8745.
try:
if len(fp.read(header_size - name_size)) != header_size - name_size:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
fp.seek(header_position)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
while True:
buffer = fp.read(46)
if len(buffer) < 4:
raise EOFError('EOF read where not expected')
# Start of file header
if buffer[:4] != b'PK\x01\x02':
if count != num_entries:
raise ZipImportError(
f"mismatched num_entries: {count} should be {num_entries} in {archive!r}",
path=archive,
)
break # Bad: Central Dir File Header
if len(buffer) != 46:
raise EOFError('EOF read where not expected')
flags = _unpack_uint16(buffer[8:10])
compress = _unpack_uint16(buffer[10:12])
time = _unpack_uint16(buffer[12:14])
date = _unpack_uint16(buffer[14:16])
crc = _unpack_uint32(buffer[16:20])
data_size = _unpack_uint32(buffer[20:24])
file_size = _unpack_uint32(buffer[24:28])
name_size = _unpack_uint16(buffer[28:30])
extra_size = _unpack_uint16(buffer[30:32])
comment_size = _unpack_uint16(buffer[32:34])
file_offset = _unpack_uint32(buffer[42:46])
header_size = name_size + extra_size + comment_size
if flags & 0x800:
# UTF-8 file names extension
name = name.decode()
else:
# Historical ZIP filename encoding
try:
name = name.decode('ascii')
except UnicodeDecodeError:
name = name.decode('latin1').translate(cp437_table)
name = fp.read(name_size)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
if len(name) != name_size:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
# On Windows, calling fseek to skip over the fields we don't use is
# slower than reading the data because fseek flushes stdio's
# internal buffers. See issue #8745.
try:
extra_data_len = header_size - name_size
extra_data = memoryview(fp.read(extra_data_len))
name = name.replace('/', path_sep)
path = _bootstrap_external._path_join(archive, name)
t = (path, compress, data_size, file_size, file_offset, time, date, crc)
files[name] = t
count += 1
if len(extra_data) != extra_data_len:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
except OSError:
raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
if flags & 0x800:
# UTF-8 file names extension
name = name.decode()
else:
# Historical ZIP filename encoding
try:
name = name.decode('ascii')
except UnicodeDecodeError:
name = name.decode('latin1').translate(cp437_table)
name = name.replace('/', path_sep)
path = _bootstrap_external._path_join(archive, name)
# Ordering matches unpacking below.
if (
file_size == MAX_UINT32 or
data_size == MAX_UINT32 or
file_offset == MAX_UINT32
):
# need to decode extra_data looking for a zip64 extra (which might not
# be present)
while extra_data:
if len(extra_data) < 4:
raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
tag = _unpack_uint16(extra_data[:2])
size = _unpack_uint16(extra_data[2:4])
if len(extra_data) < 4 + size:
raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
if tag == ZIP64_EXTRA_TAG:
if (len(extra_data) - 4) % 8 != 0:
raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
num_extra_values = (len(extra_data) - 4) // 8
if num_extra_values > 3:
raise ZipImportError(f"can't read header extra: {archive!r}", path=archive)
import struct
values = list(struct.unpack_from(f"<{min(num_extra_values, 3)}Q",
extra_data, offset=4))
# N.b. Here be dragons: the ordering of these is different than
# the header fields, and it's really easy to get it wrong since
# naturally-occurring zips that use all 3 are >4GB
if file_size == MAX_UINT32:
file_size = values.pop(0)
if data_size == MAX_UINT32:
data_size = values.pop(0)
if file_offset == MAX_UINT32:
file_offset = values.pop(0)
break
# For a typical zip, this bytes-slicing only happens 2-3 times, on
# small data like timestamps and filesizes.
extra_data = extra_data[4+size:]
else:
_bootstrap._verbose_message(
"zipimport: suspected zip64 but no zip64 extra for {!r}",
path,
)
# XXX These two statements seem swapped because `central_directory_position`
# is a position within the actual file, but `file_offset` (when compared) is
# as encoded in the entry, not adjusted for this file.
# N.b. this must be after we've potentially read the zip64 extra which can
# change `file_offset`.
if file_offset > central_directory_position:
raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
file_offset += arc_offset
t = (path, compress, data_size, file_size, file_offset, time, date, crc)
files[name] = t
count += 1
finally:
fp.seek(start_offset)
_bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)
return files
@@ -708,7 +739,7 @@ def _get_mtime_and_size_of_source(self, path):
# strip 'c' or 'o' from *.py[co]
assert path[-1:] in ('c', 'o')
path = path[:-1]
toc_entry = self._files[path]
toc_entry = self._get_files()[path]
# fetch the time stamp of the .py file for comparison
# with an embedded pyc time stamp
time = toc_entry[5]
@@ -728,7 +759,7 @@ def _get_pyc_source(self, path):
path = path[:-1]
try:
toc_entry = self._files[path]
toc_entry = self._get_files()[path]
except KeyError:
return None
else:
@@ -744,7 +775,7 @@ def _get_module_code(self, fullname):
fullpath = path + suffix
_bootstrap._verbose_message('trying {}{}{}', self.archive, path_sep, fullpath, verbosity=2)
try:
toc_entry = self._files[fullpath]
toc_entry = self._get_files()[fullpath]
except KeyError:
pass
else: