Merge pull request #3547 from fanninpm/test-support-os-helper

Align test/support/os_helper with CPython 3.10
This commit is contained in:
Jeong YunWon
2022-02-13 09:36:22 +09:00
committed by GitHub
9 changed files with 43 additions and 165 deletions

View File

@@ -83,9 +83,6 @@ __all__ = [
# io
"record_original_stdout", "get_original_stdout", "captured_stdout",
"captured_stdin", "captured_stderr",
# filesystem
"TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
"create_empty_file", "can_symlink", "fs_is_case_insensitive",
# unittest
"is_resource_enabled", "requires", "requires_freebsd_version",
"requires_linux_version", "requires_mac_ver",
@@ -279,135 +276,6 @@ def _force_run(path, func, *args):
os.chmod(path, stat.S_IRWXU)
return func(*args)
if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
# Perform the operation
func(pathname)
# Now setup the wait loop
if waitall:
dirname = pathname
else:
dirname, name = os.path.split(pathname)
dirname = dirname or '.'
# Check for `pathname` to be removed from the filesystem.
# The exponential backoff of the timeout amounts to a total
# of ~1 second after which the deletion is probably an error
# anyway.
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
# access rights. If we have made it this far, we have sufficient
# permissions to do that much using Python's equivalent of the
# Windows API FindFirstFile.
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
if not (L if waitall else name in L):
return
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
def _unlink(filename):
# XXX RUSTPYTHON: on ci, unlink() raises PermissionError when target doesn't exist.
# Might also happen locally, but not sure
if not os.path.exists(filename):
return
_waitfor(os.unlink, filename)
def _rmdir(dirname):
# XXX RUSTPYTHON: on ci, unlink() raises PermissionError when target doesn't exist.
# Might also happen locally, but not sure
if not os.path.exists(dirname):
return
_waitfor(os.rmdir, dirname)
def _rmtree(path):
# XXX RUSTPYTHON: on ci, unlink() raises PermissionError when target doesn't exist.
# Might also happen locally, but not sure
if not os.path.exists(path):
return
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("os_helper.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
file=sys.__stderr__)
mode = 0
if stat.S_ISDIR(mode):
_waitfor(_rmtree_inner, fullname, waitall=True)
_force_run(fullname, os.rmdir, fullname)
else:
_force_run(fullname, os.unlink, fullname)
_waitfor(_rmtree_inner, path, waitall=True)
_waitfor(lambda p: _force_run(p, os.rmdir, p), path)
def _longpath(path):
try:
import ctypes
except ImportError:
# No ctypes means we can't expands paths.
pass
else:
buffer = ctypes.create_unicode_buffer(len(path) * 2)
length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
len(buffer))
if length:
return buffer[:length]
return path
else:
_unlink = os.unlink
_rmdir = os.rmdir
def _rmtree(path):
try:
shutil.rmtree(path)
return
except OSError:
pass
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
_rmtree_inner(fullname)
_force_run(path, os.rmdir, fullname)
else:
_force_run(path, os.unlink, fullname)
_rmtree_inner(path)
os.rmdir(path)
def _longpath(path):
return path
def unlink(filename):
try:
_unlink(filename)
except (FileNotFoundError, NotADirectoryError):
pass
def rmdir(dirname):
try:
_rmdir(dirname)
except FileNotFoundError:
pass
def rmtree(path):
try:
_rmtree(path)
except FileNotFoundError:
pass
# Check whether a gui is actually available
def _is_gui_available():
@@ -2245,6 +2113,7 @@ def skip_unless_bind_unix_socket(test):
except OSError as e:
_bind_nix_socket_error = e
finally:
from .os_helper import unlink
unlink(path)
if _bind_nix_socket_error:
msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error

View File

@@ -302,7 +302,7 @@ if sys.platform.startswith("win"):
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("os_helper.rmtree(): os.lstat(%r) failed with %s"
print("support.rmtree(): os.lstat(%r) failed with %s"
% (fullname, exc),
file=sys.__stderr__)
mode = 0
@@ -479,6 +479,17 @@ def create_empty_file(filename):
os.close(fd)
@contextlib.contextmanager
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
dir_fd = os.open(path, os.O_RDONLY)
try:
yield dir_fd
finally:
os.close(dir_fd)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory
is case-insensitive."""
@@ -623,10 +634,6 @@ class EnvironmentVarGuard(collections.abc.MutableMapping):
def unset(self, envvar):
del self[envvar]
def copy(self):
# We do what os.environ.copy() does.
return dict(self)
def __enter__(self):
return self

View File

@@ -8,7 +8,6 @@ import stat
import sys
import unittest
import test.support
from test.support import os_helper
import _osx_support
@@ -40,9 +39,9 @@ class Test_OSXSupport(unittest.TestCase):
if self.env['PATH']:
self.env['PATH'] = self.env['PATH'] + ':'
self.env['PATH'] = self.env['PATH'] + os.path.abspath(self.temp_path_dir)
test.support.unlink(self.prog_name)
os_helper.unlink(self.prog_name)
self.assertIsNone(_osx_support._find_executable(self.prog_name))
self.addCleanup(test.support.unlink, self.prog_name)
self.addCleanup(os_helper.unlink, self.prog_name)
with open(self.prog_name, 'w') as f:
f.write("#!/bin/sh\n/bin/echo OK\n")
os.chmod(self.prog_name, stat.S_IRWXU)
@@ -53,8 +52,8 @@ class Test_OSXSupport(unittest.TestCase):
if self.env['PATH']:
self.env['PATH'] = self.env['PATH'] + ':'
self.env['PATH'] = self.env['PATH'] + os.path.abspath(self.temp_path_dir)
test.support.unlink(self.prog_name)
self.addCleanup(test.support.unlink, self.prog_name)
os_helper.unlink(self.prog_name)
self.addCleanup(os_helper.unlink, self.prog_name)
with open(self.prog_name, 'w') as f:
f.write("#!/bin/sh\n/bin/echo ExpectedOutput\n")
os.chmod(self.prog_name, stat.S_IRWXU)
@@ -144,8 +143,8 @@ class Test_OSXSupport(unittest.TestCase):
suffix = (':' + self.env['PATH']) if self.env['PATH'] else ''
self.env['PATH'] = os.path.abspath(self.temp_path_dir) + suffix
for c_name, c_output in compilers:
test.support.unlink(c_name)
self.addCleanup(test.support.unlink, c_name)
os_helper.unlink(c_name)
self.addCleanup(os_helper.unlink, c_name)
with open(c_name, 'w') as f:
f.write("#!/bin/sh\n/bin/echo " + c_output)
os.chmod(c_name, stat.S_IRWXU)
@@ -224,8 +223,8 @@ class Test_OSXSupport(unittest.TestCase):
suffix = (':' + self.env['PATH']) if self.env['PATH'] else ''
self.env['PATH'] = os.path.abspath(self.temp_path_dir) + suffix
c_name = 'clang'
test.support.unlink(c_name)
self.addCleanup(test.support.unlink, c_name)
os_helper.unlink(c_name)
self.addCleanup(os_helper.unlink, c_name)
# exit status 255 means no PPC support in this compiler chain
with open(c_name, 'w') as f:
f.write("#!/bin/sh\nexit 255")

View File

@@ -7,7 +7,6 @@ import os
import sys
import unittest
import warnings
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok
from test.support.os_helper import FakePath
@@ -186,7 +185,7 @@ class GenericTest:
self.assertIs(self.pathmodule.isdir(filename), True)
self.assertIs(self.pathmodule.isdir(bfilename), True)
finally:
support.rmdir(filename)
os_helper.rmdir(filename)
def test_isfile(self):
filename = os_helper.TESTFN
@@ -211,7 +210,7 @@ class GenericTest:
self.assertIs(self.pathmodule.isfile(filename), False)
self.assertIs(self.pathmodule.isfile(bfilename), False)
finally:
support.rmdir(filename)
os_helper.rmdir(filename)
def test_samefile(self):
file1 = os_helper.TESTFN
@@ -526,7 +525,8 @@ class CommonTest(GenericTest):
def test_join_errors(self):
# Check join() raises friendly TypeErrors.
with support.check_warnings(('', BytesWarning), quiet=True):
from .support import check_warnings
with check_warnings(('', BytesWarning), quiet=True):
errmsg = "Can't mix strings and bytes in path components"
with self.assertRaisesRegex(TypeError, errmsg):
self.pathmodule.join(b'bytes', 'str')
@@ -546,7 +546,8 @@ class CommonTest(GenericTest):
def test_relpath_errors(self):
# Check relpath() raises friendly TypeErrors.
with support.check_warnings(('', (BytesWarning, DeprecationWarning)),
from .support import check_warnings
with check_warnings(('', (BytesWarning, DeprecationWarning)),
quiet=True):
errmsg = "Can't mix strings and bytes in path components"
with self.assertRaisesRegex(TypeError, errmsg):

View File

@@ -3,10 +3,10 @@ import os
import sys
import unittest
import warnings
from test.support import TestFailed
from test.support import os_helper
from test.support import TestFailed
from test.support.os_helper import FakePath
from test import support, test_genericpath
from test import test_genericpath
from tempfile import TemporaryFile

View File

@@ -11,8 +11,8 @@ import tempfile
import unittest
from unittest import mock
from test import support
from test.support import os_helper, import_helper
from test.support import import_helper
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
try:
@@ -1525,7 +1525,8 @@ class _BasePathTest(object):
# resolves to 'dirB/..' first before resolving to parent of dirB.
self._check_resolve_relative(p, P(BASE, 'foo', 'in', 'spam'), False)
# Now create absolute symlinks.
d = support._longpath(tempfile.mkdtemp(suffix='-dirD', dir=os.getcwd()))
d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
dir=os.getcwd()))
self.addCleanup(os_helper.rmtree, d)
os.symlink(os.path.join(d), join('dirA', 'linkX'))
os.symlink(join('dirB'), os.path.join(d, 'linkY'))

View File

@@ -306,7 +306,7 @@ class ErrorHandlerTest(unittest.TestCase):
KeyboardInterrupt are not passed."""
def tearDown(self):
test.os_helper.unlink(test.os_helper.TESTFN)
os_helper.unlink(os_helper.TESTFN)
def test_sync_handled(self):
BaseErrorTestServer(ValueError)
@@ -336,7 +336,7 @@ class ErrorHandlerTest(unittest.TestCase):
self.check_result(handled=False)
def check_result(self, handled):
with open(test.os_helper.TESTFN) as log:
with open(os_helper.TESTFN) as log:
expected = 'Handler called\n' + 'Error handled\n' * handled
self.assertEqual(log.read(), expected)
@@ -354,7 +354,7 @@ class BaseErrorTestServer(socketserver.TCPServer):
self.wait_done()
def handle_error(self, request, client_address):
with open(test.os_helper.TESTFN, 'a') as log:
with open(os_helper.TESTFN, 'a') as log:
log.write('Error handled\n')
def wait_done(self):
@@ -363,7 +363,7 @@ class BaseErrorTestServer(socketserver.TCPServer):
class BadHandler(socketserver.BaseRequestHandler):
def handle(self):
with open(test.os_helper.TESTFN, 'a') as log:
with open(os_helper.TESTFN, 'a') as log:
log.write('Handler called\n')
raise self.server.exception('Test error')

View File

@@ -1183,7 +1183,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
finally:
tar.close()
finally:
support.rmdir(path)
os_helper.rmdir(path)
# mock the following:
# os.listdir: so we know that files are in the wrong order
@@ -1207,7 +1207,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
finally:
os_helper.unlink(os.path.join(path, "1"))
os_helper.unlink(os.path.join(path, "2"))
support.rmdir(path)
os_helper.rmdir(path)
def test_gettarinfo_pathlike_name(self):
with tarfile.open(tmpname, self.mode) as tar:
@@ -1348,7 +1348,7 @@ class WriteTest(WriteTestBase, unittest.TestCase):
if not dir:
os_helper.unlink(foo)
else:
support.rmdir(foo)
os_helper.rmdir(foo)
self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

View File

@@ -15,7 +15,8 @@ from unittest import mock
import unittest
from test import support
from test.support import script_helper, os_helper
from test.support import os_helper
from test.support import script_helper
has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
@@ -608,7 +609,7 @@ class TestGetTempDir(BaseTestCase):
self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
finally:
tempfile.tempdir = _tempdir
support.rmdir(case_sensitive_tempdir)
os_helper.rmdir(case_sensitive_tempdir)
class TestMkstemp(BaseTestCase):