Update subprocess from CPython 3.11

This commit is contained in:
DimitrisJim
2023-05-19 19:54:59 +03:00
parent 2b43d4817c
commit 7366a41b1c
2 changed files with 171 additions and 149 deletions

143
Lib/subprocess.py vendored
View File

@@ -43,6 +43,7 @@ getstatusoutput(...): Runs a command in the shell, waits for it to complete,
import builtins
import errno
import io
import locale
import os
import time
import signal
@@ -65,16 +66,19 @@ __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
# NOTE: We intentionally exclude list2cmdline as it is
# considered an internal implementation detail. issue10838.
# use presence of msvcrt to detect Windows-like platforms (see bpo-8110)
try:
import msvcrt
import _winapi
_mswindows = True
except ModuleNotFoundError:
_mswindows = False
import _posixsubprocess
import select
import selectors
else:
_mswindows = True
# wasm32-emscripten and wasm32-wasi do not support processes
_can_fork_exec = sys.platform not in {"emscripten", "wasi"}
if _mswindows:
import _winapi
from _winapi import (CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP,
STD_INPUT_HANDLE, STD_OUTPUT_HANDLE,
STD_ERROR_HANDLE, SW_HIDE,
@@ -95,6 +99,24 @@ else:
"NORMAL_PRIORITY_CLASS", "REALTIME_PRIORITY_CLASS",
"CREATE_NO_WINDOW", "DETACHED_PROCESS",
"CREATE_DEFAULT_ERROR_MODE", "CREATE_BREAKAWAY_FROM_JOB"])
else:
if _can_fork_exec:
from _posixsubprocess import fork_exec as _fork_exec
# used in methods that are called by __del__
_waitpid = os.waitpid
_waitstatus_to_exitcode = os.waitstatus_to_exitcode
_WIFSTOPPED = os.WIFSTOPPED
_WSTOPSIG = os.WSTOPSIG
_WNOHANG = os.WNOHANG
else:
_fork_exec = None
_waitpid = None
_waitstatus_to_exitcode = None
_WIFSTOPPED = None
_WSTOPSIG = None
_WNOHANG = None
import select
import selectors
# Exception classes used by this module.
@@ -207,8 +229,7 @@ if _mswindows:
def __repr__(self):
return "%s(%d)" % (self.__class__.__name__, int(self))
# XXX: RustPython; OSError('The handle is invalid. (os error 6)')
# __del__ = Close
__del__ = Close
else:
# When select or poll has indicated that the file is writable,
# we can write up to _PIPE_BUF bytes without risk of blocking.
@@ -303,12 +324,14 @@ def _args_from_interpreter_flags():
args.append('-E')
if sys.flags.no_user_site:
args.append('-s')
if sys.flags.safe_path:
args.append('-P')
# -W options
warnopts = sys.warnoptions[:]
bytes_warning = sys.flags.bytes_warning
xoptions = getattr(sys, '_xoptions', {})
dev_mode = ('dev' in xoptions)
bytes_warning = sys.flags.bytes_warning
dev_mode = sys.flags.dev_mode
if bytes_warning > 1:
warnopts.remove("error::BytesWarning")
@@ -335,6 +358,26 @@ def _args_from_interpreter_flags():
return args
def _text_encoding():
# Return default text encoding and emit EncodingWarning if
# sys.flags.warn_default_encoding is true.
if sys.flags.warn_default_encoding:
f = sys._getframe()
filename = f.f_code.co_filename
stacklevel = 2
while f := f.f_back:
if f.f_code.co_filename != filename:
break
stacklevel += 1
warnings.warn("'encoding' argument not specified.",
EncodingWarning, stacklevel)
if sys.flags.utf8_mode:
return "utf-8"
else:
return locale.getencoding()
def call(*popenargs, timeout=None, **kwargs):
"""Run command with arguments. Wait for command to complete or
timeout, then return the returncode attribute.
@@ -406,13 +449,15 @@ def check_output(*popenargs, timeout=None, **kwargs):
decoded according to locale encoding, or by "encoding" if set. Text mode
is triggered by setting any of text, encoding, errors or universal_newlines.
"""
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
for kw in ('stdout', 'check'):
if kw in kwargs:
raise ValueError(f'{kw} argument not allowed, it will be overridden.')
if 'input' in kwargs and kwargs['input'] is None:
# Explicitly passing input=None was previously equivalent to passing an
# empty string. That is maintained here for backwards compatibility.
if kwargs.get('universal_newlines') or kwargs.get('text'):
if kwargs.get('universal_newlines') or kwargs.get('text') or kwargs.get('encoding') \
or kwargs.get('errors'):
empty = ''
else:
empty = b''
@@ -464,7 +509,8 @@ def run(*popenargs,
The returned instance will have attributes args, returncode, stdout and
stderr. By default, stdout and stderr are not captured, and those attributes
will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them.
will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them,
or pass capture_output=True to capture both.
If check is True and the exit code was non-zero, it raises a
CalledProcessError. The CalledProcessError object will have the return code
@@ -600,7 +646,7 @@ def list2cmdline(seq):
# Various tools for executing commands and looking at their output and status.
#
def getstatusoutput(cmd):
def getstatusoutput(cmd, *, encoding=None, errors=None):
"""Return (exitcode, output) of executing cmd in a shell.
Execute the string 'cmd' in a shell with 'check_output' and
@@ -622,7 +668,8 @@ def getstatusoutput(cmd):
(-15, '')
"""
try:
data = check_output(cmd, shell=True, text=True, stderr=STDOUT)
data = check_output(cmd, shell=True, text=True, stderr=STDOUT,
encoding=encoding, errors=errors)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
@@ -631,7 +678,7 @@ def getstatusoutput(cmd):
data = data[:-1]
return exitcode, data
def getoutput(cmd):
def getoutput(cmd, *, encoding=None, errors=None):
"""Return output (stdout or stderr) of executing cmd in a shell.
Like getstatusoutput(), except the exit status is ignored and the return
@@ -641,7 +688,8 @@ def getoutput(cmd):
>>> subprocess.getoutput('ls /bin/ls')
'/bin/ls'
"""
return getstatusoutput(cmd)[1]
return getstatusoutput(cmd, encoding=encoding, errors=errors)[1]
def _use_posix_spawn():
@@ -736,6 +784,8 @@ class Popen:
start_new_session (POSIX only)
process_group (POSIX only)
group (POSIX only)
extra_groups (POSIX only)
@@ -761,8 +811,14 @@ class Popen:
startupinfo=None, creationflags=0,
restore_signals=True, start_new_session=False,
pass_fds=(), *, user=None, group=None, extra_groups=None,
encoding=None, errors=None, text=None, umask=-1, pipesize=-1):
encoding=None, errors=None, text=None, umask=-1, pipesize=-1,
process_group=None):
"""Create new Popen instance."""
if not _can_fork_exec:
raise OSError(
errno.ENOTSUP, f"{sys.platform} does not support processes."
)
_cleanup()
# Held while anything is calling waitpid before returncode has been
# updated to prevent clobbering returncode if wait() or poll() are
@@ -848,15 +904,8 @@ class Popen:
errread = msvcrt.open_osfhandle(errread.Detach(), 0)
self.text_mode = encoding or errors or text or universal_newlines
# PEP 597: We suppress the EncodingWarning in subprocess module
# for now (at Python 3.10), because we focus on files for now.
# This will be changed to encoding = io.text_encoding(encoding)
# in the future.
if self.text_mode and encoding is None:
# TODO: RUSTPYTHON; encoding `locale` is not supported yet
pass
# self.encoding = encoding = "locale"
self.encoding = encoding = _text_encoding()
# How long to resume waiting on a child after the first ^C.
# There is no right value for this. The purpose is to be polite
@@ -874,6 +923,9 @@ class Popen:
else:
line_buffering = False
if process_group is None:
process_group = -1 # The internal APIs are int-only
gid = None
if group is not None:
if not hasattr(os, 'setregid'):
@@ -977,7 +1029,7 @@ class Popen:
errread, errwrite,
restore_signals,
gid, gids, uid, umask,
start_new_session)
start_new_session, process_group)
except:
# Cleanup if the child failed starting.
for f in filter(None, (self.stdin, self.stdout, self.stderr)):
@@ -1285,11 +1337,7 @@ class Popen:
else:
# Assuming file-like object
p2cread = msvcrt.get_osfhandle(stdin.fileno())
# XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable
# pipe handles is necessary for us, but not CPython
old = p2cread
p2cread = self._make_inheritable(p2cread)
if stdin == PIPE: _winapi.CloseHandle(old)
if stdout is None:
c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE)
@@ -1307,11 +1355,7 @@ class Popen:
else:
# Assuming file-like object
c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
# XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable
# pipe handles is necessary for us, but not CPython
old = c2pwrite
c2pwrite = self._make_inheritable(c2pwrite)
if stdout == PIPE: _winapi.CloseHandle(old)
if stderr is None:
errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE)
@@ -1331,11 +1375,7 @@ class Popen:
else:
# Assuming file-like object
errwrite = msvcrt.get_osfhandle(stderr.fileno())
# XXX RUSTPYTHON TODO: figure out why closing these old, non-inheritable
# pipe handles is necessary for us, but not CPython
old = errwrite
errwrite = self._make_inheritable(errwrite)
if stderr == PIPE: _winapi.CloseHandle(old)
return (p2cread, p2cwrite,
c2pread, c2pwrite,
@@ -1373,7 +1413,7 @@ class Popen:
unused_restore_signals,
unused_gid, unused_gids, unused_uid,
unused_umask,
unused_start_new_session):
unused_start_new_session, unused_process_group):
"""Execute program (MS Windows version)"""
assert not pass_fds, "pass_fds not supported on Windows."
@@ -1705,7 +1745,7 @@ class Popen:
errread, errwrite,
restore_signals,
gid, gids, uid, umask,
start_new_session):
start_new_session, process_group):
"""Execute program (POSIX version)"""
if isinstance(args, (str, bytes)):
@@ -1741,6 +1781,7 @@ class Popen:
and (c2pwrite == -1 or c2pwrite > 2)
and (errwrite == -1 or errwrite > 2)
and not start_new_session
and process_group == -1
and gid is None
and gids is None
and uid is None
@@ -1790,7 +1831,7 @@ class Popen:
for dir in os.get_exec_path(env))
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _posixsubprocess.fork_exec(
self.pid = _fork_exec(
args, executable_list,
close_fds, tuple(sorted(map(int, fds_to_keep))),
cwd, env_list,
@@ -1798,8 +1839,8 @@ class Popen:
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session,
gid, gids, uid, umask,
preexec_fn)
process_group, gid, gids, uid, umask,
preexec_fn, _USE_VFORK)
self._child_created = True
finally:
# be sure the FD is closed no matter what
@@ -1862,19 +1903,19 @@ class Popen:
def _handle_exitstatus(self, sts,
waitstatus_to_exitcode=os.waitstatus_to_exitcode,
_WIFSTOPPED=os.WIFSTOPPED,
_WSTOPSIG=os.WSTOPSIG):
_waitstatus_to_exitcode=_waitstatus_to_exitcode,
_WIFSTOPPED=_WIFSTOPPED,
_WSTOPSIG=_WSTOPSIG):
"""All callers to this function MUST hold self._waitpid_lock."""
# This method is called (indirectly) by __del__, so it cannot
# refer to anything outside of its local scope.
if _WIFSTOPPED(sts):
self.returncode = -_WSTOPSIG(sts)
else:
self.returncode = waitstatus_to_exitcode(sts)
self.returncode = _waitstatus_to_exitcode(sts)
def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
_WNOHANG=os.WNOHANG, _ECHILD=errno.ECHILD):
def _internal_poll(self, _deadstate=None, _waitpid=_waitpid,
_WNOHANG=_WNOHANG, _ECHILD=errno.ECHILD):
"""Check if child process has terminated. Returns returncode
attribute.
@@ -2105,7 +2146,7 @@ class Popen:
try:
os.kill(self.pid, sig)
except ProcessLookupError:
# Supress the race condition error; bpo-40550.
# Suppress the race condition error; bpo-40550.
pass
def terminate(self):

View File

@@ -48,6 +48,9 @@ except:
if support.PGO:
raise unittest.SkipTest("test is not helpful for PGO")
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
mswindows = (sys.platform == "win32")
#
@@ -171,6 +174,14 @@ class ProcessTestCase(BaseTestCase):
[sys.executable, "-c", "print('BDFL')"])
self.assertIn(b'BDFL', output)
with self.assertRaisesRegex(ValueError,
"stdout argument not allowed, it will be overridden"):
subprocess.check_output([], stdout=None)
with self.assertRaisesRegex(ValueError,
"check argument not allowed, it will be overridden"):
subprocess.check_output([], check=False)
def test_check_output_nonzero(self):
# check_call() function with non-zero return code
with self.assertRaises(subprocess.CalledProcessError) as c:
@@ -227,6 +238,12 @@ class ProcessTestCase(BaseTestCase):
input=None, universal_newlines=True)
self.assertNotIn('XX', output)
def test_check_output_input_none_encoding_errors(self):
output = subprocess.check_output(
[sys.executable, "-c", "print('foo')"],
input=None, encoding='utf-8', errors='ignore')
self.assertIn('foo', output)
def test_check_output_stdout_arg(self):
# check_output() refuses to accept 'stdout' argument
with self.assertRaises(ValueError) as c:
@@ -719,6 +736,8 @@ class ProcessTestCase(BaseTestCase):
# However, this function is not yet in _winapi.
p.stdin.write(b"pear")
p.stdin.close()
p.stdout.close()
p.stderr.close()
finally:
p.kill()
p.wait()
@@ -746,6 +765,8 @@ class ProcessTestCase(BaseTestCase):
# On other platforms we cannot test the pipe size (yet). But above
# code using pipesize=-1 should not crash.
p.stdin.close()
p.stdout.close()
p.stderr.close()
finally:
p.kill()
p.wait()
@@ -790,7 +811,6 @@ class ProcessTestCase(BaseTestCase):
if not is_env_var_to_ignore(k)]
self.assertEqual(child_env_names, [])
@unittest.skipIf(sys.platform == "win32", "TODO: RUSTPYTHON, null byte is not checked")
def test_invalid_cmd(self):
# null character in the command name
cmd = sys.executable + '\0'
@@ -936,7 +956,6 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, None)
self.assertEqual(stderr, None)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_communicate_pipe_buf(self):
# communicate() with writes larger than pipe_buf
# This test will probably deadlock rather than fail, if
@@ -976,8 +995,6 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(stdout, b"bananasplit")
self.assertEqual(stderr, b"")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_universal_newlines_and_text(self):
args = [
sys.executable, "-c",
@@ -1017,7 +1034,6 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(p.stdout.read(),
"line4\nline5\nline6\nline7\nline8")
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_universal_newlines_communicate(self):
# universal newlines through communicate()
p = subprocess.Popen([sys.executable, "-c",
@@ -1069,7 +1085,6 @@ class ProcessTestCase(BaseTestCase):
p.communicate()
self.assertEqual(p.returncode, 0)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_universal_newlines_communicate_stdin_stdout_stderr(self):
# universal newlines through communicate(), with stdin, stdout, stderr
p = subprocess.Popen([sys.executable, "-c",
@@ -1098,8 +1113,6 @@ class ProcessTestCase(BaseTestCase):
# to stderr at exit of subprocess.
self.assertTrue(stderr.startswith("eline2\neline6\neline7\n"))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_universal_newlines_communicate_encodings(self):
# Check that universal newlines mode works for various encodings,
# in particular for encodings in the UTF-16 and UTF-32 families.
@@ -1122,8 +1135,6 @@ class ProcessTestCase(BaseTestCase):
stdout, stderr = popen.communicate(input='')
self.assertEqual(stdout, '1\n2\n3\n4')
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_communicate_errors(self):
for errors, expected in [
('ignore', ''),
@@ -1263,15 +1274,12 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(p.returncode, 0)
self.assertEqual(read_line, expected)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_bufsize_equal_one_text_mode(self):
# line is flushed in text mode with bufsize=1.
# we should get the full line in return
line = "line\n"
self._test_bufsize_equal_one(line, line, universal_newlines=True)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_bufsize_equal_one_binary_mode(self):
# line is not flushed in binary mode with bufsize=1.
# we should get empty response
@@ -1444,7 +1452,6 @@ class ProcessTestCase(BaseTestCase):
self.assertFalse(os.path.exists(ofname))
self.assertFalse(os.path.exists(efname))
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_communicate_epipe(self):
# Issue 10963: communicate() should hide EPIPE
p = subprocess.Popen(ZERO_RETURN_CMD,
@@ -1475,7 +1482,6 @@ class ProcessTestCase(BaseTestCase):
p.returncode = code
self.assertEqual(repr(p), sx)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_communicate_epipe_only_stdin(self):
# Issue 10963: communicate() should hide EPIPE
p = subprocess.Popen(ZERO_RETURN_CMD,
@@ -1484,8 +1490,6 @@ class ProcessTestCase(BaseTestCase):
p.wait()
p.communicate(b"x" * 2**20)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'),
"Requires signal.SIGUSR1")
@unittest.skipUnless(hasattr(os, 'kill'),
@@ -1535,8 +1539,6 @@ class ProcessTestCase(BaseTestCase):
subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args'])
self.assertEqual(c.exception.filename, '/opt/nonexistent_binary')
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(mswindows, "behavior currently not supported on Windows")
def test_file_not_found_with_bad_cwd(self):
with self.assertRaises(FileNotFoundError) as c:
@@ -1547,6 +1549,22 @@ class ProcessTestCase(BaseTestCase):
self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias)
self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias)
@unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"),
"vfork() not enabled by configure.")
@mock.patch("subprocess._fork_exec")
def test__use_vfork(self, mock_fork_exec):
self.assertTrue(subprocess._USE_VFORK) # The default value regardless.
mock_fork_exec.side_effect = RuntimeError("just testing args")
with self.assertRaises(RuntimeError):
subprocess.run([sys.executable, "-c", "pass"])
mock_fork_exec.assert_called_once()
self.assertTrue(mock_fork_exec.call_args.args[-1])
with mock.patch.object(subprocess, '_USE_VFORK', False):
with self.assertRaises(RuntimeError):
subprocess.run([sys.executable, "-c", "pass"])
self.assertFalse(mock_fork_exec.call_args_list[-1].args[-1])
class RunFuncTestCase(BaseTestCase):
def run_python(self, code, **kwargs):
"""Run Python code in a subprocess using subprocess.run"""
@@ -1721,27 +1739,18 @@ class RunFuncTestCase(BaseTestCase):
msg="TimeoutExpired was delayed! Bad traceback:\n```\n"
f"{stacks}```")
@unittest.skipIf(not sysconfig.get_config_var("HAVE_VFORK"),
"vfork() not enabled by configure.")
def test__use_vfork(self):
# Attempts code coverage within _posixsubprocess.c on the code that
# probes the subprocess module for the existence and value of this
# attribute in 3.10.5.
self.assertTrue(subprocess._USE_VFORK) # The default value regardless.
with mock.patch.object(subprocess, "_USE_VFORK", False):
self.assertEqual(self.run_python("pass").returncode, 0,
msg="False _USE_VFORK failed")
class RaisingBool:
def __bool__(self):
raise RuntimeError("force PyObject_IsTrue to return -1")
with mock.patch.object(subprocess, "_USE_VFORK", RaisingBool()):
self.assertEqual(self.run_python("pass").returncode, 0,
msg="odd bool()-error _USE_VFORK failed")
del subprocess._USE_VFORK
self.assertEqual(self.run_python("pass").returncode, 0,
msg="lack of a _USE_VFORK attribute failed")
def test_encoding_warning(self):
code = textwrap.dedent("""\
from subprocess import *
run("echo hello", shell=True, text=True)
check_output("echo hello", shell=True, text=True)
""")
cp = subprocess.run([sys.executable, "-Xwarn_default_encoding", "-c", code],
capture_output=True)
lines = cp.stderr.splitlines()
self.assertEqual(len(lines), 2, lines)
self.assertTrue(lines[0].startswith(b"<string>:2: EncodingWarning: "))
self.assertTrue(lines[1].startswith(b"<string>:3: EncodingWarning: "))
def _get_test_grp_name():
@@ -1776,8 +1785,6 @@ class POSIXProcessTestCase(BaseTestCase):
self._nonexistent_dir)
return desired_exception
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_exception_cwd(self):
"""Test error in the child raised in the parent for a bad cwd."""
desired_exception = self._get_chdir_exception()
@@ -1793,8 +1800,6 @@ class POSIXProcessTestCase(BaseTestCase):
else:
self.fail("Expected OSError: %s" % desired_exception)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_exception_bad_executable(self):
"""Test error in the child raised in the parent for a bad executable."""
desired_exception = self._get_chdir_exception()
@@ -1810,8 +1815,6 @@ class POSIXProcessTestCase(BaseTestCase):
else:
self.fail("Expected OSError: %s" % desired_exception)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_exception_bad_args_0(self):
"""Test error in the child raised in the parent for a bad args[0]."""
desired_exception = self._get_chdir_exception()
@@ -1837,7 +1840,7 @@ class POSIXProcessTestCase(BaseTestCase):
def __del__(self):
pass
@mock.patch("subprocess._posixsubprocess.fork_exec")
@mock.patch("subprocess._fork_exec")
def test_exception_errpipe_normal(self, fork_exec):
"""Test error passing done through errpipe_write in the good case"""
def proper_error(*args):
@@ -1854,7 +1857,7 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(IsADirectoryError):
self.PopenNoDestructor(["non_existent_command"])
@mock.patch("subprocess._posixsubprocess.fork_exec")
@mock.patch("subprocess._fork_exec")
def test_exception_errpipe_bad_data(self, fork_exec):
"""Test error passing done through errpipe_write where its not
in the expected format"""
@@ -1876,8 +1879,6 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertIn(repr(error_data), str(e.exception))
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(not os.path.exists('/proc/self/status'),
"need /proc/self/status")
def test_restore_signals(self):
@@ -1910,16 +1911,32 @@ class POSIXProcessTestCase(BaseTestCase):
output = subprocess.check_output(
[sys.executable, "-c", "import os; print(os.getsid(0))"],
start_new_session=True)
except OSError as e:
except PermissionError as e:
if e.errno != errno.EPERM:
raise
raise # EACCES?
else:
parent_sid = os.getsid(0)
child_sid = int(output)
self.assertNotEqual(parent_sid, child_sid)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipUnless(hasattr(os, 'setpgid') and hasattr(os, 'getpgid'),
'no setpgid or getpgid on platform')
def test_process_group_0(self):
# For code coverage of calling setpgid(). We don't care if we get an
# EPERM error from it depending on the test execution environment, that
# still indicates that it was called.
try:
output = subprocess.check_output(
[sys.executable, "-c", "import os; print(os.getpgid(0))"],
process_group=0)
except PermissionError as e:
if e.errno != errno.EPERM:
raise # EACCES?
else:
parent_pgid = os.getpgid(0)
child_pgid = int(output)
self.assertNotEqual(parent_pgid, child_pgid)
@unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform')
def test_user(self):
# For code coverage of the user parameter. We don't care if we get an
@@ -1977,8 +1994,6 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(ValueError):
subprocess.check_call(ZERO_RETURN_CMD, user=65535)
# TODO: RUSTPYTHON, observed gids do not match expected gids
@unittest.expectedFailure
@unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform')
def test_group(self):
gid = os.getegid()
@@ -2026,8 +2041,6 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(ValueError):
subprocess.check_call(ZERO_RETURN_CMD, group=65535)
# TODO: RUSTPYTHON, observed gids do not match expected gids
@unittest.expectedFailure
@unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform')
def test_extra_groups(self):
gid = os.getegid()
@@ -2082,8 +2095,6 @@ class POSIXProcessTestCase(BaseTestCase):
with self.assertRaises(ValueError):
subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[])
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(mswindows or not hasattr(os, 'umask'),
'POSIX umask() is not available.')
def test_umask(self):
@@ -2135,8 +2146,6 @@ class POSIXProcessTestCase(BaseTestCase):
error_string = str(err)
self.assertIn("non-zero exit status 2.", error_string)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_preexec(self):
# DISCLAIMER: Setting environment variables is *not* a good use
# of a preexec_fn. This is merely a test.
@@ -2148,8 +2157,6 @@ class POSIXProcessTestCase(BaseTestCase):
with p:
self.assertEqual(p.stdout.read(), b"apple")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_preexec_exception(self):
def raise_it():
raise ValueError("What if two swallows carried a coconut?")
@@ -2158,7 +2165,7 @@ class POSIXProcessTestCase(BaseTestCase):
preexec_fn=raise_it)
except subprocess.SubprocessError as e:
self.assertTrue(
subprocess._posixsubprocess,
subprocess._fork_exec,
"Expected a ValueError from the preexec_fn")
except ValueError as e:
self.assertIn("coconut", e.args[0])
@@ -2191,8 +2198,6 @@ class POSIXProcessTestCase(BaseTestCase):
for fd in devzero_fds:
os.close(fd)
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
"""Issue16140: Don't double close pipes on preexec error."""
@@ -2207,8 +2212,6 @@ class POSIXProcessTestCase(BaseTestCase):
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, preexec_fn=raise_it)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_preexec_gc_module_failure(self):
# This tests the code that disables garbage collection if the child
# process will execute any Python.
@@ -2230,8 +2233,6 @@ class POSIXProcessTestCase(BaseTestCase):
if not enabled:
gc.disable()
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(
sys.platform == 'darwin', 'setrlimit() seems to fail on OS X')
def test_preexec_fork_failure(self):
@@ -2642,8 +2643,6 @@ class POSIXProcessTestCase(BaseTestCase):
for to_fds in itertools.permutations(range(3), 2):
self._check_swap_std_fds_with_one_closed(from_fds, to_fds)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_surrogates_error_message(self):
def prepare():
raise ValueError("surrogate:\uDCff")
@@ -2654,17 +2653,15 @@ class POSIXProcessTestCase(BaseTestCase):
preexec_fn=prepare)
except ValueError as err:
# Pure Python implementations keeps the message
self.assertIsNone(subprocess._posixsubprocess)
self.assertIsNone(subprocess._fork_exec)
self.assertEqual(str(err), "surrogate:\uDCff")
except subprocess.SubprocessError as err:
# _posixsubprocess uses a default message
self.assertIsNotNone(subprocess._posixsubprocess)
self.assertIsNotNone(subprocess._fork_exec)
self.assertEqual(str(err), "Exception occurred in preexec_fn.")
else:
self.fail("Expected ValueError or subprocess.SubprocessError")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_undecodable_env(self):
for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
encoded_value = value.encode("ascii", "surrogateescape")
@@ -2785,7 +2782,6 @@ class POSIXProcessTestCase(BaseTestCase):
p1.stdout.close()
p2.stdout.close()
@unittest.skip("TODO: RUSTPYTHON, flaky test")
def test_close_fds(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
@@ -2913,7 +2909,6 @@ class POSIXProcessTestCase(BaseTestCase):
msg="Some fds were left open.")
@unittest.skip("TODO: RUSTPYTHON, flaky test")
# Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
# descriptor of a pipe closed in the parent process is valid in the
# child process according to fstat(), but the mode of the file
@@ -3121,8 +3116,6 @@ class POSIXProcessTestCase(BaseTestCase):
else:
self.assertNotIn(ident, [id(o) for o in subprocess._active])
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_close_fds_after_preexec(self):
fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
@@ -3165,9 +3158,9 @@ class POSIXProcessTestCase(BaseTestCase):
True, (), cwd, env_list,
-1, -1, -1, -1,
1, 2, 3, 4,
True, True,
True, True, 0,
False, [], 0, -1,
func)
func, False)
# Attempt to prevent
# "TypeError: fork_exec() takes exactly N arguments (M given)"
# from passing the test. More refactoring to have us start
@@ -3214,9 +3207,9 @@ class POSIXProcessTestCase(BaseTestCase):
True, fds_to_keep, None, [b"env"],
-1, -1, -1, -1,
1, 2, 3, 4,
True, True,
True, True, 0,
None, None, None, -1,
None)
None, "no vfork")
self.assertIn('fds_to_keep', str(c.exception))
finally:
if not gc_enabled:
@@ -3319,6 +3312,7 @@ class POSIXProcessTestCase(BaseTestCase):
with mock.patch.object(p, 'poll', new=lambda: None):
p.returncode = None
p.send_signal(signal.SIGTERM)
p.kill()
def test_communicate_repeated_call_after_stdout_close(self):
proc = subprocess.Popen([sys.executable, '-c',
@@ -3424,8 +3418,6 @@ class Win32ProcessTestCase(BaseTestCase):
close_fds=True)
self.assertEqual(rc, 47)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_close_fds_with_stdio(self):
import msvcrt
@@ -3508,8 +3500,6 @@ class Win32ProcessTestCase(BaseTestCase):
with p:
self.assertIn(b"physalis", p.stdout.read())
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_shell_encodings(self):
# Run command through the shell (string)
for enc in ['ansi', 'oem']:
@@ -3656,7 +3646,6 @@ class MiscTests(unittest.TestCase):
raise KeyboardInterrupt # Test how __exit__ handles ^C.
self._test_keyboardinterrupt_no_kill(popen_via_context_manager)
@unittest.expectedFailureIfWindows("TODO: RUSTPYTHON")
def test_getoutput(self):
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
@@ -3729,28 +3718,20 @@ class CommandsWithSpaces (BaseTestCase):
"2 [%r, 'ab cd']" % self.fname
)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_shell_string_with_spaces(self):
# call() function with string argument with spaces on Windows
self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
"ab cd"), shell=1)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_shell_sequence_with_spaces(self):
# call() function with sequence argument with spaces on Windows
self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_noshell_string_with_spaces(self):
# call() function with string argument with spaces on Windows
self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
"ab cd"))
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_noshell_sequence_with_spaces(self):
# call() function with sequence argument with spaces on Windows
self.with_spaces([sys.executable, self.fname, "ab cd"])