mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
Update zipfile to 3.13.5 (#6069)
* Add Lib/test/archiver_tests.py @ 3.13.5 Needed for updated zipfile tests. * Update zipfile to 3.13.5 Notes: - I have to skip some brand new tests due to shift_jis encoding not being supported - `test_write_filtered_python_package` marked as `expectedFailure` with "AttributeError: module 'os' has no attribute 'supports_effective_ids'" - I didn't want to do a partial or full update to os module in this PR --------- Co-authored-by: Jack O'Connor <jack@jackoconnor.dev>
This commit is contained in:
177
Lib/test/archiver_tests.py
vendored
Normal file
177
Lib/test/archiver_tests.py
vendored
Normal file
@@ -0,0 +1,177 @@
|
||||
"""Tests common to tarfile and zipfile."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from test.support import swap_attr
|
||||
from test.support import os_helper
|
||||
|
||||
class OverwriteTests:
|
||||
|
||||
def setUp(self):
|
||||
os.makedirs(self.testdir)
|
||||
self.addCleanup(os_helper.rmtree, self.testdir)
|
||||
|
||||
def create_file(self, path, content=b''):
|
||||
with open(path, 'wb') as f:
|
||||
f.write(content)
|
||||
|
||||
def open(self, path):
|
||||
raise NotImplementedError
|
||||
|
||||
def extractall(self, ar):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def test_overwrite_file_as_file(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
self.create_file(target, b'content')
|
||||
with self.open(self.ar_with_file) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isfile(target))
|
||||
with open(target, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'newcontent')
|
||||
|
||||
def test_overwrite_dir_as_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
os.mkdir(target)
|
||||
with self.open(self.ar_with_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isdir(target))
|
||||
|
||||
def test_overwrite_dir_as_implicit_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
os.mkdir(target)
|
||||
with self.open(self.ar_with_implicit_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isdir(target))
|
||||
self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
|
||||
with open(os.path.join(target, 'file'), 'rb') as f:
|
||||
self.assertEqual(f.read(), b'newcontent')
|
||||
|
||||
def test_overwrite_dir_as_file(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
os.mkdir(target)
|
||||
with self.open(self.ar_with_file) as ar:
|
||||
with self.assertRaises(PermissionError if sys.platform == 'win32'
|
||||
else IsADirectoryError):
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isdir(target))
|
||||
|
||||
def test_overwrite_file_as_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
self.create_file(target, b'content')
|
||||
with self.open(self.ar_with_dir) as ar:
|
||||
with self.assertRaises(FileExistsError):
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isfile(target))
|
||||
with open(target, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'content')
|
||||
|
||||
def test_overwrite_file_as_implicit_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
self.create_file(target, b'content')
|
||||
with self.open(self.ar_with_implicit_dir) as ar:
|
||||
with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
|
||||
else NotADirectoryError):
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isfile(target))
|
||||
with open(target, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'content')
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_file_symlink_as_file(self):
|
||||
# XXX: It is potential security vulnerability.
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
self.create_file(target2, b'content')
|
||||
os.symlink('test2', target)
|
||||
with self.open(self.ar_with_file) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertTrue(os.path.isfile(target2))
|
||||
with open(target2, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'newcontent')
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_broken_file_symlink_as_file(self):
|
||||
# XXX: It is potential security vulnerability.
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
os.symlink('test2', target)
|
||||
with self.open(self.ar_with_file) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertTrue(os.path.isfile(target2))
|
||||
with open(target2, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'newcontent')
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_dir_symlink_as_dir(self):
|
||||
# XXX: It is potential security vulnerability.
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
os.mkdir(target2)
|
||||
os.symlink('test2', target, target_is_directory=True)
|
||||
with self.open(self.ar_with_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertTrue(os.path.isdir(target2))
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_dir_symlink_as_implicit_dir(self):
|
||||
# XXX: It is potential security vulnerability.
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
os.mkdir(target2)
|
||||
os.symlink('test2', target, target_is_directory=True)
|
||||
with self.open(self.ar_with_implicit_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertTrue(os.path.isdir(target2))
|
||||
self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
|
||||
with open(os.path.join(target2, 'file'), 'rb') as f:
|
||||
self.assertEqual(f.read(), b'newcontent')
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_broken_dir_symlink_as_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
os.symlink('test2', target, target_is_directory=True)
|
||||
with self.open(self.ar_with_dir) as ar:
|
||||
with self.assertRaises(FileExistsError):
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertFalse(os.path.exists(target2))
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
target2 = os.path.join(self.testdir, 'test2')
|
||||
os.symlink('test2', target, target_is_directory=True)
|
||||
with self.open(self.ar_with_implicit_dir) as ar:
|
||||
with self.assertRaises(FileExistsError):
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.islink(target))
|
||||
self.assertFalse(os.path.exists(target2))
|
||||
|
||||
def test_concurrent_extract_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
def concurrent_mkdir(*args, **kwargs):
|
||||
orig_mkdir(*args, **kwargs)
|
||||
orig_mkdir(*args, **kwargs)
|
||||
with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir:
|
||||
with self.open(self.ar_with_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isdir(target))
|
||||
|
||||
def test_concurrent_extract_implicit_dir(self):
|
||||
target = os.path.join(self.testdir, 'test')
|
||||
def concurrent_mkdir(*args, **kwargs):
|
||||
orig_mkdir(*args, **kwargs)
|
||||
orig_mkdir(*args, **kwargs)
|
||||
with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir:
|
||||
with self.open(self.ar_with_implicit_dir) as ar:
|
||||
self.extractall(ar)
|
||||
self.assertTrue(os.path.isdir(target))
|
||||
self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
|
||||
5
Lib/test/test_zipfile/__init__.py
vendored
Normal file
5
Lib/test/test_zipfile/__init__.py
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
import os
|
||||
from test.support import load_package_tests
|
||||
|
||||
def load_tests(*args):
|
||||
return load_package_tests(os.path.dirname(__file__), *args)
|
||||
7
Lib/test/test_zipfile/__main__.py
vendored
Normal file
7
Lib/test/test_zipfile/__main__.py
vendored
Normal file
@@ -0,0 +1,7 @@
|
||||
import unittest
|
||||
|
||||
from . import load_tests # noqa: F401
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
0
Lib/test/test_zipfile/_path/__init__.py
vendored
Normal file
0
Lib/test/test_zipfile/_path/__init__.py
vendored
Normal file
9
Lib/test/test_zipfile/_path/_functools.py
vendored
Normal file
9
Lib/test/test_zipfile/_path/_functools.py
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
import functools
|
||||
|
||||
|
||||
# from jaraco.functools 3.5.2
|
||||
def compose(*funcs):
|
||||
def compose_two(f1, f2):
|
||||
return lambda *args, **kwargs: f1(f2(*args, **kwargs))
|
||||
|
||||
return functools.reduce(compose_two, funcs)
|
||||
79
Lib/test/test_zipfile/_path/_itertools.py
vendored
Normal file
79
Lib/test/test_zipfile/_path/_itertools.py
vendored
Normal file
@@ -0,0 +1,79 @@
|
||||
import itertools
|
||||
from collections import deque
|
||||
from itertools import islice
|
||||
|
||||
|
||||
# from jaraco.itertools 6.3.0
|
||||
class Counter:
|
||||
"""
|
||||
Wrap an iterable in an object that stores the count of items
|
||||
that pass through it.
|
||||
|
||||
>>> items = Counter(range(20))
|
||||
>>> items.count
|
||||
0
|
||||
>>> values = list(items)
|
||||
>>> items.count
|
||||
20
|
||||
"""
|
||||
|
||||
def __init__(self, i):
|
||||
self.count = 0
|
||||
self.iter = zip(itertools.count(1), i)
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
self.count, result = next(self.iter)
|
||||
return result
|
||||
|
||||
|
||||
# from more_itertools v8.13.0
|
||||
def always_iterable(obj, base_type=(str, bytes)):
|
||||
if obj is None:
|
||||
return iter(())
|
||||
|
||||
if (base_type is not None) and isinstance(obj, base_type):
|
||||
return iter((obj,))
|
||||
|
||||
try:
|
||||
return iter(obj)
|
||||
except TypeError:
|
||||
return iter((obj,))
|
||||
|
||||
|
||||
# from more_itertools v9.0.0
|
||||
def consume(iterator, n=None):
|
||||
"""Advance *iterable* by *n* steps. If *n* is ``None``, consume it
|
||||
entirely.
|
||||
Efficiently exhausts an iterator without returning values. Defaults to
|
||||
consuming the whole iterator, but an optional second argument may be
|
||||
provided to limit consumption.
|
||||
>>> i = (x for x in range(10))
|
||||
>>> next(i)
|
||||
0
|
||||
>>> consume(i, 3)
|
||||
>>> next(i)
|
||||
4
|
||||
>>> consume(i)
|
||||
>>> next(i)
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
StopIteration
|
||||
If the iterator has fewer items remaining than the provided limit, the
|
||||
whole iterator will be consumed.
|
||||
>>> i = (x for x in range(3))
|
||||
>>> consume(i, 5)
|
||||
>>> next(i)
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
StopIteration
|
||||
"""
|
||||
# Use functions that consume iterators at C speed.
|
||||
if n is None:
|
||||
# feed the entire iterator into a zero-length deque
|
||||
deque(iterator, maxlen=0)
|
||||
else:
|
||||
# advance to the empty slice starting at position n
|
||||
next(islice(iterator, n, n), None)
|
||||
9
Lib/test/test_zipfile/_path/_support.py
vendored
Normal file
9
Lib/test/test_zipfile/_path/_support.py
vendored
Normal file
@@ -0,0 +1,9 @@
|
||||
import importlib
|
||||
import unittest
|
||||
|
||||
|
||||
def import_or_skip(name):
|
||||
try:
|
||||
return importlib.import_module(name)
|
||||
except ImportError: # pragma: no cover
|
||||
raise unittest.SkipTest(f'Unable to import {name}')
|
||||
39
Lib/test/test_zipfile/_path/_test_params.py
vendored
Normal file
39
Lib/test/test_zipfile/_path/_test_params.py
vendored
Normal file
@@ -0,0 +1,39 @@
|
||||
import functools
|
||||
import types
|
||||
|
||||
from ._itertools import always_iterable
|
||||
|
||||
|
||||
def parameterize(names, value_groups):
|
||||
"""
|
||||
Decorate a test method to run it as a set of subtests.
|
||||
|
||||
Modeled after pytest.parametrize.
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapped(self):
|
||||
for values in value_groups:
|
||||
resolved = map(Invoked.eval, always_iterable(values))
|
||||
params = dict(zip(always_iterable(names), resolved))
|
||||
with self.subTest(**params):
|
||||
func(self, **params)
|
||||
|
||||
return wrapped
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class Invoked(types.SimpleNamespace):
|
||||
"""
|
||||
Wrap a function to be invoked for each usage.
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def wrap(cls, func):
|
||||
return cls(func=func)
|
||||
|
||||
@classmethod
|
||||
def eval(cls, cand):
|
||||
return cand.func() if isinstance(cand, cls) else cand
|
||||
105
Lib/test/test_zipfile/_path/test_complexity.py
vendored
Normal file
105
Lib/test/test_zipfile/_path/test_complexity.py
vendored
Normal file
@@ -0,0 +1,105 @@
|
||||
import io
|
||||
import itertools
|
||||
import math
|
||||
import re
|
||||
import string
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
from ._functools import compose
|
||||
from ._itertools import consume
|
||||
from ._support import import_or_skip
|
||||
|
||||
big_o = import_or_skip('big_o')
|
||||
pytest = import_or_skip('pytest')
|
||||
|
||||
|
||||
class TestComplexity(unittest.TestCase):
|
||||
@pytest.mark.flaky
|
||||
def test_implied_dirs_performance(self):
|
||||
best, others = big_o.big_o(
|
||||
compose(consume, zipfile._path.CompleteDirs._implied_dirs),
|
||||
lambda size: [
|
||||
'/'.join(string.ascii_lowercase + str(n)) for n in range(size)
|
||||
],
|
||||
max_n=1000,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Linear
|
||||
|
||||
def make_zip_path(self, depth=1, width=1) -> zipfile.Path:
|
||||
"""
|
||||
Construct a Path with width files at every level of depth.
|
||||
"""
|
||||
zf = zipfile.ZipFile(io.BytesIO(), mode='w')
|
||||
pairs = itertools.product(self.make_deep_paths(depth), self.make_names(width))
|
||||
for path, name in pairs:
|
||||
zf.writestr(f"{path}{name}.txt", b'')
|
||||
zf.filename = "big un.zip"
|
||||
return zipfile.Path(zf)
|
||||
|
||||
@classmethod
|
||||
def make_names(cls, width, letters=string.ascii_lowercase):
|
||||
"""
|
||||
>>> list(TestComplexity.make_names(1))
|
||||
['a']
|
||||
>>> list(TestComplexity.make_names(2))
|
||||
['a', 'b']
|
||||
>>> list(TestComplexity.make_names(30))
|
||||
['aa', 'ab', ..., 'bd']
|
||||
>>> list(TestComplexity.make_names(17124))
|
||||
['aaa', 'aab', ..., 'zip']
|
||||
"""
|
||||
# determine how many products are needed to produce width
|
||||
n_products = max(1, math.ceil(math.log(width, len(letters))))
|
||||
inputs = (letters,) * n_products
|
||||
combinations = itertools.product(*inputs)
|
||||
names = map(''.join, combinations)
|
||||
return itertools.islice(names, width)
|
||||
|
||||
@classmethod
|
||||
def make_deep_paths(cls, depth):
|
||||
return map(cls.make_deep_path, range(depth))
|
||||
|
||||
@classmethod
|
||||
def make_deep_path(cls, depth):
|
||||
return ''.join(('d/',) * depth)
|
||||
|
||||
def test_baseline_regex_complexity(self):
|
||||
best, others = big_o.big_o(
|
||||
lambda path: re.fullmatch(r'[^/]*\\.txt', path),
|
||||
self.make_deep_path,
|
||||
max_n=100,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Constant
|
||||
|
||||
@pytest.mark.flaky
|
||||
def test_glob_depth(self):
|
||||
best, others = big_o.big_o(
|
||||
lambda path: consume(path.glob('*.txt')),
|
||||
self.make_zip_path,
|
||||
max_n=100,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Linear
|
||||
|
||||
@pytest.mark.flaky
|
||||
def test_glob_width(self):
|
||||
best, others = big_o.big_o(
|
||||
lambda path: consume(path.glob('*.txt')),
|
||||
lambda size: self.make_zip_path(width=size),
|
||||
max_n=100,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Linear
|
||||
|
||||
@pytest.mark.flaky
|
||||
def test_glob_width_and_depth(self):
|
||||
best, others = big_o.big_o(
|
||||
lambda path: consume(path.glob('*.txt')),
|
||||
lambda size: self.make_zip_path(depth=size, width=size),
|
||||
max_n=10,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Linear
|
||||
687
Lib/test/test_zipfile/_path/test_path.py
vendored
Normal file
687
Lib/test/test_zipfile/_path/test_path.py
vendored
Normal file
@@ -0,0 +1,687 @@
|
||||
import contextlib
|
||||
import io
|
||||
import itertools
|
||||
import pathlib
|
||||
import pickle
|
||||
import stat
|
||||
import sys
|
||||
import time
|
||||
import unittest
|
||||
import zipfile
|
||||
import zipfile._path
|
||||
|
||||
from test.support.os_helper import FakePath, temp_dir
|
||||
|
||||
from ._functools import compose
|
||||
from ._itertools import Counter
|
||||
from ._test_params import Invoked, parameterize
|
||||
|
||||
|
||||
class jaraco:
|
||||
class itertools:
|
||||
Counter = Counter
|
||||
|
||||
|
||||
def _make_link(info: zipfile.ZipInfo): # type: ignore[name-defined]
|
||||
info.external_attr |= stat.S_IFLNK << 16
|
||||
|
||||
|
||||
def build_alpharep_fixture():
|
||||
"""
|
||||
Create a zip file with this structure:
|
||||
|
||||
.
|
||||
├── a.txt
|
||||
├── n.txt (-> a.txt)
|
||||
├── b
|
||||
│ ├── c.txt
|
||||
│ ├── d
|
||||
│ │ └── e.txt
|
||||
│ └── f.txt
|
||||
├── g
|
||||
│ └── h
|
||||
│ └── i.txt
|
||||
└── j
|
||||
├── k.bin
|
||||
├── l.baz
|
||||
└── m.bar
|
||||
|
||||
This fixture has the following key characteristics:
|
||||
|
||||
- a file at the root (a)
|
||||
- a file two levels deep (b/d/e)
|
||||
- multiple files in a directory (b/c, b/f)
|
||||
- a directory containing only a directory (g/h)
|
||||
- a directory with files of different extensions (j/klm)
|
||||
- a symlink (n) pointing to (a)
|
||||
|
||||
"alpha" because it uses alphabet
|
||||
"rep" because it's a representative example
|
||||
"""
|
||||
data = io.BytesIO()
|
||||
zf = zipfile.ZipFile(data, "w")
|
||||
zf.writestr("a.txt", b"content of a")
|
||||
zf.writestr("b/c.txt", b"content of c")
|
||||
zf.writestr("b/d/e.txt", b"content of e")
|
||||
zf.writestr("b/f.txt", b"content of f")
|
||||
zf.writestr("g/h/i.txt", b"content of i")
|
||||
zf.writestr("j/k.bin", b"content of k")
|
||||
zf.writestr("j/l.baz", b"content of l")
|
||||
zf.writestr("j/m.bar", b"content of m")
|
||||
zf.writestr("n.txt", b"a.txt")
|
||||
_make_link(zf.infolist()[-1])
|
||||
|
||||
zf.filename = "alpharep.zip"
|
||||
return zf
|
||||
|
||||
|
||||
alpharep_generators = [
|
||||
Invoked.wrap(build_alpharep_fixture),
|
||||
Invoked.wrap(compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture)),
|
||||
]
|
||||
|
||||
pass_alpharep = parameterize(['alpharep'], alpharep_generators)
|
||||
|
||||
|
||||
class TestPath(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.fixtures = contextlib.ExitStack()
|
||||
self.addCleanup(self.fixtures.close)
|
||||
|
||||
def zipfile_ondisk(self, alpharep):
|
||||
tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
|
||||
buffer = alpharep.fp
|
||||
alpharep.close()
|
||||
path = tmpdir / alpharep.filename
|
||||
with path.open("wb") as strm:
|
||||
strm.write(buffer.getvalue())
|
||||
return path
|
||||
|
||||
@pass_alpharep
|
||||
def test_iterdir_and_types(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.is_dir()
|
||||
a, n, b, g, j = root.iterdir()
|
||||
assert a.is_file()
|
||||
assert b.is_dir()
|
||||
assert g.is_dir()
|
||||
c, f, d = b.iterdir()
|
||||
assert c.is_file() and f.is_file()
|
||||
(e,) = d.iterdir()
|
||||
assert e.is_file()
|
||||
(h,) = g.iterdir()
|
||||
(i,) = h.iterdir()
|
||||
assert i.is_file()
|
||||
|
||||
@pass_alpharep
|
||||
def test_is_file_missing(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert not root.joinpath('missing.txt').is_file()
|
||||
|
||||
@pass_alpharep
|
||||
def test_iterdir_on_file(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
a, n, b, g, j = root.iterdir()
|
||||
with self.assertRaises(ValueError):
|
||||
a.iterdir()
|
||||
|
||||
@pass_alpharep
|
||||
def test_subdir_is_dir(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert (root / 'b').is_dir()
|
||||
assert (root / 'b/').is_dir()
|
||||
assert (root / 'g').is_dir()
|
||||
assert (root / 'g/').is_dir()
|
||||
|
||||
@pass_alpharep
|
||||
def test_open(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
a, n, b, g, j = root.iterdir()
|
||||
with a.open(encoding="utf-8") as strm:
|
||||
data = strm.read()
|
||||
self.assertEqual(data, "content of a")
|
||||
with a.open('r', "utf-8") as strm: # not a kw, no gh-101144 TypeError
|
||||
data = strm.read()
|
||||
self.assertEqual(data, "content of a")
|
||||
|
||||
def test_open_encoding_utf16(self):
|
||||
in_memory_file = io.BytesIO()
|
||||
zf = zipfile.ZipFile(in_memory_file, "w")
|
||||
zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
|
||||
zf.filename = "test_open_utf16.zip"
|
||||
root = zipfile.Path(zf)
|
||||
(path,) = root.iterdir()
|
||||
u16 = path.joinpath("16.txt")
|
||||
with u16.open('r', "utf-16") as strm:
|
||||
data = strm.read()
|
||||
assert data == "This was utf-16"
|
||||
with u16.open(encoding="utf-16") as strm:
|
||||
data = strm.read()
|
||||
assert data == "This was utf-16"
|
||||
|
||||
def test_open_encoding_errors(self):
|
||||
in_memory_file = io.BytesIO()
|
||||
zf = zipfile.ZipFile(in_memory_file, "w")
|
||||
zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
|
||||
zf.filename = "test_read_text_encoding_errors.zip"
|
||||
root = zipfile.Path(zf)
|
||||
(path,) = root.iterdir()
|
||||
u16 = path.joinpath("bad-utf8.bin")
|
||||
|
||||
# encoding= as a positional argument for gh-101144.
|
||||
data = u16.read_text("utf-8", errors="ignore")
|
||||
assert data == "invalid utf-8: ."
|
||||
with u16.open("r", "utf-8", errors="surrogateescape") as f:
|
||||
assert f.read() == "invalid utf-8: \udcff\udcff."
|
||||
|
||||
# encoding= both positional and keyword is an error; gh-101144.
|
||||
with self.assertRaisesRegex(TypeError, "encoding"):
|
||||
data = u16.read_text("utf-8", encoding="utf-8")
|
||||
|
||||
# both keyword arguments work.
|
||||
with u16.open("r", encoding="utf-8", errors="strict") as f:
|
||||
# error during decoding with wrong codec.
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
f.read()
|
||||
|
||||
@unittest.skipIf(
|
||||
not getattr(sys.flags, 'warn_default_encoding', 0),
|
||||
"Requires warn_default_encoding",
|
||||
)
|
||||
@pass_alpharep
|
||||
def test_encoding_warnings(self, alpharep):
|
||||
"""EncodingWarning must blame the read_text and open calls."""
|
||||
assert sys.flags.warn_default_encoding
|
||||
root = zipfile.Path(alpharep)
|
||||
with self.assertWarns(EncodingWarning) as wc: # noqa: F821 (astral-sh/ruff#13296)
|
||||
root.joinpath("a.txt").read_text()
|
||||
assert __file__ == wc.filename
|
||||
with self.assertWarns(EncodingWarning) as wc: # noqa: F821 (astral-sh/ruff#13296)
|
||||
root.joinpath("a.txt").open("r").close()
|
||||
assert __file__ == wc.filename
|
||||
|
||||
def test_open_write(self):
|
||||
"""
|
||||
If the zipfile is open for write, it should be possible to
|
||||
write bytes or text to it.
|
||||
"""
|
||||
zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
|
||||
with zf.joinpath('file.bin').open('wb') as strm:
|
||||
strm.write(b'binary contents')
|
||||
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
|
||||
strm.write('text file')
|
||||
|
||||
@pass_alpharep
|
||||
def test_open_extant_directory(self, alpharep):
|
||||
"""
|
||||
Attempting to open a directory raises IsADirectoryError.
|
||||
"""
|
||||
zf = zipfile.Path(alpharep)
|
||||
with self.assertRaises(IsADirectoryError):
|
||||
zf.joinpath('b').open()
|
||||
|
||||
@pass_alpharep
|
||||
def test_open_binary_invalid_args(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
with self.assertRaises(ValueError):
|
||||
root.joinpath('a.txt').open('rb', encoding='utf-8')
|
||||
with self.assertRaises(ValueError):
|
||||
root.joinpath('a.txt').open('rb', 'utf-8')
|
||||
|
||||
@pass_alpharep
|
||||
def test_open_missing_directory(self, alpharep):
|
||||
"""
|
||||
Attempting to open a missing directory raises FileNotFoundError.
|
||||
"""
|
||||
zf = zipfile.Path(alpharep)
|
||||
with self.assertRaises(FileNotFoundError):
|
||||
zf.joinpath('z').open()
|
||||
|
||||
@pass_alpharep
|
||||
def test_read(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
a, n, b, g, j = root.iterdir()
|
||||
assert a.read_text(encoding="utf-8") == "content of a"
|
||||
# Also check positional encoding arg (gh-101144).
|
||||
assert a.read_text("utf-8") == "content of a"
|
||||
assert a.read_bytes() == b"content of a"
|
||||
|
||||
@pass_alpharep
|
||||
def test_joinpath(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
a = root.joinpath("a.txt")
|
||||
assert a.is_file()
|
||||
e = root.joinpath("b").joinpath("d").joinpath("e.txt")
|
||||
assert e.read_text(encoding="utf-8") == "content of e"
|
||||
|
||||
@pass_alpharep
|
||||
def test_joinpath_multiple(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
e = root.joinpath("b", "d", "e.txt")
|
||||
assert e.read_text(encoding="utf-8") == "content of e"
|
||||
|
||||
@pass_alpharep
|
||||
def test_traverse_truediv(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
a = root / "a.txt"
|
||||
assert a.is_file()
|
||||
e = root / "b" / "d" / "e.txt"
|
||||
assert e.read_text(encoding="utf-8") == "content of e"
|
||||
|
||||
@pass_alpharep
|
||||
def test_pathlike_construction(self, alpharep):
|
||||
"""
|
||||
zipfile.Path should be constructable from a path-like object
|
||||
"""
|
||||
zipfile_ondisk = self.zipfile_ondisk(alpharep)
|
||||
pathlike = FakePath(str(zipfile_ondisk))
|
||||
zipfile.Path(pathlike)
|
||||
|
||||
@pass_alpharep
|
||||
def test_traverse_pathlike(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
root / FakePath("a")
|
||||
|
||||
@pass_alpharep
|
||||
def test_parent(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert (root / 'a').parent.at == ''
|
||||
assert (root / 'a' / 'b').parent.at == 'a/'
|
||||
|
||||
@pass_alpharep
|
||||
def test_dir_parent(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert (root / 'b').parent.at == ''
|
||||
assert (root / 'b/').parent.at == ''
|
||||
|
||||
@pass_alpharep
|
||||
def test_missing_dir_parent(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert (root / 'missing dir/').parent.at == ''
|
||||
|
||||
@pass_alpharep
|
||||
def test_mutability(self, alpharep):
|
||||
"""
|
||||
If the underlying zipfile is changed, the Path object should
|
||||
reflect that change.
|
||||
"""
|
||||
root = zipfile.Path(alpharep)
|
||||
a, n, b, g, j = root.iterdir()
|
||||
alpharep.writestr('foo.txt', 'foo')
|
||||
alpharep.writestr('bar/baz.txt', 'baz')
|
||||
assert any(child.name == 'foo.txt' for child in root.iterdir())
|
||||
assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
|
||||
(baz,) = (root / 'bar').iterdir()
|
||||
assert baz.read_text(encoding="utf-8") == 'baz'
|
||||
|
||||
HUGE_ZIPFILE_NUM_ENTRIES = 2**13
|
||||
|
||||
def huge_zipfile(self):
|
||||
"""Create a read-only zipfile with a huge number of entries entries."""
|
||||
strm = io.BytesIO()
|
||||
zf = zipfile.ZipFile(strm, "w")
|
||||
for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
|
||||
zf.writestr(entry, entry)
|
||||
zf.mode = 'r'
|
||||
return zf
|
||||
|
||||
def test_joinpath_constant_time(self):
|
||||
"""
|
||||
Ensure joinpath on items in zipfile is linear time.
|
||||
"""
|
||||
root = zipfile.Path(self.huge_zipfile())
|
||||
entries = jaraco.itertools.Counter(root.iterdir())
|
||||
for entry in entries:
|
||||
entry.joinpath('suffix')
|
||||
# Check the file iterated all items
|
||||
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
|
||||
|
||||
@pass_alpharep
|
||||
def test_read_does_not_close(self, alpharep):
|
||||
alpharep = self.zipfile_ondisk(alpharep)
|
||||
with zipfile.ZipFile(alpharep) as file:
|
||||
for rep in range(2):
|
||||
zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
|
||||
|
||||
@pass_alpharep
|
||||
def test_subclass(self, alpharep):
|
||||
class Subclass(zipfile.Path):
|
||||
pass
|
||||
|
||||
root = Subclass(alpharep)
|
||||
assert isinstance(root / 'b', Subclass)
|
||||
|
||||
@pass_alpharep
|
||||
def test_filename(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.filename == pathlib.Path('alpharep.zip')
|
||||
|
||||
@pass_alpharep
|
||||
def test_root_name(self, alpharep):
|
||||
"""
|
||||
The name of the root should be the name of the zipfile
|
||||
"""
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.name == 'alpharep.zip' == root.filename.name
|
||||
|
||||
@pass_alpharep
|
||||
def test_root_on_disk(self, alpharep):
|
||||
"""
|
||||
The name/stem of the root should match the zipfile on disk.
|
||||
|
||||
This condition must hold across platforms.
|
||||
"""
|
||||
root = zipfile.Path(self.zipfile_ondisk(alpharep))
|
||||
assert root.name == 'alpharep.zip' == root.filename.name
|
||||
assert root.stem == 'alpharep' == root.filename.stem
|
||||
|
||||
@pass_alpharep
|
||||
def test_suffix(self, alpharep):
|
||||
"""
|
||||
The suffix of the root should be the suffix of the zipfile.
|
||||
The suffix of each nested file is the final component's last suffix, if any.
|
||||
Includes the leading period, just like pathlib.Path.
|
||||
"""
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.suffix == '.zip' == root.filename.suffix
|
||||
|
||||
b = root / "b.txt"
|
||||
assert b.suffix == ".txt"
|
||||
|
||||
c = root / "c" / "filename.tar.gz"
|
||||
assert c.suffix == ".gz"
|
||||
|
||||
d = root / "d"
|
||||
assert d.suffix == ""
|
||||
|
||||
@pass_alpharep
|
||||
def test_suffixes(self, alpharep):
|
||||
"""
|
||||
The suffix of the root should be the suffix of the zipfile.
|
||||
The suffix of each nested file is the final component's last suffix, if any.
|
||||
Includes the leading period, just like pathlib.Path.
|
||||
"""
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.suffixes == ['.zip'] == root.filename.suffixes
|
||||
|
||||
b = root / 'b.txt'
|
||||
assert b.suffixes == ['.txt']
|
||||
|
||||
c = root / 'c' / 'filename.tar.gz'
|
||||
assert c.suffixes == ['.tar', '.gz']
|
||||
|
||||
d = root / 'd'
|
||||
assert d.suffixes == []
|
||||
|
||||
e = root / '.hgrc'
|
||||
assert e.suffixes == []
|
||||
|
||||
@pass_alpharep
|
||||
def test_suffix_no_filename(self, alpharep):
|
||||
alpharep.filename = None
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.joinpath('example').suffix == ""
|
||||
assert root.joinpath('example').suffixes == []
|
||||
|
||||
@pass_alpharep
|
||||
def test_stem(self, alpharep):
|
||||
"""
|
||||
The final path component, without its suffix
|
||||
"""
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.stem == 'alpharep' == root.filename.stem
|
||||
|
||||
b = root / "b.txt"
|
||||
assert b.stem == "b"
|
||||
|
||||
c = root / "c" / "filename.tar.gz"
|
||||
assert c.stem == "filename.tar"
|
||||
|
||||
d = root / "d"
|
||||
assert d.stem == "d"
|
||||
|
||||
assert (root / ".gitignore").stem == ".gitignore"
|
||||
|
||||
@pass_alpharep
|
||||
def test_root_parent(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root.parent == pathlib.Path('.')
|
||||
root.root.filename = 'foo/bar.zip'
|
||||
assert root.parent == pathlib.Path('foo')
|
||||
|
||||
@pass_alpharep
|
||||
def test_root_unnamed(self, alpharep):
|
||||
"""
|
||||
It is an error to attempt to get the name
|
||||
or parent of an unnamed zipfile.
|
||||
"""
|
||||
alpharep.filename = None
|
||||
root = zipfile.Path(alpharep)
|
||||
with self.assertRaises(TypeError):
|
||||
root.name
|
||||
with self.assertRaises(TypeError):
|
||||
root.parent
|
||||
|
||||
# .name and .parent should still work on subs
|
||||
sub = root / "b"
|
||||
assert sub.name == "b"
|
||||
assert sub.parent
|
||||
|
||||
@pass_alpharep
|
||||
def test_match_and_glob(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert not root.match("*.txt")
|
||||
|
||||
assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]
|
||||
assert list(root.glob("b/*.txt")) == [
|
||||
zipfile.Path(alpharep, "b/c.txt"),
|
||||
zipfile.Path(alpharep, "b/f.txt"),
|
||||
]
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_recursive(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
files = root.glob("**/*.txt")
|
||||
assert all(each.match("*.txt") for each in files)
|
||||
|
||||
assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_dirs(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert list(root.glob('b')) == [zipfile.Path(alpharep, "b/")]
|
||||
assert list(root.glob('b*')) == [zipfile.Path(alpharep, "b/")]
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_subdir(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert list(root.glob('g/h')) == [zipfile.Path(alpharep, "g/h/")]
|
||||
assert list(root.glob('g*/h*')) == [zipfile.Path(alpharep, "g/h/")]
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_subdirs(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
|
||||
assert list(root.glob("*/i.txt")) == []
|
||||
assert list(root.rglob("*/i.txt")) == [zipfile.Path(alpharep, "g/h/i.txt")]
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_does_not_overmatch_dot(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
|
||||
assert list(root.glob("*.xt")) == []
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_single_char(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
|
||||
assert list(root.glob("a?txt")) == [zipfile.Path(alpharep, "a.txt")]
|
||||
assert list(root.glob("a[.]txt")) == [zipfile.Path(alpharep, "a.txt")]
|
||||
assert list(root.glob("a[?]txt")) == []
|
||||
|
||||
@pass_alpharep
|
||||
def test_glob_chars(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
|
||||
assert list(root.glob("j/?.b[ai][nz]")) == [
|
||||
zipfile.Path(alpharep, "j/k.bin"),
|
||||
zipfile.Path(alpharep, "j/l.baz"),
|
||||
]
|
||||
|
||||
def test_glob_empty(self):
|
||||
root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
|
||||
with self.assertRaises(ValueError):
|
||||
root.glob('')
|
||||
|
||||
@pass_alpharep
|
||||
def test_eq_hash(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root == zipfile.Path(alpharep)
|
||||
|
||||
assert root != (root / "a.txt")
|
||||
assert (root / "a.txt") == (root / "a.txt")
|
||||
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root in {root}
|
||||
|
||||
@pass_alpharep
|
||||
def test_is_symlink(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert not root.joinpath('a.txt').is_symlink()
|
||||
assert root.joinpath('n.txt').is_symlink()
|
||||
|
||||
@pass_alpharep
|
||||
def test_relative_to(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
relative = root.joinpath("b", "c.txt").relative_to(root / "b")
|
||||
assert str(relative) == "c.txt"
|
||||
|
||||
relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
|
||||
assert str(relative) == "d/e.txt"
|
||||
|
||||
@pass_alpharep
|
||||
def test_inheritance(self, alpharep):
|
||||
cls = type('PathChild', (zipfile.Path,), {})
|
||||
file = cls(alpharep).joinpath('some dir').parent
|
||||
assert isinstance(file, cls)
|
||||
|
||||
@unittest.skipIf(sys.platform == 'win32', "TODO: RUSTPYTHON, fails on Windows")
|
||||
@parameterize(
|
||||
['alpharep', 'path_type', 'subpath'],
|
||||
itertools.product(
|
||||
alpharep_generators,
|
||||
[str, FakePath],
|
||||
['', 'b/'],
|
||||
),
|
||||
)
|
||||
def test_pickle(self, alpharep, path_type, subpath):
|
||||
zipfile_ondisk = path_type(str(self.zipfile_ondisk(alpharep)))
|
||||
|
||||
saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
|
||||
restored_1 = pickle.loads(saved_1)
|
||||
first, *rest = restored_1.iterdir()
|
||||
assert first.read_text(encoding='utf-8').startswith('content of ')
|
||||
|
||||
@pass_alpharep
|
||||
def test_extract_orig_with_implied_dirs(self, alpharep):
|
||||
"""
|
||||
A zip file wrapped in a Path should extract even with implied dirs.
|
||||
"""
|
||||
source_path = self.zipfile_ondisk(alpharep)
|
||||
zf = zipfile.ZipFile(source_path)
|
||||
# wrap the zipfile for its side effect
|
||||
zipfile.Path(zf)
|
||||
zf.extractall(source_path.parent)
|
||||
|
||||
@pass_alpharep
|
||||
def test_getinfo_missing(self, alpharep):
|
||||
"""
|
||||
Validate behavior of getinfo on original zipfile after wrapping.
|
||||
"""
|
||||
zipfile.Path(alpharep)
|
||||
with self.assertRaises(KeyError):
|
||||
alpharep.getinfo('does-not-exist')
|
||||
|
||||
def test_malformed_paths(self):
|
||||
"""
|
||||
Path should handle malformed paths gracefully.
|
||||
|
||||
Paths with leading slashes are not visible.
|
||||
|
||||
Paths with dots are treated like regular files.
|
||||
"""
|
||||
data = io.BytesIO()
|
||||
zf = zipfile.ZipFile(data, "w")
|
||||
zf.writestr("/one-slash.txt", b"content")
|
||||
zf.writestr("//two-slash.txt", b"content")
|
||||
zf.writestr("../parent.txt", b"content")
|
||||
zf.filename = ''
|
||||
root = zipfile.Path(zf)
|
||||
assert list(map(str, root.iterdir())) == ['../']
|
||||
assert root.joinpath('..').joinpath('parent.txt').read_bytes() == b'content'
|
||||
|
||||
def test_unsupported_names(self):
|
||||
"""
|
||||
Path segments with special characters are readable.
|
||||
|
||||
On some platforms or file systems, characters like
|
||||
``:`` and ``?`` are not allowed, but they are valid
|
||||
in the zip file.
|
||||
"""
|
||||
data = io.BytesIO()
|
||||
zf = zipfile.ZipFile(data, "w")
|
||||
zf.writestr("path?", b"content")
|
||||
zf.writestr("V: NMS.flac", b"fLaC...")
|
||||
zf.filename = ''
|
||||
root = zipfile.Path(zf)
|
||||
contents = root.iterdir()
|
||||
assert next(contents).name == 'path?'
|
||||
assert next(contents).name == 'V: NMS.flac'
|
||||
assert root.joinpath('V: NMS.flac').read_bytes() == b"fLaC..."
|
||||
|
||||
def test_backslash_not_separator(self):
|
||||
"""
|
||||
In a zip file, backslashes are not separators.
|
||||
"""
|
||||
data = io.BytesIO()
|
||||
zf = zipfile.ZipFile(data, "w")
|
||||
zf.writestr(DirtyZipInfo.for_name("foo\\bar", zf), b"content")
|
||||
zf.filename = ''
|
||||
root = zipfile.Path(zf)
|
||||
(first,) = root.iterdir()
|
||||
assert not first.is_dir()
|
||||
assert first.name == 'foo\\bar'
|
||||
|
||||
@pass_alpharep
|
||||
def test_interface(self, alpharep):
|
||||
from importlib.resources.abc import Traversable
|
||||
|
||||
zf = zipfile.Path(alpharep)
|
||||
assert isinstance(zf, Traversable)
|
||||
|
||||
|
||||
class DirtyZipInfo(zipfile.ZipInfo):
|
||||
"""
|
||||
Bypass name sanitization.
|
||||
"""
|
||||
|
||||
def __init__(self, filename, *args, **kwargs):
|
||||
super().__init__(filename, *args, **kwargs)
|
||||
self.filename = filename
|
||||
|
||||
@classmethod
|
||||
def for_name(cls, name, archive):
|
||||
"""
|
||||
Construct the same way that ZipFile.writestr does.
|
||||
|
||||
TODO: extract this functionality and re-use
|
||||
"""
|
||||
self = cls(filename=name, date_time=time.localtime(time.time())[:6])
|
||||
self.compress_type = archive.compression
|
||||
self.compress_level = archive.compresslevel
|
||||
if self.filename.endswith('/'): # pragma: no cover
|
||||
self.external_attr = 0o40775 << 16 # drwxrwxr-x
|
||||
self.external_attr |= 0x10 # MS-DOS directory flag
|
||||
else:
|
||||
self.external_attr = 0o600 << 16 # ?rw-------
|
||||
return self
|
||||
3
Lib/test/test_zipfile/_path/write-alpharep.py
vendored
Normal file
3
Lib/test/test_zipfile/_path/write-alpharep.py
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
from . import test_path
|
||||
|
||||
__name__ == '__main__' and test_path.build_alpharep_fixture().extractall('alpharep')
|
||||
File diff suppressed because it is too large
Load Diff
733
Lib/zipfile.py → Lib/zipfile/__init__.py
vendored
733
Lib/zipfile.py → Lib/zipfile/__init__.py
vendored
File diff suppressed because it is too large
Load Diff
4
Lib/zipfile/__main__.py
vendored
Normal file
4
Lib/zipfile/__main__.py
vendored
Normal file
@@ -0,0 +1,4 @@
|
||||
from . import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
452
Lib/zipfile/_path/__init__.py
vendored
Normal file
452
Lib/zipfile/_path/__init__.py
vendored
Normal file
@@ -0,0 +1,452 @@
|
||||
"""
|
||||
A Path-like interface for zipfiles.
|
||||
|
||||
This codebase is shared between zipfile.Path in the stdlib
|
||||
and zipp in PyPI. See
|
||||
https://github.com/python/importlib_metadata/wiki/Development-Methodology
|
||||
for more detail.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import io
|
||||
import itertools
|
||||
import pathlib
|
||||
import posixpath
|
||||
import re
|
||||
import stat
|
||||
import sys
|
||||
import zipfile
|
||||
|
||||
from .glob import Translator
|
||||
|
||||
__all__ = ['Path']
|
||||
|
||||
|
||||
def _parents(path):
|
||||
"""
|
||||
Given a path with elements separated by
|
||||
posixpath.sep, generate all parents of that path.
|
||||
|
||||
>>> list(_parents('b/d'))
|
||||
['b']
|
||||
>>> list(_parents('/b/d/'))
|
||||
['/b']
|
||||
>>> list(_parents('b/d/f/'))
|
||||
['b/d', 'b']
|
||||
>>> list(_parents('b'))
|
||||
[]
|
||||
>>> list(_parents(''))
|
||||
[]
|
||||
"""
|
||||
return itertools.islice(_ancestry(path), 1, None)
|
||||
|
||||
|
||||
def _ancestry(path):
|
||||
"""
|
||||
Given a path with elements separated by
|
||||
posixpath.sep, generate all elements of that path.
|
||||
|
||||
>>> list(_ancestry('b/d'))
|
||||
['b/d', 'b']
|
||||
>>> list(_ancestry('/b/d/'))
|
||||
['/b/d', '/b']
|
||||
>>> list(_ancestry('b/d/f/'))
|
||||
['b/d/f', 'b/d', 'b']
|
||||
>>> list(_ancestry('b'))
|
||||
['b']
|
||||
>>> list(_ancestry(''))
|
||||
[]
|
||||
|
||||
Multiple separators are treated like a single.
|
||||
|
||||
>>> list(_ancestry('//b//d///f//'))
|
||||
['//b//d///f', '//b//d', '//b']
|
||||
"""
|
||||
path = path.rstrip(posixpath.sep)
|
||||
while path.rstrip(posixpath.sep):
|
||||
yield path
|
||||
path, tail = posixpath.split(path)
|
||||
|
||||
|
||||
_dedupe = dict.fromkeys
|
||||
"""Deduplicate an iterable in original order"""
|
||||
|
||||
|
||||
def _difference(minuend, subtrahend):
|
||||
"""
|
||||
Return items in minuend not in subtrahend, retaining order
|
||||
with O(1) lookup.
|
||||
"""
|
||||
return itertools.filterfalse(set(subtrahend).__contains__, minuend)
|
||||
|
||||
|
||||
class InitializedState:
|
||||
"""
|
||||
Mix-in to save the initialization state for pickling.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.__args = args
|
||||
self.__kwargs = kwargs
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def __getstate__(self):
|
||||
return self.__args, self.__kwargs
|
||||
|
||||
def __setstate__(self, state):
|
||||
args, kwargs = state
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class CompleteDirs(InitializedState, zipfile.ZipFile):
|
||||
"""
|
||||
A ZipFile subclass that ensures that implied directories
|
||||
are always included in the namelist.
|
||||
|
||||
>>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
|
||||
['foo/', 'foo/bar/']
|
||||
>>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
|
||||
['foo/']
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _implied_dirs(names):
|
||||
parents = itertools.chain.from_iterable(map(_parents, names))
|
||||
as_dirs = (p + posixpath.sep for p in parents)
|
||||
return _dedupe(_difference(as_dirs, names))
|
||||
|
||||
def namelist(self):
|
||||
names = super().namelist()
|
||||
return names + list(self._implied_dirs(names))
|
||||
|
||||
def _name_set(self):
|
||||
return set(self.namelist())
|
||||
|
||||
def resolve_dir(self, name):
|
||||
"""
|
||||
If the name represents a directory, return that name
|
||||
as a directory (with the trailing slash).
|
||||
"""
|
||||
names = self._name_set()
|
||||
dirname = name + '/'
|
||||
dir_match = name not in names and dirname in names
|
||||
return dirname if dir_match else name
|
||||
|
||||
def getinfo(self, name):
|
||||
"""
|
||||
Supplement getinfo for implied dirs.
|
||||
"""
|
||||
try:
|
||||
return super().getinfo(name)
|
||||
except KeyError:
|
||||
if not name.endswith('/') or name not in self._name_set():
|
||||
raise
|
||||
return zipfile.ZipInfo(filename=name)
|
||||
|
||||
@classmethod
|
||||
def make(cls, source):
|
||||
"""
|
||||
Given a source (filename or zipfile), return an
|
||||
appropriate CompleteDirs subclass.
|
||||
"""
|
||||
if isinstance(source, CompleteDirs):
|
||||
return source
|
||||
|
||||
if not isinstance(source, zipfile.ZipFile):
|
||||
return cls(source)
|
||||
|
||||
# Only allow for FastLookup when supplied zipfile is read-only
|
||||
if 'r' not in source.mode:
|
||||
cls = CompleteDirs
|
||||
|
||||
source.__class__ = cls
|
||||
return source
|
||||
|
||||
@classmethod
|
||||
def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
|
||||
"""
|
||||
Given a writable zip file zf, inject directory entries for
|
||||
any directories implied by the presence of children.
|
||||
"""
|
||||
for name in cls._implied_dirs(zf.namelist()):
|
||||
zf.writestr(name, b"")
|
||||
return zf
|
||||
|
||||
|
||||
class FastLookup(CompleteDirs):
|
||||
"""
|
||||
ZipFile subclass to ensure implicit
|
||||
dirs exist and are resolved rapidly.
|
||||
"""
|
||||
|
||||
def namelist(self):
|
||||
with contextlib.suppress(AttributeError):
|
||||
return self.__names
|
||||
self.__names = super().namelist()
|
||||
return self.__names
|
||||
|
||||
def _name_set(self):
|
||||
with contextlib.suppress(AttributeError):
|
||||
return self.__lookup
|
||||
self.__lookup = super()._name_set()
|
||||
return self.__lookup
|
||||
|
||||
def _extract_text_encoding(encoding=None, *args, **kwargs):
|
||||
# compute stack level so that the caller of the caller sees any warning.
|
||||
is_pypy = sys.implementation.name == 'pypy'
|
||||
# PyPy no longer special cased after 7.3.19 (or maybe 7.3.18)
|
||||
# See jaraco/zipp#143
|
||||
is_old_pypi = is_pypy and sys.pypy_version_info < (7, 3, 19)
|
||||
stack_level = 3 + is_old_pypi
|
||||
return io.text_encoding(encoding, stack_level), args, kwargs
|
||||
|
||||
|
||||
class Path:
|
||||
"""
|
||||
A :class:`importlib.resources.abc.Traversable` interface for zip files.
|
||||
|
||||
Implements many of the features users enjoy from
|
||||
:class:`pathlib.Path`.
|
||||
|
||||
Consider a zip file with this structure::
|
||||
|
||||
.
|
||||
├── a.txt
|
||||
└── b
|
||||
├── c.txt
|
||||
└── d
|
||||
└── e.txt
|
||||
|
||||
>>> data = io.BytesIO()
|
||||
>>> zf = ZipFile(data, 'w')
|
||||
>>> zf.writestr('a.txt', 'content of a')
|
||||
>>> zf.writestr('b/c.txt', 'content of c')
|
||||
>>> zf.writestr('b/d/e.txt', 'content of e')
|
||||
>>> zf.filename = 'mem/abcde.zip'
|
||||
|
||||
Path accepts the zipfile object itself or a filename
|
||||
|
||||
>>> path = Path(zf)
|
||||
|
||||
From there, several path operations are available.
|
||||
|
||||
Directory iteration (including the zip file itself):
|
||||
|
||||
>>> a, b = path.iterdir()
|
||||
>>> a
|
||||
Path('mem/abcde.zip', 'a.txt')
|
||||
>>> b
|
||||
Path('mem/abcde.zip', 'b/')
|
||||
|
||||
name property:
|
||||
|
||||
>>> b.name
|
||||
'b'
|
||||
|
||||
join with divide operator:
|
||||
|
||||
>>> c = b / 'c.txt'
|
||||
>>> c
|
||||
Path('mem/abcde.zip', 'b/c.txt')
|
||||
>>> c.name
|
||||
'c.txt'
|
||||
|
||||
Read text:
|
||||
|
||||
>>> c.read_text(encoding='utf-8')
|
||||
'content of c'
|
||||
|
||||
existence:
|
||||
|
||||
>>> c.exists()
|
||||
True
|
||||
>>> (b / 'missing.txt').exists()
|
||||
False
|
||||
|
||||
Coercion to string:
|
||||
|
||||
>>> import os
|
||||
>>> str(c).replace(os.sep, posixpath.sep)
|
||||
'mem/abcde.zip/b/c.txt'
|
||||
|
||||
At the root, ``name``, ``filename``, and ``parent``
|
||||
resolve to the zipfile.
|
||||
|
||||
>>> str(path)
|
||||
'mem/abcde.zip/'
|
||||
>>> path.name
|
||||
'abcde.zip'
|
||||
>>> path.filename == pathlib.Path('mem/abcde.zip')
|
||||
True
|
||||
>>> str(path.parent)
|
||||
'mem'
|
||||
|
||||
If the zipfile has no filename, such attributes are not
|
||||
valid and accessing them will raise an Exception.
|
||||
|
||||
>>> zf.filename = None
|
||||
>>> path.name
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
>>> path.filename
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
>>> path.parent
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
TypeError: ...
|
||||
|
||||
# workaround python/cpython#106763
|
||||
>>> pass
|
||||
"""
|
||||
|
||||
__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
|
||||
|
||||
def __init__(self, root, at=""):
|
||||
"""
|
||||
Construct a Path from a ZipFile or filename.
|
||||
|
||||
Note: When the source is an existing ZipFile object,
|
||||
its type (__class__) will be mutated to a
|
||||
specialized type. If the caller wishes to retain the
|
||||
original type, the caller should either create a
|
||||
separate ZipFile object or pass a filename.
|
||||
"""
|
||||
self.root = FastLookup.make(root)
|
||||
self.at = at
|
||||
|
||||
def __eq__(self, other):
|
||||
"""
|
||||
>>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
|
||||
False
|
||||
"""
|
||||
if self.__class__ is not other.__class__:
|
||||
return NotImplemented
|
||||
return (self.root, self.at) == (other.root, other.at)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.root, self.at))
|
||||
|
||||
def open(self, mode='r', *args, pwd=None, **kwargs):
|
||||
"""
|
||||
Open this entry as text or binary following the semantics
|
||||
of ``pathlib.Path.open()`` by passing arguments through
|
||||
to io.TextIOWrapper().
|
||||
"""
|
||||
if self.is_dir():
|
||||
raise IsADirectoryError(self)
|
||||
zip_mode = mode[0]
|
||||
if zip_mode == 'r' and not self.exists():
|
||||
raise FileNotFoundError(self)
|
||||
stream = self.root.open(self.at, zip_mode, pwd=pwd)
|
||||
if 'b' in mode:
|
||||
if args or kwargs:
|
||||
raise ValueError("encoding args invalid for binary operation")
|
||||
return stream
|
||||
# Text mode:
|
||||
encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
|
||||
return io.TextIOWrapper(stream, encoding, *args, **kwargs)
|
||||
|
||||
def _base(self):
|
||||
return pathlib.PurePosixPath(self.at) if self.at else self.filename
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self._base().name
|
||||
|
||||
@property
|
||||
def suffix(self):
|
||||
return self._base().suffix
|
||||
|
||||
@property
|
||||
def suffixes(self):
|
||||
return self._base().suffixes
|
||||
|
||||
@property
|
||||
def stem(self):
|
||||
return self._base().stem
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return pathlib.Path(self.root.filename).joinpath(self.at)
|
||||
|
||||
def read_text(self, *args, **kwargs):
|
||||
encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
|
||||
with self.open('r', encoding, *args, **kwargs) as strm:
|
||||
return strm.read()
|
||||
|
||||
def read_bytes(self):
|
||||
with self.open('rb') as strm:
|
||||
return strm.read()
|
||||
|
||||
def _is_child(self, path):
|
||||
return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")
|
||||
|
||||
def _next(self, at):
|
||||
return self.__class__(self.root, at)
|
||||
|
||||
def is_dir(self):
|
||||
return not self.at or self.at.endswith("/")
|
||||
|
||||
def is_file(self):
|
||||
return self.exists() and not self.is_dir()
|
||||
|
||||
def exists(self):
|
||||
return self.at in self.root._name_set()
|
||||
|
||||
def iterdir(self):
|
||||
if not self.is_dir():
|
||||
raise ValueError("Can't listdir a file")
|
||||
subs = map(self._next, self.root.namelist())
|
||||
return filter(self._is_child, subs)
|
||||
|
||||
def match(self, path_pattern):
|
||||
return pathlib.PurePosixPath(self.at).match(path_pattern)
|
||||
|
||||
def is_symlink(self):
|
||||
"""
|
||||
Return whether this path is a symlink.
|
||||
"""
|
||||
info = self.root.getinfo(self.at)
|
||||
mode = info.external_attr >> 16
|
||||
return stat.S_ISLNK(mode)
|
||||
|
||||
def glob(self, pattern):
|
||||
if not pattern:
|
||||
raise ValueError(f"Unacceptable pattern: {pattern!r}")
|
||||
|
||||
prefix = re.escape(self.at)
|
||||
tr = Translator(seps='/')
|
||||
matches = re.compile(prefix + tr.translate(pattern)).fullmatch
|
||||
return map(self._next, filter(matches, self.root.namelist()))
|
||||
|
||||
def rglob(self, pattern):
|
||||
return self.glob(f'**/{pattern}')
|
||||
|
||||
def relative_to(self, other, *extra):
|
||||
return posixpath.relpath(str(self), str(other.joinpath(*extra)))
|
||||
|
||||
def __str__(self):
|
||||
return posixpath.join(self.root.filename, self.at)
|
||||
|
||||
def __repr__(self):
|
||||
return self.__repr.format(self=self)
|
||||
|
||||
def joinpath(self, *other):
|
||||
next = posixpath.join(self.at, *other)
|
||||
return self._next(self.root.resolve_dir(next))
|
||||
|
||||
__truediv__ = joinpath
|
||||
|
||||
@property
|
||||
def parent(self):
|
||||
if not self.at:
|
||||
return self.filename.parent
|
||||
parent_at = posixpath.dirname(self.at.rstrip('/'))
|
||||
if parent_at:
|
||||
parent_at += '/'
|
||||
return self._next(parent_at)
|
||||
113
Lib/zipfile/_path/glob.py
vendored
Normal file
113
Lib/zipfile/_path/glob.py
vendored
Normal file
@@ -0,0 +1,113 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
_default_seps = os.sep + str(os.altsep) * bool(os.altsep)
|
||||
|
||||
|
||||
class Translator:
|
||||
"""
|
||||
>>> Translator('xyz')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Invalid separators
|
||||
|
||||
>>> Translator('')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AssertionError: Invalid separators
|
||||
"""
|
||||
|
||||
seps: str
|
||||
|
||||
def __init__(self, seps: str = _default_seps):
|
||||
assert seps and set(seps) <= set(_default_seps), "Invalid separators"
|
||||
self.seps = seps
|
||||
|
||||
def translate(self, pattern):
|
||||
"""
|
||||
Given a glob pattern, produce a regex that matches it.
|
||||
"""
|
||||
return self.extend(self.match_dirs(self.translate_core(pattern)))
|
||||
|
||||
def extend(self, pattern):
|
||||
r"""
|
||||
Extend regex for pattern-wide concerns.
|
||||
|
||||
Apply '(?s:)' to create a non-matching group that
|
||||
matches newlines (valid on Unix).
|
||||
|
||||
Append '\Z' to imply fullmatch even when match is used.
|
||||
"""
|
||||
return rf'(?s:{pattern})\Z'
|
||||
|
||||
def match_dirs(self, pattern):
|
||||
"""
|
||||
Ensure that zipfile.Path directory names are matched.
|
||||
|
||||
zipfile.Path directory names always end in a slash.
|
||||
"""
|
||||
return rf'{pattern}[/]?'
|
||||
|
||||
def translate_core(self, pattern):
|
||||
r"""
|
||||
Given a glob pattern, produce a regex that matches it.
|
||||
|
||||
>>> t = Translator()
|
||||
>>> t.translate_core('*.txt').replace('\\\\', '')
|
||||
'[^/]*\\.txt'
|
||||
>>> t.translate_core('a?txt')
|
||||
'a[^/]txt'
|
||||
>>> t.translate_core('**/*').replace('\\\\', '')
|
||||
'.*/[^/][^/]*'
|
||||
"""
|
||||
self.restrict_rglob(pattern)
|
||||
return ''.join(map(self.replace, separate(self.star_not_empty(pattern))))
|
||||
|
||||
def replace(self, match):
|
||||
"""
|
||||
Perform the replacements for a match from :func:`separate`.
|
||||
"""
|
||||
return match.group('set') or (
|
||||
re.escape(match.group(0))
|
||||
.replace('\\*\\*', r'.*')
|
||||
.replace('\\*', rf'[^{re.escape(self.seps)}]*')
|
||||
.replace('\\?', r'[^/]')
|
||||
)
|
||||
|
||||
def restrict_rglob(self, pattern):
|
||||
"""
|
||||
Raise ValueError if ** appears in anything but a full path segment.
|
||||
|
||||
>>> Translator().translate('**foo')
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
ValueError: ** must appear alone in a path segment
|
||||
"""
|
||||
seps_pattern = rf'[{re.escape(self.seps)}]+'
|
||||
segments = re.split(seps_pattern, pattern)
|
||||
if any('**' in segment and segment != '**' for segment in segments):
|
||||
raise ValueError("** must appear alone in a path segment")
|
||||
|
||||
def star_not_empty(self, pattern):
|
||||
"""
|
||||
Ensure that * will not match an empty segment.
|
||||
"""
|
||||
|
||||
def handle_segment(match):
|
||||
segment = match.group(0)
|
||||
return '?*' if segment == '*' else segment
|
||||
|
||||
not_seps_pattern = rf'[^{re.escape(self.seps)}]+'
|
||||
return re.sub(not_seps_pattern, handle_segment, pattern)
|
||||
|
||||
|
||||
def separate(pattern):
|
||||
"""
|
||||
Separate out character sets to avoid translating their contents.
|
||||
|
||||
>>> [m.group(0) for m in separate('*.txt')]
|
||||
['*.txt']
|
||||
>>> [m.group(0) for m in separate('a[?]txt')]
|
||||
['a', '[?]', 'txt']
|
||||
"""
|
||||
return re.finditer(r'([^\[]+)|(?P<set>[\[].*?[\]])|([\[][^\]]*$)', pattern)
|
||||
Reference in New Issue
Block a user