From ce75d5c6a2b695221a26fb3f4aa1f72d0178486a Mon Sep 17 00:00:00 2001 From: Dean Li Date: Wed, 10 Nov 2021 23:29:00 +0800 Subject: [PATCH] test: os use os_helper --- Lib/test/test_os.py | 1163 ++++++++++++++++++++++++++++++------------- 1 file changed, 811 insertions(+), 352 deletions(-) diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index a58abef6e..c66045f4b 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -2,8 +2,6 @@ # does add tests for a few functions which have been determined to be more # portable than they had been thought to be. -import asynchat -import asyncore import codecs import contextlib import decimal @@ -12,25 +10,41 @@ import fnmatch import fractions import itertools import locale +# TODO: RUSTPYTHON # import mmap import os import pickle +import select import shutil import signal import socket import stat +import struct import subprocess import sys import sysconfig import tempfile import threading import time +import types import unittest import uuid import warnings from test import support +# TODO: RUSTPYTHON +# from test.support import import_helper +from test.support import os_helper +# TODO: RUSTPYTHON +# from test.support import socket_helper +# from test.support import threading_helper +# from test.support import warnings_helper from platform import win32_is_iot +with warnings.catch_warnings(): + warnings.simplefilter('ignore', DeprecationWarning) + import asynchat + import asyncore + try: import resource except ImportError: @@ -53,8 +67,10 @@ try: except ImportError: INT_MAX = PY_SSIZE_T_MAX = sys.maxsize + from test.support.script_helper import assert_python_ok -from test.support import unix_shell, FakePath +from test.support import unix_shell +from test.support.os_helper import FakePath root_in_posix = False @@ -83,6 +99,11 @@ def create_file(filename, content=b'content'): fp.write(content) +# bpo-41625: On AIX, splice() only works with a socket, not with a pipe. +requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"), + 'on AIX, splice() only accepts sockets') + + class MiscTests(unittest.TestCase): def test_getcwd(self): cwd = os.getcwd() @@ -101,12 +122,16 @@ class MiscTests(unittest.TestCase): # than MAX_PATH if long paths support is disabled: # see RtlAreLongPathsEnabled(). min_len = 2000 # characters + # On VxWorks, PATH_MAX is defined as 1024 bytes. Creating a path + # longer than PATH_MAX will fail. + if sys.platform == 'vxworks': + min_len = 1000 dirlen = 200 # characters dirname = 'python_test_dir_' dirname = dirname + ('a' * (dirlen - len(dirname))) with tempfile.TemporaryDirectory() as tmpdir: - with support.change_cwd(tmpdir) as path: + with os_helper.change_cwd(tmpdir) as path: expected = path while True: @@ -150,17 +175,17 @@ class MiscTests(unittest.TestCase): # Tests creating TESTFN class FileTests(unittest.TestCase): def setUp(self): - if os.path.lexists(support.TESTFN): - os.unlink(support.TESTFN) + if os.path.lexists(os_helper.TESTFN): + os.unlink(os_helper.TESTFN) tearDown = setUp def test_access(self): - f = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) + f = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) os.close(f) - self.assertTrue(os.access(support.TESTFN, os.W_OK)) + self.assertTrue(os.access(os_helper.TESTFN, os.W_OK)) def test_closerange(self): - first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) + first = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) # We must allocate two consecutive file descriptors, otherwise # it will mess up other file descriptors (perhaps even the three # standard ones). @@ -182,14 +207,14 @@ class FileTests(unittest.TestCase): @support.cpython_only def test_rename(self): - path = support.TESTFN + path = os_helper.TESTFN old = sys.getrefcount(path) self.assertRaises(TypeError, os.rename, path, 0) new = sys.getrefcount(path) self.assertEqual(old, new) def test_read(self): - with open(support.TESTFN, "w+b") as fobj: + with open(os_helper.TESTFN, "w+b") as fobj: fobj.write(b"spam") fobj.flush() fd = fobj.fileno() @@ -205,12 +230,12 @@ class FileTests(unittest.TestCase): "needs INT_MAX < PY_SSIZE_T_MAX") @support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False) def test_large_read(self, size): - self.addCleanup(support.unlink, support.TESTFN) - create_file(support.TESTFN, b'test') + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + create_file(os_helper.TESTFN, b'test') # Issue #21932: Make sure that os.read() does not raise an # OverflowError for size larger than INT_MAX - with open(support.TESTFN, "rb") as fp: + with open(os_helper.TESTFN, "rb") as fp: data = os.read(fp.fileno(), size) # The test does not try to read more than 2 GiB at once because the @@ -219,13 +244,13 @@ class FileTests(unittest.TestCase): def test_write(self): # os.write() accepts bytes- and buffer-like objects but not strings - fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY) + fd = os.open(os_helper.TESTFN, os.O_CREAT | os.O_WRONLY) self.assertRaises(TypeError, os.write, fd, "beans") os.write(fd, b"bacon\n") os.write(fd, bytearray(b"eggs\n")) os.write(fd, memoryview(b"spam\n")) os.close(fd) - with open(support.TESTFN, "rb") as fobj: + with open(os_helper.TESTFN, "rb") as fobj: self.assertEqual(fobj.read().splitlines(), [b"bacon", b"eggs", b"spam"]) @@ -249,12 +274,12 @@ class FileTests(unittest.TestCase): self.write_windows_console(sys.executable, "-u", "-c", code) def fdopen_helper(self, *args): - fd = os.open(support.TESTFN, os.O_RDONLY) - f = os.fdopen(fd, *args) + fd = os.open(os_helper.TESTFN, os.O_RDONLY) + f = os.fdopen(fd, *args, encoding="utf-8") f.close() def test_fdopen(self): - fd = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) + fd = os.open(os_helper.TESTFN, os.O_CREAT|os.O_RDWR) os.close(fd) self.fdopen_helper() @@ -262,16 +287,16 @@ class FileTests(unittest.TestCase): self.fdopen_helper('r', 100) def test_replace(self): - TESTFN2 = support.TESTFN + ".2" - self.addCleanup(support.unlink, support.TESTFN) - self.addCleanup(support.unlink, TESTFN2) + TESTFN2 = os_helper.TESTFN + ".2" + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + self.addCleanup(os_helper.unlink, TESTFN2) - create_file(support.TESTFN, b"1") + create_file(os_helper.TESTFN, b"1") create_file(TESTFN2, b"2") - os.replace(support.TESTFN, TESTFN2) - self.assertRaises(FileNotFoundError, os.stat, support.TESTFN) - with open(TESTFN2, 'r') as f: + os.replace(os_helper.TESTFN, TESTFN2) + self.assertRaises(FileNotFoundError, os.stat, os_helper.TESTFN) + with open(TESTFN2, 'r', encoding='utf-8') as f: self.assertEqual(f.read(), "1") def test_open_keywords(self): @@ -282,7 +307,7 @@ class FileTests(unittest.TestCase): def test_symlink_keywords(self): symlink = support.get_attribute(os, "symlink") try: - symlink(src='target', dst=support.TESTFN, + symlink(src='target', dst=os_helper.TESTFN, target_is_directory=False, dir_fd=None) except (NotImplementedError, OSError): pass # No OS support or unprivileged user @@ -294,18 +319,18 @@ class FileTests(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') def test_copy_file_range(self): - TESTFN2 = support.TESTFN + ".3" + TESTFN2 = os_helper.TESTFN + ".3" data = b'0123456789' - create_file(support.TESTFN, data) - self.addCleanup(support.unlink, support.TESTFN) + create_file(os_helper.TESTFN, data) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) - in_file = open(support.TESTFN, 'rb') + in_file = open(os_helper.TESTFN, 'rb') self.addCleanup(in_file.close) in_fd = in_file.fileno() out_file = open(TESTFN2, 'w+b') - self.addCleanup(support.unlink, TESTFN2) + self.addCleanup(os_helper.unlink, TESTFN2) self.addCleanup(out_file.close) out_fd = out_file.fileno() @@ -328,21 +353,21 @@ class FileTests(unittest.TestCase): @unittest.skipUnless(hasattr(os, 'copy_file_range'), 'test needs os.copy_file_range()') def test_copy_file_range_offset(self): - TESTFN4 = support.TESTFN + ".4" + TESTFN4 = os_helper.TESTFN + ".4" data = b'0123456789' bytes_to_copy = 6 in_skip = 3 out_seek = 5 - create_file(support.TESTFN, data) - self.addCleanup(support.unlink, support.TESTFN) + create_file(os_helper.TESTFN, data) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) - in_file = open(support.TESTFN, 'rb') + in_file = open(os_helper.TESTFN, 'rb') self.addCleanup(in_file.close) in_fd = in_file.fileno() out_file = open(TESTFN4, 'w+b') - self.addCleanup(support.unlink, TESTFN4) + self.addCleanup(os_helper.unlink, TESTFN4) self.addCleanup(out_file.close) out_fd = out_file.fileno() @@ -371,11 +396,131 @@ class FileTests(unittest.TestCase): self.assertEqual(read[out_seek:], data[in_skip:in_skip+i]) + @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') + def test_splice_invalid_values(self): + with self.assertRaises(ValueError): + os.splice(0, 1, -10) + + @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') + @requires_splice_pipe + def test_splice(self): + TESTFN2 = os_helper.TESTFN + ".3" + data = b'0123456789' + + create_file(os_helper.TESTFN, data) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + + in_file = open(os_helper.TESTFN, 'rb') + self.addCleanup(in_file.close) + in_fd = in_file.fileno() + + read_fd, write_fd = os.pipe() + self.addCleanup(lambda: os.close(read_fd)) + self.addCleanup(lambda: os.close(write_fd)) + + try: + i = os.splice(in_fd, write_fd, 5) + except OSError as e: + # Handle the case in which Python was compiled + # in a system with the syscall but without support + # in the kernel. + if e.errno != errno.ENOSYS: + raise + self.skipTest(e) + else: + # The number of copied bytes can be less than + # the number of bytes originally requested. + self.assertIn(i, range(0, 6)); + + self.assertEqual(os.read(read_fd, 100), data[:i]) + + @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') + @requires_splice_pipe + def test_splice_offset_in(self): + TESTFN4 = os_helper.TESTFN + ".4" + data = b'0123456789' + bytes_to_copy = 6 + in_skip = 3 + + create_file(os_helper.TESTFN, data) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + + in_file = open(os_helper.TESTFN, 'rb') + self.addCleanup(in_file.close) + in_fd = in_file.fileno() + + read_fd, write_fd = os.pipe() + self.addCleanup(lambda: os.close(read_fd)) + self.addCleanup(lambda: os.close(write_fd)) + + try: + i = os.splice(in_fd, write_fd, bytes_to_copy, offset_src=in_skip) + except OSError as e: + # Handle the case in which Python was compiled + # in a system with the syscall but without support + # in the kernel. + if e.errno != errno.ENOSYS: + raise + self.skipTest(e) + else: + # The number of copied bytes can be less than + # the number of bytes originally requested. + self.assertIn(i, range(0, bytes_to_copy+1)); + + read = os.read(read_fd, 100) + # 012 are skipped (in_skip) + # 345678 are copied in the file (in_skip + bytes_to_copy) + self.assertEqual(read, data[in_skip:in_skip+i]) + + @unittest.skipUnless(hasattr(os, 'splice'), 'test needs os.splice()') + @requires_splice_pipe + def test_splice_offset_out(self): + TESTFN4 = os_helper.TESTFN + ".4" + data = b'0123456789' + bytes_to_copy = 6 + out_seek = 3 + + create_file(os_helper.TESTFN, data) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + + read_fd, write_fd = os.pipe() + self.addCleanup(lambda: os.close(read_fd)) + self.addCleanup(lambda: os.close(write_fd)) + os.write(write_fd, data) + + out_file = open(TESTFN4, 'w+b') + self.addCleanup(os_helper.unlink, TESTFN4) + self.addCleanup(out_file.close) + out_fd = out_file.fileno() + + try: + i = os.splice(read_fd, out_fd, bytes_to_copy, offset_dst=out_seek) + except OSError as e: + # Handle the case in which Python was compiled + # in a system with the syscall but without support + # in the kernel. + if e.errno != errno.ENOSYS: + raise + self.skipTest(e) + else: + # The number of copied bytes can be less than + # the number of bytes originally requested. + self.assertIn(i, range(0, bytes_to_copy+1)); + + with open(TESTFN4, 'rb') as in_file: + read = in_file.read() + # seeked bytes (5) are zero'ed + self.assertEqual(read[:out_seek], b'\x00'*out_seek) + # 012 are skipped (in_skip) + # 345678 are copied in the file (in_skip + bytes_to_copy) + self.assertEqual(read[out_seek:], data[:i]) + + # Test attributes on return values from os.*stat* family. class StatAttributeTests(unittest.TestCase): def setUp(self): - self.fname = support.TESTFN - self.addCleanup(support.unlink, self.fname) + self.fname = os_helper.TESTFN + self.addCleanup(os_helper.unlink, self.fname) create_file(self.fname, b"ABC") def check_stat_attributes(self, fname): @@ -560,7 +705,7 @@ class StatAttributeTests(unittest.TestCase): 0) # test directory st_file_attributes (FILE_ATTRIBUTE_DIRECTORY set) - dirname = support.TESTFN + "dir" + dirname = os_helper.TESTFN + "dir" os.mkdir(dirname) self.addCleanup(os.rmdir, dirname) @@ -577,7 +722,7 @@ class StatAttributeTests(unittest.TestCase): # os.environ['TEMP'] should be located on a volume that # supports file ACLs. fname = os.path.join(os.environ['TEMP'], self.fname) - self.addCleanup(support.unlink, fname) + self.addCleanup(os_helper.unlink, fname) create_file(fname, b'ABC') # Deny the right to [S]YNCHRONIZE on the file to # force CreateFile to fail with ERROR_ACCESS_DENIED. @@ -602,10 +747,10 @@ class StatAttributeTests(unittest.TestCase): class UtimeTests(unittest.TestCase): def setUp(self): - self.dirname = support.TESTFN + self.dirname = os_helper.TESTFN self.fname = os.path.join(self.dirname, "f1") - self.addCleanup(support.rmtree, self.dirname) + self.addCleanup(os_helper.rmtree, self.dirname) os.mkdir(self.dirname) create_file(self.fname) @@ -824,185 +969,311 @@ class UtimeTests(unittest.TestCase): from test import mapping_tests -# TODO: RUSTPYTHON (KeyError: 'surrogateescape') -# class EnvironTests(mapping_tests.BasicTestMappingProtocol): -# """check that os.environ object conform to mapping protocol""" -# type2test = None +class EnvironTests(mapping_tests.BasicTestMappingProtocol): + """check that os.environ object conform to mapping protocol""" + type2test = None -# def setUp(self): -# self.__save = dict(os.environ) -# if os.supports_bytes_environ: -# self.__saveb = dict(os.environb) -# for key, value in self._reference().items(): -# os.environ[key] = value + def setUp(self): + self.__save = dict(os.environ) + if os.supports_bytes_environ: + self.__saveb = dict(os.environb) + for key, value in self._reference().items(): + os.environ[key] = value -# def tearDown(self): -# os.environ.clear() -# os.environ.update(self.__save) -# if os.supports_bytes_environ: -# os.environb.clear() -# os.environb.update(self.__saveb) + def tearDown(self): + os.environ.clear() + os.environ.update(self.__save) + if os.supports_bytes_environ: + os.environb.clear() + os.environb.update(self.__saveb) -# def _reference(self): -# return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} + def _reference(self): + return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"} -# def _empty_mapping(self): -# os.environ.clear() -# return os.environ + def _empty_mapping(self): + os.environ.clear() + return os.environ -# # Bug 1110478 -# @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), -# 'requires a shell') -# def test_update2(self): -# os.environ.clear() -# os.environ.update(HELLO="World") -# with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: -# value = popen.read().strip() -# self.assertEqual(value, "World") + # Bug 1110478 + @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), + 'requires a shell') + @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") + def test_update2(self): + os.environ.clear() + os.environ.update(HELLO="World") + with os.popen("%s -c 'echo $HELLO'" % unix_shell) as popen: + value = popen.read().strip() + self.assertEqual(value, "World") -# @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), -# 'requires a shell') -# def test_os_popen_iter(self): -# with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" -# % unix_shell) as popen: -# it = iter(popen) -# self.assertEqual(next(it), "line1\n") -# self.assertEqual(next(it), "line2\n") -# self.assertEqual(next(it), "line3\n") -# self.assertRaises(StopIteration, next, it) + @unittest.skipUnless(unix_shell and os.path.exists(unix_shell), + 'requires a shell') + @unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()") + def test_os_popen_iter(self): + with os.popen("%s -c 'echo \"line1\nline2\nline3\"'" + % unix_shell) as popen: + it = iter(popen) + self.assertEqual(next(it), "line1\n") + self.assertEqual(next(it), "line2\n") + self.assertEqual(next(it), "line3\n") + self.assertRaises(StopIteration, next, it) -# # Verify environ keys and values from the OS are of the -# # correct str type. -# def test_keyvalue_types(self): -# for key, val in os.environ.items(): -# self.assertEqual(type(key), str) -# self.assertEqual(type(val), str) + # Verify environ keys and values from the OS are of the + # correct str type. + def test_keyvalue_types(self): + for key, val in os.environ.items(): + self.assertEqual(type(key), str) + self.assertEqual(type(val), str) -# def test_items(self): -# for key, value in self._reference().items(): -# self.assertEqual(os.environ.get(key), value) + def test_items(self): + for key, value in self._reference().items(): + self.assertEqual(os.environ.get(key), value) -# # Issue 7310 -# def test___repr__(self): -# """Check that the repr() of os.environ looks like environ({...}).""" -# env = os.environ -# self.assertEqual(repr(env), 'environ({{{}}})'.format(', '.join( -# '{!r}: {!r}'.format(key, value) -# for key, value in env.items()))) + # Issue 7310 + def test___repr__(self): + """Check that the repr() of os.environ looks like environ({...}).""" + env = os.environ + formatted_items = ", ".join( + f"{key!r}: {value!r}" + for key, value in env.items() + ) + self.assertEqual(repr(env), f"environ({{{formatted_items}}})") -# def test_get_exec_path(self): -# defpath_list = os.defpath.split(os.pathsep) -# test_path = ['/monty', '/python', '', '/flying/circus'] -# test_env = {'PATH': os.pathsep.join(test_path)} + def test_get_exec_path(self): + defpath_list = os.defpath.split(os.pathsep) + test_path = ['/monty', '/python', '', '/flying/circus'] + test_env = {'PATH': os.pathsep.join(test_path)} -# saved_environ = os.environ -# try: -# os.environ = dict(test_env) -# # Test that defaulting to os.environ works. -# self.assertSequenceEqual(test_path, os.get_exec_path()) -# self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) -# finally: -# os.environ = saved_environ + saved_environ = os.environ + try: + os.environ = dict(test_env) + # Test that defaulting to os.environ works. + self.assertSequenceEqual(test_path, os.get_exec_path()) + self.assertSequenceEqual(test_path, os.get_exec_path(env=None)) + finally: + os.environ = saved_environ -# # No PATH environment variable -# self.assertSequenceEqual(defpath_list, os.get_exec_path({})) -# # Empty PATH environment variable -# self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) -# # Supplied PATH environment variable -# self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) + # No PATH environment variable + self.assertSequenceEqual(defpath_list, os.get_exec_path({})) + # Empty PATH environment variable + self.assertSequenceEqual(('',), os.get_exec_path({'PATH':''})) + # Supplied PATH environment variable + self.assertSequenceEqual(test_path, os.get_exec_path(test_env)) -# if os.supports_bytes_environ: -# # env cannot contain 'PATH' and b'PATH' keys -# try: -# # ignore BytesWarning warning -# with warnings.catch_warnings(record=True): -# mixed_env = {'PATH': '1', b'PATH': b'2'} -# except BytesWarning: -# # mixed_env cannot be created with python -bb -# pass -# else: -# self.assertRaises(ValueError, os.get_exec_path, mixed_env) + if os.supports_bytes_environ: + # env cannot contain 'PATH' and b'PATH' keys + try: + # ignore BytesWarning warning + with warnings.catch_warnings(record=True): + mixed_env = {'PATH': '1', b'PATH': b'2'} + except BytesWarning: + # mixed_env cannot be created with python -bb + pass + else: + self.assertRaises(ValueError, os.get_exec_path, mixed_env) -# # bytes key and/or value -# self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), -# ['abc']) -# self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), -# ['abc']) -# self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), -# ['abc']) + # bytes key and/or value + self.assertSequenceEqual(os.get_exec_path({b'PATH': b'abc'}), + ['abc']) + self.assertSequenceEqual(os.get_exec_path({b'PATH': 'abc'}), + ['abc']) + self.assertSequenceEqual(os.get_exec_path({'PATH': b'abc'}), + ['abc']) -# @unittest.skipUnless(os.supports_bytes_environ, -# "os.environb required for this test.") -# def test_environb(self): -# # os.environ -> os.environb -# value = 'euro\u20ac' -# try: -# value_bytes = value.encode(sys.getfilesystemencoding(), -# 'surrogateescape') -# except UnicodeEncodeError: -# msg = "U+20AC character is not encodable to %s" % ( -# sys.getfilesystemencoding(),) -# self.skipTest(msg) -# os.environ['unicode'] = value -# self.assertEqual(os.environ['unicode'], value) -# self.assertEqual(os.environb[b'unicode'], value_bytes) + @unittest.skipUnless(os.supports_bytes_environ, + "os.environb required for this test.") + # TODO: RUSTPYTHON (KeyError: 'surrogateescape') + @unittest.expectedFailure + def test_environb(self): + # os.environ -> os.environb + value = 'euro\u20ac' + try: + value_bytes = value.encode(sys.getfilesystemencoding(), + 'surrogateescape') + except UnicodeEncodeError: + msg = "U+20AC character is not encodable to %s" % ( + sys.getfilesystemencoding(),) + self.skipTest(msg) + os.environ['unicode'] = value + self.assertEqual(os.environ['unicode'], value) + self.assertEqual(os.environb[b'unicode'], value_bytes) -# # os.environb -> os.environ -# value = b'\xff' -# os.environb[b'bytes'] = value -# self.assertEqual(os.environb[b'bytes'], value) -# value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') -# self.assertEqual(os.environ['bytes'], value_str) + # os.environb -> os.environ + value = b'\xff' + os.environb[b'bytes'] = value + self.assertEqual(os.environb[b'bytes'], value) + value_str = value.decode(sys.getfilesystemencoding(), 'surrogateescape') + self.assertEqual(os.environ['bytes'], value_str) -# # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). -# @support.requires_mac_ver(10, 6) -# def test_unset_error(self): -# if sys.platform == "win32": -# # an environment variable is limited to 32,767 characters -# key = 'x' * 50000 -# self.assertRaises(ValueError, os.environ.__delitem__, key) -# else: -# # "=" is not allowed in a variable name -# key = 'key=' -# self.assertRaises(OSError, os.environ.__delitem__, key) + def test_putenv_unsetenv(self): + name = "PYTHONTESTVAR" + value = "testvalue" + code = f'import os; print(repr(os.environ.get({name!r})))' -# def test_key_type(self): -# missing = 'missingkey' -# self.assertNotIn(missing, os.environ) + with os_helper.EnvironmentVarGuard() as env: + env.pop(name, None) -# with self.assertRaises(KeyError) as cm: -# os.environ[missing] -# self.assertIs(cm.exception.args[0], missing) -# self.assertTrue(cm.exception.__suppress_context__) + os.putenv(name, value) + proc = subprocess.run([sys.executable, '-c', code], check=True, + stdout=subprocess.PIPE, text=True) + self.assertEqual(proc.stdout.rstrip(), repr(value)) -# with self.assertRaises(KeyError) as cm: -# del os.environ[missing] -# self.assertIs(cm.exception.args[0], missing) -# self.assertTrue(cm.exception.__suppress_context__) + os.unsetenv(name) + proc = subprocess.run([sys.executable, '-c', code], check=True, + stdout=subprocess.PIPE, text=True) + self.assertEqual(proc.stdout.rstrip(), repr(None)) -# def _test_environ_iteration(self, collection): -# iterator = iter(collection) -# new_key = "__new_key__" + # On OS X < 10.6, unsetenv() doesn't return a value (bpo-13415). + @support.requires_mac_ver(10, 6) + def test_putenv_unsetenv_error(self): + # Empty variable name is invalid. + # "=" and null character are not allowed in a variable name. + for name in ('', '=name', 'na=me', 'name=', 'name\0', 'na\0me'): + self.assertRaises((OSError, ValueError), os.putenv, name, "value") + self.assertRaises((OSError, ValueError), os.unsetenv, name) -# next(iterator) # start iteration over os.environ.items + if sys.platform == "win32": + # On Windows, an environment variable string ("name=value" string) + # is limited to 32,767 characters + longstr = 'x' * 32_768 + self.assertRaises(ValueError, os.putenv, longstr, "1") + self.assertRaises(ValueError, os.putenv, "X", longstr) + self.assertRaises(ValueError, os.unsetenv, longstr) -# # add a new key in os.environ mapping -# os.environ[new_key] = "test_environ_iteration" + def test_key_type(self): + missing = 'missingkey' + self.assertNotIn(missing, os.environ) -# try: -# next(iterator) # force iteration over modified mapping -# self.assertEqual(os.environ[new_key], "test_environ_iteration") -# finally: -# del os.environ[new_key] + with self.assertRaises(KeyError) as cm: + os.environ[missing] + self.assertIs(cm.exception.args[0], missing) + self.assertTrue(cm.exception.__suppress_context__) -# def test_iter_error_when_changing_os_environ(self): -# self._test_environ_iteration(os.environ) + with self.assertRaises(KeyError) as cm: + del os.environ[missing] + self.assertIs(cm.exception.args[0], missing) + self.assertTrue(cm.exception.__suppress_context__) -# def test_iter_error_when_changing_os_environ_items(self): -# self._test_environ_iteration(os.environ.items()) + def _test_environ_iteration(self, collection): + iterator = iter(collection) + new_key = "__new_key__" -# def test_iter_error_when_changing_os_environ_values(self): -# self._test_environ_iteration(os.environ.values()) + next(iterator) # start iteration over os.environ.items + + # add a new key in os.environ mapping + os.environ[new_key] = "test_environ_iteration" + + try: + next(iterator) # force iteration over modified mapping + self.assertEqual(os.environ[new_key], "test_environ_iteration") + finally: + del os.environ[new_key] + + def test_iter_error_when_changing_os_environ(self): + self._test_environ_iteration(os.environ) + + def test_iter_error_when_changing_os_environ_items(self): + self._test_environ_iteration(os.environ.items()) + + def test_iter_error_when_changing_os_environ_values(self): + self._test_environ_iteration(os.environ.values()) + + def _test_underlying_process_env(self, var, expected): + if not (unix_shell and os.path.exists(unix_shell)): + return + + with os.popen(f"{unix_shell} -c 'echo ${var}'") as popen: + value = popen.read().strip() + + self.assertEqual(expected, value) + + # TODO: RUSTPYTHON (KeyError: 'surrogateescape') + @unittest.expectedFailure + def test_or_operator(self): + overridden_key = '_TEST_VAR_' + original_value = 'original_value' + os.environ[overridden_key] = original_value + + new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} + expected = dict(os.environ) + expected.update(new_vars_dict) + + actual = os.environ | new_vars_dict + self.assertDictEqual(expected, actual) + self.assertEqual('3', actual[overridden_key]) + + new_vars_items = new_vars_dict.items() + self.assertIs(NotImplemented, os.environ.__or__(new_vars_items)) + + self._test_underlying_process_env('_A_', '') + self._test_underlying_process_env(overridden_key, original_value) + + # TODO: RUSTPYTHON (KeyError: 'surrogateescape') + @unittest.expectedFailure + def test_ior_operator(self): + overridden_key = '_TEST_VAR_' + os.environ[overridden_key] = 'original_value' + + new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} + expected = dict(os.environ) + expected.update(new_vars_dict) + + os.environ |= new_vars_dict + self.assertEqual(expected, os.environ) + self.assertEqual('3', os.environ[overridden_key]) + + self._test_underlying_process_env('_A_', '1') + self._test_underlying_process_env(overridden_key, '3') + + def test_ior_operator_invalid_dicts(self): + os_environ_copy = os.environ.copy() + with self.assertRaises(TypeError): + dict_with_bad_key = {1: '_A_'} + os.environ |= dict_with_bad_key + + with self.assertRaises(TypeError): + dict_with_bad_val = {'_A_': 1} + os.environ |= dict_with_bad_val + + # Check nothing was added. + self.assertEqual(os_environ_copy, os.environ) + + # TODO: RUSTPYTHON (KeyError: 'surrogateescape') + @unittest.expectedFailure + def test_ior_operator_key_value_iterable(self): + overridden_key = '_TEST_VAR_' + os.environ[overridden_key] = 'original_value' + + new_vars_items = (('_A_', '1'), ('_B_', '2'), (overridden_key, '3')) + expected = dict(os.environ) + expected.update(new_vars_items) + + os.environ |= new_vars_items + self.assertEqual(expected, os.environ) + self.assertEqual('3', os.environ[overridden_key]) + + self._test_underlying_process_env('_A_', '1') + self._test_underlying_process_env(overridden_key, '3') + + # TODO: RUSTPYTHON (KeyError: 'surrogateescape') + @unittest.expectedFailure + def test_ror_operator(self): + overridden_key = '_TEST_VAR_' + original_value = 'original_value' + os.environ[overridden_key] = original_value + + new_vars_dict = {'_A_': '1', '_B_': '2', overridden_key: '3'} + expected = dict(new_vars_dict) + expected.update(os.environ) + + actual = new_vars_dict | os.environ + self.assertDictEqual(expected, actual) + self.assertEqual(original_value, actual[overridden_key]) + + new_vars_items = new_vars_dict.items() + self.assertIs(NotImplemented, os.environ.__ror__(new_vars_items)) + + self._test_underlying_process_env('_A_', '') + self._test_underlying_process_env(overridden_key, original_value) class WalkTests(unittest.TestCase): @@ -1017,7 +1288,7 @@ class WalkTests(unittest.TestCase): def setUp(self): join = os.path.join - self.addCleanup(support.rmtree, support.TESTFN) + self.addCleanup(os_helper.rmtree, os_helper.TESTFN) # Build: # TESTFN/ @@ -1036,7 +1307,7 @@ class WalkTests(unittest.TestCase): # broken_link3 # TEST2/ # tmp4 a lone file - self.walk_path = join(support.TESTFN, "TEST1") + self.walk_path = join(os_helper.TESTFN, "TEST1") self.sub1_path = join(self.walk_path, "SUB1") self.sub11_path = join(self.sub1_path, "SUB11") sub2_path = join(self.walk_path, "SUB2") @@ -1046,8 +1317,8 @@ class WalkTests(unittest.TestCase): tmp3_path = join(sub2_path, "tmp3") tmp5_path = join(sub21_path, "tmp3") self.link_path = join(sub2_path, "link") - t2_path = join(support.TESTFN, "TEST2") - tmp4_path = join(support.TESTFN, "TEST2", "tmp4") + t2_path = join(os_helper.TESTFN, "TEST2") + tmp4_path = join(os_helper.TESTFN, "TEST2", "tmp4") broken_link_path = join(sub2_path, "broken_link") broken_link2_path = join(sub2_path, "broken_link2") broken_link3_path = join(sub2_path, "broken_link3") @@ -1059,10 +1330,10 @@ class WalkTests(unittest.TestCase): os.makedirs(t2_path) for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path: - with open(path, "x") as f: + with open(path, "x", encoding='utf-8') as f: f.write("I'm " + path + " and proud of it. Blame test_os.\n") - if support.can_symlink(): + if os_helper.can_symlink(): os.symlink(os.path.abspath(t2_path), self.link_path) os.symlink('broken', broken_link_path, True) os.symlink(join('tmp3', 'broken'), broken_link2_path, True) @@ -1145,7 +1416,7 @@ class WalkTests(unittest.TestCase): self.sub2_tree) def test_walk_symlink(self): - if not support.can_symlink(): + if not os_helper.can_symlink(): self.skipTest("need symlink support") # Walk, following symlinks. @@ -1181,7 +1452,7 @@ class WalkTests(unittest.TestCase): def test_walk_many_open_files(self): depth = 30 - base = os.path.join(support.TESTFN, 'deep') + base = os.path.join(os_helper.TESTFN, 'deep') p = os.path.join(base, *(['d']*depth)) os.makedirs(p) @@ -1231,13 +1502,13 @@ class FwalkTests(WalkTests): self.assertEqual(expected[root], (set(dirs), set(files))) def test_compare_to_walk(self): - kwargs = {'top': support.TESTFN} + kwargs = {'top': os_helper.TESTFN} self._compare_to_walk(kwargs, kwargs) def test_dir_fd(self): try: fd = os.open(".", os.O_RDONLY) - walk_kwargs = {'top': support.TESTFN} + walk_kwargs = {'top': os_helper.TESTFN} fwalk_kwargs = walk_kwargs.copy() fwalk_kwargs['dir_fd'] = fd self._compare_to_walk(walk_kwargs, fwalk_kwargs) @@ -1247,7 +1518,7 @@ class FwalkTests(WalkTests): def test_yields_correct_dir_fd(self): # check returned file descriptors for topdown, follow_symlinks in itertools.product((True, False), repeat=2): - args = support.TESTFN, topdown, None + args = os_helper.TESTFN, topdown, None for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks): # check that the FD is valid os.fstat(rootfd) @@ -1263,7 +1534,7 @@ class FwalkTests(WalkTests): minfd = os.dup(1) os.close(minfd) for i in range(256): - for x in self.fwalk(support.TESTFN): + for x in self.fwalk(os_helper.TESTFN): pass newfd = os.dup(1) self.addCleanup(os.close, newfd) @@ -1321,10 +1592,10 @@ class BytesFwalkTests(FwalkTests): class MakedirTests(unittest.TestCase): def setUp(self): - os.mkdir(support.TESTFN) + os.mkdir(os_helper.TESTFN) def test_makedir(self): - base = support.TESTFN + base = os_helper.TESTFN path = os.path.join(base, 'dir1', 'dir2', 'dir3') os.makedirs(path) # Should work path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4') @@ -1339,8 +1610,8 @@ class MakedirTests(unittest.TestCase): os.makedirs(path) def test_mode(self): - with support.temp_umask(0o002): - base = support.TESTFN + with os_helper.temp_umask(0o002): + base = os_helper.TESTFN parent = os.path.join(base, 'dir1') path = os.path.join(parent, 'dir2') os.makedirs(path, 0o555) @@ -1351,7 +1622,7 @@ class MakedirTests(unittest.TestCase): self.assertEqual(os.stat(parent).st_mode & 0o777, 0o775) def test_exist_ok_existing_directory(self): - path = os.path.join(support.TESTFN, 'dir1') + path = os.path.join(os_helper.TESTFN, 'dir1') mode = 0o777 old_mask = os.umask(0o022) os.makedirs(path, mode) @@ -1365,18 +1636,18 @@ class MakedirTests(unittest.TestCase): os.makedirs(os.path.abspath('/'), exist_ok=True) def test_exist_ok_s_isgid_directory(self): - path = os.path.join(support.TESTFN, 'dir1') + path = os.path.join(os_helper.TESTFN, 'dir1') S_ISGID = stat.S_ISGID mode = 0o777 old_mask = os.umask(0o022) try: existing_testfn_mode = stat.S_IMODE( - os.lstat(support.TESTFN).st_mode) + os.lstat(os_helper.TESTFN).st_mode) try: - os.chmod(support.TESTFN, existing_testfn_mode | S_ISGID) + os.chmod(os_helper.TESTFN, existing_testfn_mode | S_ISGID) except PermissionError: raise unittest.SkipTest('Cannot set S_ISGID for dir.') - if (os.lstat(support.TESTFN).st_mode & S_ISGID != S_ISGID): + if (os.lstat(os_helper.TESTFN).st_mode & S_ISGID != S_ISGID): raise unittest.SkipTest('No support for S_ISGID dir mode.') # The os should apply S_ISGID from the parent dir for us, but # this test need not depend on that behavior. Be explicit. @@ -1392,9 +1663,9 @@ class MakedirTests(unittest.TestCase): os.umask(old_mask) def test_exist_ok_existing_regular_file(self): - base = support.TESTFN - path = os.path.join(support.TESTFN, 'dir1') - with open(path, 'w') as f: + base = os_helper.TESTFN + path = os.path.join(os_helper.TESTFN, 'dir1') + with open(path, 'w', encoding='utf-8') as f: f.write('abc') self.assertRaises(OSError, os.makedirs, path) self.assertRaises(OSError, os.makedirs, path, exist_ok=False) @@ -1402,12 +1673,12 @@ class MakedirTests(unittest.TestCase): os.remove(path) def tearDown(self): - path = os.path.join(support.TESTFN, 'dir1', 'dir2', 'dir3', + path = os.path.join(os_helper.TESTFN, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', 'dir6') # If the tests failed, the bottom-most directory ('../dir6') # may not have been created, so we look for the outermost directory # that exists. - while not os.path.exists(path) and path != support.TESTFN: + while not os.path.exists(path) and path != os_helper.TESTFN: path = os.path.dirname(path) os.removedirs(path) @@ -1418,17 +1689,17 @@ class ChownFileTests(unittest.TestCase): @classmethod def setUpClass(cls): - os.mkdir(support.TESTFN) + os.mkdir(os_helper.TESTFN) def test_chown_uid_gid_arguments_must_be_index(self): - stat = os.stat(support.TESTFN) + stat = os.stat(os_helper.TESTFN) uid = stat.st_uid gid = stat.st_gid for value in (-1.0, -1j, decimal.Decimal(-1), fractions.Fraction(-2, 2)): - self.assertRaises(TypeError, os.chown, support.TESTFN, value, gid) - self.assertRaises(TypeError, os.chown, support.TESTFN, uid, value) - self.assertIsNone(os.chown(support.TESTFN, uid, gid)) - self.assertIsNone(os.chown(support.TESTFN, -1, -1)) + self.assertRaises(TypeError, os.chown, os_helper.TESTFN, value, gid) + self.assertRaises(TypeError, os.chown, os_helper.TESTFN, uid, value) + self.assertIsNone(os.chown(os_helper.TESTFN, uid, gid)) + self.assertIsNone(os.chown(os_helper.TESTFN, -1, -1)) @unittest.skipUnless(hasattr(os, 'getgroups'), 'need os.getgroups') def test_chown_gid(self): @@ -1437,61 +1708,61 @@ class ChownFileTests(unittest.TestCase): self.skipTest("test needs at least 2 groups") gid_1, gid_2 = groups[:2] - uid = os.stat(support.TESTFN).st_uid + uid = os.stat(os_helper.TESTFN).st_uid - os.chown(support.TESTFN, uid, gid_1) - gid = os.stat(support.TESTFN).st_gid + os.chown(os_helper.TESTFN, uid, gid_1) + gid = os.stat(os_helper.TESTFN).st_gid self.assertEqual(gid, gid_1) - os.chown(support.TESTFN, uid, gid_2) - gid = os.stat(support.TESTFN).st_gid + os.chown(os_helper.TESTFN, uid, gid_2) + gid = os.stat(os_helper.TESTFN).st_gid self.assertEqual(gid, gid_2) @unittest.skipUnless(root_in_posix and len(all_users) > 1, "test needs root privilege and more than one user") def test_chown_with_root(self): uid_1, uid_2 = all_users[:2] - gid = os.stat(support.TESTFN).st_gid - os.chown(support.TESTFN, uid_1, gid) - uid = os.stat(support.TESTFN).st_uid + gid = os.stat(os_helper.TESTFN).st_gid + os.chown(os_helper.TESTFN, uid_1, gid) + uid = os.stat(os_helper.TESTFN).st_uid self.assertEqual(uid, uid_1) - os.chown(support.TESTFN, uid_2, gid) - uid = os.stat(support.TESTFN).st_uid + os.chown(os_helper.TESTFN, uid_2, gid) + uid = os.stat(os_helper.TESTFN).st_uid self.assertEqual(uid, uid_2) @unittest.skipUnless(not root_in_posix and len(all_users) > 1, "test needs non-root account and more than one user") def test_chown_without_permission(self): uid_1, uid_2 = all_users[:2] - gid = os.stat(support.TESTFN).st_gid + gid = os.stat(os_helper.TESTFN).st_gid with self.assertRaises(PermissionError): - os.chown(support.TESTFN, uid_1, gid) - os.chown(support.TESTFN, uid_2, gid) + os.chown(os_helper.TESTFN, uid_1, gid) + os.chown(os_helper.TESTFN, uid_2, gid) @classmethod def tearDownClass(cls): - os.rmdir(support.TESTFN) + os.rmdir(os_helper.TESTFN) class RemoveDirsTests(unittest.TestCase): def setUp(self): - os.makedirs(support.TESTFN) + os.makedirs(os_helper.TESTFN) def tearDown(self): - support.rmtree(support.TESTFN) + os_helper.rmtree(os_helper.TESTFN) def test_remove_all(self): - dira = os.path.join(support.TESTFN, 'dira') + dira = os.path.join(os_helper.TESTFN, 'dira') os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) os.removedirs(dirb) self.assertFalse(os.path.exists(dirb)) self.assertFalse(os.path.exists(dira)) - self.assertFalse(os.path.exists(support.TESTFN)) + self.assertFalse(os.path.exists(os_helper.TESTFN)) def test_remove_partial(self): - dira = os.path.join(support.TESTFN, 'dira') + dira = os.path.join(os_helper.TESTFN, 'dira') os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) @@ -1499,10 +1770,10 @@ class RemoveDirsTests(unittest.TestCase): os.removedirs(dirb) self.assertFalse(os.path.exists(dirb)) self.assertTrue(os.path.exists(dira)) - self.assertTrue(os.path.exists(support.TESTFN)) + self.assertTrue(os.path.exists(os_helper.TESTFN)) def test_remove_nothing(self): - dira = os.path.join(support.TESTFN, 'dira') + dira = os.path.join(os_helper.TESTFN, 'dira') os.mkdir(dira) dirb = os.path.join(dira, 'dirb') os.mkdir(dirb) @@ -1511,7 +1782,7 @@ class RemoveDirsTests(unittest.TestCase): os.removedirs(dirb) self.assertTrue(os.path.exists(dirb)) self.assertTrue(os.path.exists(dira)) - self.assertTrue(os.path.exists(support.TESTFN)) + self.assertTrue(os.path.exists(os_helper.TESTFN)) class DevNullTests(unittest.TestCase): @@ -1649,8 +1920,8 @@ class URandomFDTests(unittest.TestCase): def test_urandom_fd_reopened(self): # Issue #21207: urandom() should detect its fd to /dev/urandom # changed to something else, and reopen it. - self.addCleanup(support.unlink, support.TESTFN) - create_file(support.TESTFN, b"x" * 256) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + create_file(os_helper.TESTFN, b"x" * 256) code = """if 1: import os @@ -1676,7 +1947,7 @@ class URandomFDTests(unittest.TestCase): os.dup2(new_fd, fd) sys.stdout.buffer.write(os.urandom(4)) sys.stdout.buffer.write(os.urandom(4)) - """.format(TESTFN=support.TESTFN) + """.format(TESTFN=os_helper.TESTFN) rc, out, err = assert_python_ok('-Sc', code) self.assertEqual(len(out), 8) self.assertNotEqual(out[0:4], out[4:8]) @@ -1828,40 +2099,40 @@ class ExecTests(unittest.TestCase): class Win32ErrorTests(unittest.TestCase): def setUp(self): try: - os.stat(support.TESTFN) + os.stat(os_helper.TESTFN) except FileNotFoundError: exists = False except OSError as exc: exists = True self.fail("file %s must not exist; os.stat failed with %s" - % (support.TESTFN, exc)) + % (os_helper.TESTFN, exc)) else: - self.fail("file %s must not exist" % support.TESTFN) + self.fail("file %s must not exist" % os_helper.TESTFN) def test_rename(self): - self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") + self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak") def test_remove(self): - self.assertRaises(OSError, os.remove, support.TESTFN) + self.assertRaises(OSError, os.remove, os_helper.TESTFN) def test_chdir(self): - self.assertRaises(OSError, os.chdir, support.TESTFN) + self.assertRaises(OSError, os.chdir, os_helper.TESTFN) def test_mkdir(self): - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) - with open(support.TESTFN, "x") as f: - self.assertRaises(OSError, os.mkdir, support.TESTFN) + with open(os_helper.TESTFN, "x") as f: + self.assertRaises(OSError, os.mkdir, os_helper.TESTFN) def test_utime(self): - self.assertRaises(OSError, os.utime, support.TESTFN, None) + self.assertRaises(OSError, os.utime, os_helper.TESTFN, None) def test_chmod(self): - self.assertRaises(OSError, os.chmod, support.TESTFN, 0) + self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0) class TestInvalidFD(unittest.TestCase): - singles = ["fchdir", "dup", "fdopen", "fdatasync", "fstat", + singles = ["fchdir", "dup", "fdatasync", "fstat", "fstatvfs", "fsync", "tcgetpgrp", "ttyname"] #singles.append("close") #We omit close because it doesn't raise an exception on some platforms @@ -1869,28 +2140,29 @@ class TestInvalidFD(unittest.TestCase): def helper(self): if hasattr(os, f): self.check(getattr(os, f)) - - return helper for f in singles: locals()["test_"+f] = get_single(f) - def check(self, f, *args): + def check(self, f, *args, **kwargs): try: - f(support.make_bad_fd(), *args) + f(os_helper.make_bad_fd(), *args, **kwargs) except OSError as e: self.assertEqual(e.errno, errno.EBADF) else: self.fail("%r didn't raise an OSError with a bad file descriptor" % f) + def test_fdopen(self): + self.check(os.fdopen, encoding="utf-8") + @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()') def test_isatty(self): - self.assertEqual(os.isatty(support.make_bad_fd()), False) + self.assertEqual(os.isatty(os_helper.make_bad_fd()), False) @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()') def test_closerange(self): - fd = support.make_bad_fd() + fd = os_helper.make_bad_fd() # Make sure none of the descriptors we are about to close are # currently valid (issue 6542). for i in range(10): @@ -1964,8 +2236,8 @@ class TestInvalidFD(unittest.TestCase): class LinkTests(unittest.TestCase): def setUp(self): - self.file1 = support.TESTFN - self.file2 = os.path.join(support.TESTFN + "2") + self.file1 = os_helper.TESTFN + self.file2 = os.path.join(os_helper.TESTFN + "2") def tearDown(self): for file in (self.file1, self.file2): @@ -1979,7 +2251,7 @@ class LinkTests(unittest.TestCase): os.link(file1, file2) except PermissionError as e: self.skipTest('os.link(): %s' % e) - with open(file1, "r") as f1, open(file2, "r") as f2: + with open(file1, "rb") as f1, open(file2, "rb") as f2: self.assertTrue(os.path.sameopenfile(f1.fileno(), f2.fileno())) def test_link(self): @@ -2070,12 +2342,12 @@ class PosixUidGidTests(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "Posix specific tests") class Pep383Tests(unittest.TestCase): def setUp(self): - if support.TESTFN_UNENCODABLE: - self.dir = support.TESTFN_UNENCODABLE - elif support.TESTFN_NONASCII: - self.dir = support.TESTFN_NONASCII + if os_helper.TESTFN_UNENCODABLE: + self.dir = os_helper.TESTFN_UNENCODABLE + elif os_helper.TESTFN_NONASCII: + self.dir = os_helper.TESTFN_NONASCII else: - self.dir = support.TESTFN + self.dir = os_helper.TESTFN self.bdir = os.fsencode(self.dir) bytesfn = [] @@ -2085,11 +2357,11 @@ class Pep383Tests(unittest.TestCase): except UnicodeEncodeError: return bytesfn.append(fn) - add_filename(support.TESTFN_UNICODE) - if support.TESTFN_UNENCODABLE: - add_filename(support.TESTFN_UNENCODABLE) - if support.TESTFN_NONASCII: - add_filename(support.TESTFN_NONASCII) + add_filename(os_helper.TESTFN_UNICODE) + if os_helper.TESTFN_UNENCODABLE: + add_filename(os_helper.TESTFN_UNENCODABLE) + if os_helper.TESTFN_NONASCII: + add_filename(os_helper.TESTFN_NONASCII) if not bytesfn: self.skipTest("couldn't create any non-ascii filename") @@ -2097,7 +2369,7 @@ class Pep383Tests(unittest.TestCase): os.mkdir(self.dir) try: for fn in bytesfn: - support.create_empty_file(os.path.join(self.bdir, fn)) + os_helper.create_empty_file(os.path.join(self.bdir, fn)) fn = os.fsdecode(fn) if fn in self.unicodefn: raise ValueError("duplicate filename") @@ -2265,41 +2537,41 @@ class Win32ListdirTests(unittest.TestCase): self.created_paths = [] for i in range(2): dir_name = 'SUB%d' % i - dir_path = os.path.join(support.TESTFN, dir_name) + dir_path = os.path.join(os_helper.TESTFN, dir_name) file_name = 'FILE%d' % i - file_path = os.path.join(support.TESTFN, file_name) + file_path = os.path.join(os_helper.TESTFN, file_name) os.makedirs(dir_path) - with open(file_path, 'w') as f: + with open(file_path, 'w', encoding='utf-8') as f: f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) self.created_paths.extend([dir_name, file_name]) self.created_paths.sort() def tearDown(self): - shutil.rmtree(support.TESTFN) + shutil.rmtree(os_helper.TESTFN) def test_listdir_no_extended_path(self): """Test when the path is not an "extended" path.""" # unicode self.assertEqual( - sorted(os.listdir(support.TESTFN)), + sorted(os.listdir(os_helper.TESTFN)), self.created_paths) # bytes self.assertEqual( - sorted(os.listdir(os.fsencode(support.TESTFN))), + sorted(os.listdir(os.fsencode(os_helper.TESTFN))), [os.fsencode(path) for path in self.created_paths]) def test_listdir_extended_path(self): """Test when the path starts with '\\\\?\\'.""" # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath # unicode - path = '\\\\?\\' + os.path.abspath(support.TESTFN) + path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) self.assertEqual( sorted(os.listdir(path)), self.created_paths) # bytes - path = b'\\\\?\\' + os.fsencode(os.path.abspath(support.TESTFN)) + path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) self.assertEqual( sorted(os.listdir(path)), [os.fsencode(path) for path in self.created_paths]) @@ -2342,32 +2614,32 @@ class ReadlinkTests(unittest.TestCase): self.assertRaises(FileNotFoundError, os.readlink, FakePath('missing-link')) - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_pathlike(self): os.symlink(self.filelink_target, self.filelink) - self.addCleanup(support.unlink, self.filelink) + self.addCleanup(os_helper.unlink, self.filelink) filelink = FakePath(self.filelink) self.assertPathEqual(os.readlink(filelink), self.filelink_target) - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_pathlike_bytes(self): os.symlink(self.filelinkb_target, self.filelinkb) - self.addCleanup(support.unlink, self.filelinkb) + self.addCleanup(os_helper.unlink, self.filelinkb) path = os.readlink(FakePath(self.filelinkb)) self.assertPathEqual(path, self.filelinkb_target) self.assertIsInstance(path, bytes) - @support.skip_unless_symlink + @os_helper.skip_unless_symlink def test_bytes(self): os.symlink(self.filelinkb_target, self.filelinkb) - self.addCleanup(support.unlink, self.filelinkb) + self.addCleanup(os_helper.unlink, self.filelinkb) path = os.readlink(self.filelinkb) self.assertPathEqual(path, self.filelinkb_target) self.assertIsInstance(path, bytes) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") -@support.skip_unless_symlink +@os_helper.skip_unless_symlink class Win32SymlinkTests(unittest.TestCase): filelink = 'filelinktest' filelink_target = os.path.abspath(__file__) @@ -2438,10 +2710,10 @@ class Win32SymlinkTests(unittest.TestCase): self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) def test_12084(self): - level1 = os.path.abspath(support.TESTFN) + level1 = os.path.abspath(os_helper.TESTFN) level2 = os.path.join(level1, "level2") level3 = os.path.join(level2, "level3") - self.addCleanup(support.rmtree, level1) + self.addCleanup(os_helper.rmtree, level1) os.mkdir(level1) os.mkdir(level2) @@ -2627,7 +2899,7 @@ class Win32NtTests(unittest.TestCase): self.assertEqual(0, handle_delta) -@support.skip_unless_symlink +@os_helper.skip_unless_symlink class NonLocalSymlinkTests(unittest.TestCase): def setUp(self): @@ -2676,6 +2948,7 @@ class FSEncodingTests(unittest.TestCase): class DeviceEncodingTests(unittest.TestCase): + # TODO: RUSTPYTHON (AttributeError: module 'os' has no attribute 'device_encoding') @unittest.expectedFailure def test_bad_fd(self): @@ -2701,7 +2974,7 @@ class PidTests(unittest.TestCase): # We are the parent of our subprocess self.assertEqual(int(stdout), os.getpid()) - def check_waitpid(self, code, exitcode): + def check_waitpid(self, code, exitcode, callback=None): if sys.platform == 'win32': # On Windows, os.spawnv() simply joins arguments with spaces: # arguments need to be quoted @@ -2710,12 +2983,13 @@ class PidTests(unittest.TestCase): args = [sys.executable, '-c', code] pid = os.spawnv(os.P_NOWAIT, sys.executable, args) + if callback is not None: + callback(pid) + + # don't use support.wait_process() to test directly os.waitpid() + # and os.waitstatus_to_exitcode() pid2, status = os.waitpid(pid, 0) - if sys.platform == 'win32': - self.assertEqual(status, exitcode << 8) - else: - self.assertTrue(os.WIFEXITED(status), status) - self.assertEqual(os.WEXITSTATUS(status), exitcode) + self.assertEqual(os.waitstatus_to_exitcode(status), exitcode) self.assertEqual(pid2, pid) # TODO: RUSTPYTHON (AttributeError: module 'os' has no attribute 'spawnv') @@ -2725,25 +2999,55 @@ class PidTests(unittest.TestCase): # TODO: RUSTPYTHON (AttributeError: module 'os' has no attribute 'spawnv') @unittest.expectedFailure - def test_waitpid_exitcode(self): + def test_waitstatus_to_exitcode(self): exitcode = 23 code = f'import sys; sys.exit({exitcode})' self.check_waitpid(code, exitcode=exitcode) + with self.assertRaises(TypeError): + os.waitstatus_to_exitcode(0.0) + @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') def test_waitpid_windows(self): - # bpo-40138: test os.waitpid() with exit code larger than INT_MAX. + # bpo-40138: test os.waitpid() and os.waitstatus_to_exitcode() + # with exit code larger than INT_MAX. STATUS_CONTROL_C_EXIT = 0xC000013A code = f'import _winapi; _winapi.ExitProcess({STATUS_CONTROL_C_EXIT})' self.check_waitpid(code, exitcode=STATUS_CONTROL_C_EXIT) + @unittest.skipUnless(sys.platform == 'win32', 'win32-specific test') + def test_waitstatus_to_exitcode_windows(self): + max_exitcode = 2 ** 32 - 1 + for exitcode in (0, 1, 5, max_exitcode): + self.assertEqual(os.waitstatus_to_exitcode(exitcode << 8), + exitcode) + + # invalid values + with self.assertRaises(ValueError): + os.waitstatus_to_exitcode((max_exitcode + 1) << 8) + with self.assertRaises(OverflowError): + os.waitstatus_to_exitcode(-1) + + # TODO: RUSTPYTHON (AttributeError: module 'os' has no attribute 'spawnv') + @unittest.expectedFailure + # Skip the test on Windows + @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'need signal.SIGKILL') + def test_waitstatus_to_exitcode_kill(self): + code = f'import time; time.sleep({support.LONG_TIMEOUT})' + signum = signal.SIGKILL + + def kill_process(pid): + os.kill(pid, signum) + + self.check_waitpid(code, exitcode=-signum, callback=kill_process) + class SpawnTests(unittest.TestCase): def create_args(self, *, with_env=False, use_bytes=False): self.exitcode = 17 - filename = support.TESTFN - self.addCleanup(support.unlink, filename) + filename = os_helper.TESTFN + self.addCleanup(os_helper.unlink, filename) if not with_env: code = 'import sys; sys.exit(%s)' % self.exitcode @@ -2756,7 +3060,7 @@ class SpawnTests(unittest.TestCase): code = ('import sys, os; magic = os.environ[%r]; sys.exit(%s)' % (self.key, self.exitcode)) - with open(filename, "w") as fp: + with open(filename, "w", encoding="utf-8") as fp: fp.write(code) args = [sys.executable, filename] @@ -2797,6 +3101,10 @@ class SpawnTests(unittest.TestCase): exitcode = os.spawnv(os.P_WAIT, args[0], args) self.assertEqual(exitcode, self.exitcode) + # Test for PyUnicode_FSConverter() + exitcode = os.spawnv(os.P_WAIT, FakePath(args[0]), args) + self.assertEqual(exitcode, self.exitcode) + @requires_os_func('spawnve') def test_spawnve(self): args = self.create_args(with_env=True) @@ -2819,14 +3127,7 @@ class SpawnTests(unittest.TestCase): def test_nowait(self): args = self.create_args() pid = os.spawnv(os.P_NOWAIT, args[0], args) - result = os.waitpid(pid, 0) - self.assertEqual(result[0], pid) - status = result[1] - if hasattr(os, 'WIFEXITED'): - self.assertTrue(os.WIFEXITED(status)) - self.assertEqual(os.WEXITSTATUS(status), self.exitcode) - else: - self.assertEqual(status, self.exitcode << 8) + support.wait_process(pid, exitcode=self.exitcode) @requires_os_func('spawnve') def test_spawnve_bytes(self): @@ -2897,9 +3198,9 @@ class SpawnTests(unittest.TestCase): self.assertEqual(exitcode, 127) # equal character in the environment variable value - filename = support.TESTFN - self.addCleanup(support.unlink, filename) - with open(filename, "w") as fp: + filename = os_helper.TESTFN + self.addCleanup(os_helper.unlink, filename) + with open(filename, "w", encoding="utf-8") as fp: fp.write('import sys, os\n' 'if os.getenv("FRUIT") != "orange=lemon":\n' ' raise AssertionError') @@ -3053,12 +3354,12 @@ class TestSendfile(unittest.TestCase): @classmethod def setUpClass(cls): cls.key = support.threading_setup() - create_file(support.TESTFN, cls.DATA) + create_file(os_helper.TESTFN, cls.DATA) @classmethod def tearDownClass(cls): support.threading_cleanup(*cls.key) - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def setUp(self): self.server = SendfileTestServer((support.HOST, 0)) @@ -3069,7 +3370,7 @@ class TestSendfile(unittest.TestCase): # synchronize by waiting for "220 ready" response self.client.recv(1024) self.sockno = self.client.fileno() - self.file = open(support.TESTFN, 'rb') + self.file = open(os_helper.TESTFN, 'rb') self.fileno = self.file.fileno() def tearDown(self): @@ -3201,10 +3502,10 @@ class TestSendfile(unittest.TestCase): @requires_headers_trailers def test_trailers(self): - TESTFN2 = support.TESTFN + "2" + TESTFN2 = os_helper.TESTFN + "2" file_data = b"abcdef" - self.addCleanup(support.unlink, TESTFN2) + self.addCleanup(os_helper.unlink, TESTFN2) create_file(TESTFN2, file_data) with open(TESTFN2, 'rb') as f: @@ -3268,8 +3569,8 @@ def supports_extended_attributes(): class ExtendedAttributeTests(unittest.TestCase): def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs): - fn = support.TESTFN - self.addCleanup(support.unlink, fn) + fn = os_helper.TESTFN + self.addCleanup(os_helper.unlink, fn) create_file(fn) with self.assertRaises(OSError) as cm: @@ -3317,10 +3618,10 @@ class ExtendedAttributeTests(unittest.TestCase): def _check_xattrs(self, *args, **kwargs): self._check_xattrs_str(str, *args, **kwargs) - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) self._check_xattrs_str(os.fsencode, *args, **kwargs) - support.unlink(support.TESTFN) + os_helper.unlink(os_helper.TESTFN) def test_simple(self): self._check_xattrs(os.getxattr, os.setxattr, os.removexattr, @@ -3374,7 +3675,11 @@ class TermsizeTests(unittest.TestCase): should work too. """ try: - size = subprocess.check_output(['stty', 'size']).decode().split() + size = ( + subprocess.check_output( + ["stty", "size"], stderr=subprocess.DEVNULL, text=True + ).split() + ) except (FileNotFoundError, subprocess.CalledProcessError, PermissionError): self.skipTest("stty invocation failed") @@ -3408,6 +3713,89 @@ class MemfdCreateTests(unittest.TestCase): self.assertFalse(os.get_inheritable(fd2)) +@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd') +@support.requires_linux_version(2, 6, 30) +class EventfdTests(unittest.TestCase): + def test_eventfd_initval(self): + def pack(value): + """Pack as native uint64_t + """ + return struct.pack("@Q", value) + size = 8 # read/write 8 bytes + initval = 42 + fd = os.eventfd(initval) + self.assertNotEqual(fd, -1) + self.addCleanup(os.close, fd) + self.assertFalse(os.get_inheritable(fd)) + + # test with raw read/write + res = os.read(fd, size) + self.assertEqual(res, pack(initval)) + + os.write(fd, pack(23)) + res = os.read(fd, size) + self.assertEqual(res, pack(23)) + + os.write(fd, pack(40)) + os.write(fd, pack(2)) + res = os.read(fd, size) + self.assertEqual(res, pack(42)) + + # test with eventfd_read/eventfd_write + os.eventfd_write(fd, 20) + os.eventfd_write(fd, 3) + res = os.eventfd_read(fd) + self.assertEqual(res, 23) + + def test_eventfd_semaphore(self): + initval = 2 + flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK + fd = os.eventfd(initval, flags) + self.assertNotEqual(fd, -1) + self.addCleanup(os.close, fd) + + # semaphore starts has initval 2, two reads return '1' + res = os.eventfd_read(fd) + self.assertEqual(res, 1) + res = os.eventfd_read(fd) + self.assertEqual(res, 1) + # third read would block + with self.assertRaises(BlockingIOError): + os.eventfd_read(fd) + with self.assertRaises(BlockingIOError): + os.read(fd, 8) + + # increase semaphore counter, read one + os.eventfd_write(fd, 1) + res = os.eventfd_read(fd) + self.assertEqual(res, 1) + # next read would block, too + with self.assertRaises(BlockingIOError): + os.eventfd_read(fd) + + def test_eventfd_select(self): + flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK + fd = os.eventfd(0, flags) + self.assertNotEqual(fd, -1) + self.addCleanup(os.close, fd) + + # counter is zero, only writeable + rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) + self.assertEqual((rfd, wfd, xfd), ([], [fd], [])) + + # counter is non-zero, read and writeable + os.eventfd_write(fd, 23) + rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) + self.assertEqual((rfd, wfd, xfd), ([fd], [fd], [])) + self.assertEqual(os.eventfd_read(fd), 23) + + # counter at max, only readable + os.eventfd_write(fd, (2**64) - 2) + rfd, wfd, xfd = select.select([fd], [fd], [fd], 0) + self.assertEqual((rfd, wfd, xfd), ([fd], [], [])) + os.eventfd_read(fd) + + class OSErrorTests(unittest.TestCase): def setUp(self): class Str(str): @@ -3415,16 +3803,16 @@ class OSErrorTests(unittest.TestCase): self.bytes_filenames = [] self.unicode_filenames = [] - if support.TESTFN_UNENCODABLE is not None: - decoded = support.TESTFN_UNENCODABLE + if os_helper.TESTFN_UNENCODABLE is not None: + decoded = os_helper.TESTFN_UNENCODABLE else: - decoded = support.TESTFN + decoded = os_helper.TESTFN self.unicode_filenames.append(decoded) self.unicode_filenames.append(Str(decoded)) - if support.TESTFN_UNDECODABLE is not None: - encoded = support.TESTFN_UNDECODABLE + if os_helper.TESTFN_UNDECODABLE is not None: + encoded = os_helper.TESTFN_UNDECODABLE else: - encoded = os.fsencode(support.TESTFN) + encoded = os.fsencode(os_helper.TESTFN) self.bytes_filenames.append(encoded) self.bytes_filenames.append(bytearray(encoded)) self.bytes_filenames.append(memoryview(encoded)) @@ -3546,6 +3934,33 @@ class FDInheritanceTests(unittest.TestCase): self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 0) + @unittest.skipUnless(hasattr(os, 'O_PATH'), "need os.O_PATH") + def test_get_set_inheritable_o_path(self): + fd = os.open(__file__, os.O_PATH) + self.addCleanup(os.close, fd) + self.assertEqual(os.get_inheritable(fd), False) + + os.set_inheritable(fd, True) + self.assertEqual(os.get_inheritable(fd), True) + + os.set_inheritable(fd, False) + self.assertEqual(os.get_inheritable(fd), False) + + def test_get_set_inheritable_badf(self): + fd = os_helper.make_bad_fd() + + with self.assertRaises(OSError) as ctx: + os.get_inheritable(fd) + self.assertEqual(ctx.exception.errno, errno.EBADF) + + with self.assertRaises(OSError) as ctx: + os.set_inheritable(fd, True) + self.assertEqual(ctx.exception.errno, errno.EBADF) + + with self.assertRaises(OSError) as ctx: + os.set_inheritable(fd, False) + self.assertEqual(ctx.exception.errno, errno.EBADF) + def test_open(self): fd = os.open(__file__, os.O_RDONLY) self.addCleanup(os.close, fd) @@ -3614,22 +4029,24 @@ class PathTConverterTests(unittest.TestCase): # function, cleanup function) functions = [ ('stat', True, (), None), - ('lstat', True, (), None), + ('lstat', False, (), None), ('access', False, (os.F_OK,), None), ('chflags', False, (0,), None), ('lchflags', False, (0,), None), ('open', False, (0,), getattr(os, 'close', None)), ] + # TODO: RUSTPYTHON (AssertionError: TypeError not raised) + @unittest.expectedFailure def test_path_t_converter(self): - str_filename = support.TESTFN + str_filename = os_helper.TESTFN if os.name == 'nt': bytes_fspath = bytes_filename = None else: - bytes_filename = support.TESTFN.encode('ascii') + bytes_filename = os.fsencode(os_helper.TESTFN) bytes_fspath = FakePath(bytes_filename) fd = os.open(FakePath(str_filename), os.O_WRONLY|os.O_CREAT) - self.addCleanup(support.unlink, support.TESTFN) + self.addCleanup(os_helper.unlink, os_helper.TESTFN) self.addCleanup(os.close, fd) int_fspath = FakePath(fd) @@ -3697,13 +4114,35 @@ class ExportsTests(unittest.TestCase): self.assertIn('walk', os.__all__) +class TestDirEntry(unittest.TestCase): + def setUp(self): + self.path = os.path.realpath(os_helper.TESTFN) + self.addCleanup(os_helper.rmtree, self.path) + os.mkdir(self.path) + + # TODO: RUSTPYTHON (AssertionError: TypeError not raised by DirEntry) + @unittest.expectedFailure + def test_uninstantiable(self): + self.assertRaises(TypeError, os.DirEntry) + + # TODO: RUSTPYTHON (pickle.PicklingError: Can't pickle : it's not found as _os.DirEntry) + @unittest.expectedFailure + def test_unpickable(self): + filename = create_file(os.path.join(self.path, "file.txt"), b'python') + entry = [entry for entry in os.scandir(self.path)].pop() + self.assertIsInstance(entry, os.DirEntry) + self.assertEqual(entry.name, "file.txt") + import pickle + self.assertRaises(TypeError, pickle.dumps, entry, filename) + + class TestScandir(unittest.TestCase): check_no_resource_warning = support.check_no_resource_warning def setUp(self): - self.path = os.path.realpath(support.TESTFN) + self.path = os.path.realpath(os_helper.TESTFN) self.bytes_path = os.fsencode(self.path) - self.addCleanup(support.rmtree, self.path) + self.addCleanup(os_helper.rmtree, self.path) os.mkdir(self.path) def create_file(self, name="file.txt"): @@ -3731,6 +4170,20 @@ class TestScandir(unittest.TestCase): else: self.assertEqual(stat1, stat2) + # TODO: RUSTPPYTHON (AssertionError: TypeError not raised by ScandirIter) + @unittest.expectedFailure + def test_uninstantiable(self): + scandir_iter = os.scandir(self.path) + self.assertRaises(TypeError, type(scandir_iter)) + scandir_iter.close() + + def test_unpickable(self): + filename = self.create_file("file.txt") + scandir_iter = os.scandir(self.path) + import pickle + self.assertRaises(TypeError, pickle.dumps, scandir_iter, filename) + scandir_iter.close() + def check_entry(self, entry, name, is_dir, is_file, is_symlink): self.assertIsInstance(entry, os.DirEntry) self.assertEqual(entry.name, name) @@ -3762,7 +4215,7 @@ class TestScandir(unittest.TestCase): @unittest.skipIf(sys.platform == "linux", "TODO: RUSTPYTHON, flaky test") def test_attributes(self): link = hasattr(os, 'link') - symlink = support.can_symlink() + symlink = os_helper.can_symlink() dirname = os.path.join(self.path, "dir") os.mkdir(dirname) @@ -3886,7 +4339,7 @@ class TestScandir(unittest.TestCase): self.assertRaises(FileNotFoundError, entry.stat, follow_symlinks=False) def test_broken_symlink(self): - if not support.can_symlink(): + if not os_helper.can_symlink(): return self.skipTest('cannot create symbolic link') filename = self.create_file("file.txt") @@ -3936,14 +4389,15 @@ class TestScandir(unittest.TestCase): self.assertIs(type(entry.name), bytes) self.assertIs(type(entry.path), bytes) - # TODO: RUSTPYTHON (scandir needs to have supports_fd) - @unittest.skipUnless(os.scandir in os.supports_fd, - 'fd support for scandir required for this test.') + # TODO: RUSTPYTHON + @unittest.expectedFailure + @unittest.skipUnless(os.listdir in os.supports_fd, + 'fd support for listdir required for this test.') def test_fd(self): self.assertIn(os.scandir, os.supports_fd) self.create_file('file.txt') expected_names = ['file.txt'] - if support.can_symlink(): + if os_helper.can_symlink(): os.symlink('file.txt', os.path.join(self.path, 'link')) expected_names.append('link') @@ -4091,6 +4545,11 @@ class TestPEP519(unittest.TestCase): self.assertFalse(issubclass(FakePath, A)) self.assertTrue(issubclass(FakePath, os.PathLike)) + # TODO: RUSTPYTHON (TypeError: 'ABCMeta' object is not subscriptable) + @unittest.expectedFailure + def test_pathlike_class_getitem(self): + self.assertIsInstance(os.PathLike[bytes], types.GenericAlias) + class TimesTests(unittest.TestCase): def test_times(self):