Merge pull request #2666 from fanninpm/test-tempfile

Add test_tempfile from CPython 3.8
This commit is contained in:
Noah
2021-05-24 20:13:55 -05:00
committed by GitHub
5 changed files with 1602 additions and 96 deletions

112
Lib/tempfile.py vendored
View File

@@ -43,6 +43,7 @@ import os as _os
import shutil as _shutil
import errno as _errno
from random import Random as _Random
import sys as _sys
import weakref as _weakref
try:
@@ -74,20 +75,10 @@ template = "tmp"
_once_lock = _allocate_lock()
if hasattr(_os, "lstat"):
_stat = _os.lstat
elif hasattr(_os, "stat"):
_stat = _os.stat
else:
# Fallback. All we need is something that raises OSError if the
# file doesn't exist.
def _stat(fn):
fd = _os.open(fn, _os.O_RDONLY)
_os.close(fd)
def _exists(fn):
try:
_stat(fn)
_os.lstat(fn)
except OSError:
return False
else:
@@ -136,7 +127,7 @@ def _sanitize_params(prefix, suffix, dir):
class _RandomNameSequence:
"""An instance of _RandomNameSequence generates an endless
sequence of unpredictable strings which can safely be incorporated
into file names. Each string is six characters long. Multiple
into file names. Each string is eight characters long. Multiple
threads can safely use the same instance at the same time.
_RandomNameSequence is an iterator."""
@@ -173,7 +164,9 @@ def _candidate_tempdir_list():
# Failing that, try OS-specific locations.
if _os.name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
dirlist.extend([ _os.path.expanduser(r'~\AppData\Local\Temp'),
_os.path.expandvars(r'%SYSTEMROOT%\Temp'),
r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
@@ -256,6 +249,7 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type):
for seq in range(TMP_MAX):
name = next(names)
file = _os.path.join(dir, pre + name + suf)
_sys.audit("tempfile.mkstemp", file)
try:
fd = _os.open(file, flags, 0o600)
except FileExistsError:
@@ -317,8 +311,7 @@ def mkstemp(suffix=None, prefix=None, dir=None, text=False):
otherwise a default directory is used.
If 'text' is specified and true, the file is opened in text
mode. Else (the default) the file is opened in binary mode. On
some operating systems, this makes no difference.
mode. Else (the default) the file is opened in binary mode.
If any of 'suffix', 'prefix' and 'dir' are not None, they must be the
same type. If they are bytes, the returned name will be bytes; str
@@ -364,6 +357,7 @@ def mkdtemp(suffix=None, prefix=None, dir=None):
for seq in range(TMP_MAX):
name = next(names)
file = _os.path.join(dir, prefix + name + suffix)
_sys.audit("tempfile.mkdtemp", file)
try:
_os.mkdir(file, 0o700)
except FileExistsError:
@@ -521,7 +515,7 @@ class _TemporaryFileWrapper:
def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
newline=None, suffix=None, prefix=None,
dir=None, delete=True):
dir=None, delete=True, *, errors=None):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
@@ -530,6 +524,7 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
'encoding' -- the encoding argument to io.open (default None)
'newline' -- the newline argument to io.open (default None)
'delete' -- whether the file is deleted on close (default True).
'errors' -- the errors argument to io.open (default None)
The file is created as mkstemp() would do it.
Returns an object with a file-like interface; the name of the file
@@ -549,7 +544,7 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
try:
file = _io.open(fd, mode, buffering=buffering,
newline=newline, encoding=encoding)
newline=newline, encoding=encoding, errors=errors)
return _TemporaryFileWrapper(file, name, delete)
except BaseException:
@@ -557,7 +552,7 @@ def NamedTemporaryFile(mode='w+b', buffering=-1, encoding=None,
_os.close(fd)
raise
if _os.name != 'posix' or _os.sys.platform == 'cygwin':
if _os.name != 'posix' or _sys.platform == 'cygwin':
# On non-POSIX and Cygwin systems, assume that we cannot unlink a file
# while it is open.
TemporaryFile = NamedTemporaryFile
@@ -570,7 +565,7 @@ else:
def TemporaryFile(mode='w+b', buffering=-1, encoding=None,
newline=None, suffix=None, prefix=None,
dir=None):
dir=None, *, errors=None):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
@@ -578,6 +573,7 @@ else:
'buffering' -- the buffer size argument to io.open (default -1).
'encoding' -- the encoding argument to io.open (default None)
'newline' -- the newline argument to io.open (default None)
'errors' -- the errors argument to io.open (default None)
The file is created as mkstemp() would do it.
Returns an object with a file-like interface. The file has no
@@ -611,7 +607,8 @@ else:
else:
try:
return _io.open(fd, mode, buffering=buffering,
newline=newline, encoding=encoding)
newline=newline, encoding=encoding,
errors=errors)
except:
_os.close(fd)
raise
@@ -621,7 +618,7 @@ else:
try:
_os.unlink(name)
return _io.open(fd, mode, buffering=buffering,
newline=newline, encoding=encoding)
newline=newline, encoding=encoding, errors=errors)
except:
_os.close(fd)
raise
@@ -635,20 +632,19 @@ class SpooledTemporaryFile:
def __init__(self, max_size=0, mode='w+b', buffering=-1,
encoding=None, newline=None,
suffix=None, prefix=None, dir=None):
suffix=None, prefix=None, dir=None, *, errors=None):
if 'b' in mode:
self._file = _io.BytesIO()
else:
# Setting newline="\n" avoids newline translation;
# this is important because otherwise on Windows we'd
# get double newline translation upon rollover().
self._file = _io.StringIO(newline="\n")
self._file = _io.TextIOWrapper(_io.BytesIO(),
encoding=encoding, errors=errors,
newline=newline)
self._max_size = max_size
self._rolled = False
self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
'suffix': suffix, 'prefix': prefix,
'encoding': encoding, 'newline': newline,
'dir': dir}
'dir': dir, 'errors': errors}
def _check(self, file):
if self._rolled: return
@@ -662,8 +658,12 @@ class SpooledTemporaryFile:
newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
del self._TemporaryFileArgs
newfile.write(file.getvalue())
newfile.seek(file.tell(), 0)
pos = file.tell()
if hasattr(newfile, 'buffer'):
newfile.buffer.write(file.detach().getvalue())
else:
newfile.write(file.getvalue())
newfile.seek(pos, 0)
self._rolled = True
@@ -694,12 +694,11 @@ class SpooledTemporaryFile:
@property
def encoding(self):
try:
return self._file.encoding
except AttributeError:
if 'b' in self._TemporaryFileArgs['mode']:
raise
return self._TemporaryFileArgs['encoding']
return self._file.encoding
@property
def errors(self):
return self._file.errors
def fileno(self):
self.rollover()
@@ -727,12 +726,7 @@ class SpooledTemporaryFile:
@property
def newlines(self):
try:
return self._file.newlines
except AttributeError:
if 'b' in self._TemporaryFileArgs['mode']:
raise
return self._TemporaryFileArgs['newline']
return self._file.newlines
def read(self, *args):
return self._file.read(*args)
@@ -744,7 +738,7 @@ class SpooledTemporaryFile:
return self._file.readlines(*args)
def seek(self, *args):
self._file.seek(*args)
return self._file.seek(*args)
@property
def softspace(self):
@@ -792,9 +786,39 @@ class TemporaryDirectory(object):
self, self._cleanup, self.name,
warn_message="Implicitly cleaning up {!r}".format(self))
@classmethod
def _rmtree(cls, name):
def onerror(func, path, exc_info):
if issubclass(exc_info[0], PermissionError):
def resetperms(path):
try:
_os.chflags(path, 0)
except AttributeError:
pass
_os.chmod(path, 0o700)
try:
if path != name:
resetperms(_os.path.dirname(path))
resetperms(path)
try:
_os.unlink(path)
# PermissionError is raised on FreeBSD for directories
except (IsADirectoryError, PermissionError):
cls._rmtree(path)
except FileNotFoundError:
pass
elif issubclass(exc_info[0], FileNotFoundError):
pass
else:
raise
_shutil.rmtree(name, onerror=onerror)
@classmethod
def _cleanup(cls, name, warn_message):
_shutil.rmtree(name)
cls._rmtree(name)
_warnings.warn(warn_message, ResourceWarning)
def __repr__(self):
@@ -808,4 +832,4 @@ class TemporaryDirectory(object):
def cleanup(self):
if self._finalizer.detach():
_shutil.rmtree(self.name)
self._rmtree(self.name)

View File

@@ -3783,6 +3783,7 @@ class TestScandir(unittest.TestCase):
entry_lstat,
os.name == 'nt')
@unittest.skipIf(sys.platform == "linux", "TODO: RUSTPYTHON, flaky test")
def test_attributes(self):
link = hasattr(os, 'link')
symlink = support.can_symlink()

View File

@@ -187,10 +187,6 @@ class ProcessTestCase(BaseTestCase):
stdin=tf)
self.assertIn(b'PEAR', output)
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_check_output_stdin_arg = unittest.expectedFailure(test_check_output_stdin_arg)
def test_check_output_input_arg(self):
# check_output() can be called with input set to a string
output = subprocess.check_output(
@@ -248,10 +244,6 @@ class ProcessTestCase(BaseTestCase):
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_check_output_stdin_with_input_arg = unittest.expectedFailure(test_check_output_stdin_with_input_arg)
def test_check_output_timeout(self):
# check_output() function with timeout arg
with self.assertRaises(subprocess.TimeoutExpired) as c:
@@ -537,10 +529,6 @@ class ProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(p.returncode, 1)
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdin_filedes = unittest.expectedFailure(test_stdin_filedes)
def test_stdin_fileobj(self):
# stdin is set to open file object
tf = tempfile.TemporaryFile()
@@ -553,10 +541,6 @@ class ProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(p.returncode, 1)
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdin_fileobj = unittest.expectedFailure(test_stdin_fileobj)
def test_stdout_pipe(self):
# stdout redirection
p = subprocess.Popen([sys.executable, "-c",
@@ -577,10 +561,6 @@ class ProcessTestCase(BaseTestCase):
os.lseek(d, 0, 0)
self.assertEqual(os.read(d, 1024), b"orange")
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdout_filedes = unittest.expectedFailure(test_stdout_filedes)
def test_stdout_fileobj(self):
# stdout is set to open file object
tf = tempfile.TemporaryFile()
@@ -592,10 +572,6 @@ class ProcessTestCase(BaseTestCase):
tf.seek(0)
self.assertEqual(tf.read(), b"orange")
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdout_fileobj = unittest.expectedFailure(test_stdout_fileobj)
def test_stderr_pipe(self):
# stderr redirection
p = subprocess.Popen([sys.executable, "-c",
@@ -616,10 +592,6 @@ class ProcessTestCase(BaseTestCase):
os.lseek(d, 0, 0)
self.assertStderrEqual(os.read(d, 1024), b"strawberry")
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stderr_filedes = unittest.expectedFailure(test_stderr_filedes)
def test_stderr_fileobj(self):
# stderr is set to open file object
tf = tempfile.TemporaryFile()
@@ -631,10 +603,6 @@ class ProcessTestCase(BaseTestCase):
tf.seek(0)
self.assertStderrEqual(tf.read(), b"strawberry")
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stderr_fileobj = unittest.expectedFailure(test_stderr_fileobj)
def test_stderr_redirect_with_no_stdout_redirect(self):
# test stderr=STDOUT while stdout=None (not set)
@@ -683,10 +651,6 @@ class ProcessTestCase(BaseTestCase):
tf.seek(0)
self.assertStderrEqual(tf.read(), b"appleorange")
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdout_stderr_file = unittest.expectedFailure(test_stdout_stderr_file)
def test_stdout_filedes_of_stdout(self):
# stdout is set to 1 (#1531862).
# To avoid printing the text on stdout, we do something similar to
@@ -1587,10 +1551,6 @@ class RunFuncTestCase(BaseTestCase):
stdin=tf, stdout=subprocess.PIPE)
self.assertIn(b'PEAR', cp.stdout)
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_check_output_stdin_arg = unittest.expectedFailure(test_check_output_stdin_arg)
def test_check_output_input_arg(self):
# check_output() can be called with input set to a string
cp = self.run_python(
@@ -1611,10 +1571,6 @@ class RunFuncTestCase(BaseTestCase):
self.assertIn('stdin', c.exception.args[0])
self.assertIn('input', c.exception.args[0])
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_check_output_stdin_with_input_arg = unittest.expectedFailure(test_check_output_stdin_with_input_arg)
def test_check_output_timeout(self):
with self.assertRaises(subprocess.TimeoutExpired) as c:
cp = self.run_python((
@@ -1685,10 +1641,6 @@ class RunFuncTestCase(BaseTestCase):
self.assertIn('stdout', c.exception.args[0])
self.assertIn('capture_output', c.exception.args[0])
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stdout_with_capture_output_arg = unittest.expectedFailure(test_stdout_with_capture_output_arg)
def test_stderr_with_capture_output_arg(self):
# run() refuses to accept 'stderr' with 'capture_output'
tf = tempfile.TemporaryFile()
@@ -1701,10 +1653,6 @@ class RunFuncTestCase(BaseTestCase):
self.assertIn('stderr', c.exception.args[0])
self.assertIn('capture_output', c.exception.args[0])
# TODO: RUSTPYTHON
if sys.platform == "win32":
test_stderr_with_capture_output_arg = unittest.expectedFailure(test_stderr_with_capture_output_arg)
# This test _might_ wind up a bit fragile on loaded build+test machines
# as it depends on the timing with wide enough margins for normal situations
# but does assert that it happened "soon enough" to believe the right thing

1529
Lib/test/test_tempfile.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -2871,6 +2871,10 @@ mod nt {
#[pyattr]
use libc::O_BINARY;
#[pyattr]
// use libc::O_TEMPORARY;
const O_TEMPORARY: i32 = 0x40;
#[pyfunction]
pub(super) fn access(path: PyPathLike, mode: u8, vm: &VirtualMachine) -> PyResult<bool> {
use winapi::um::{fileapi, winnt};