Update os from CPython 3.12.2

This commit is contained in:
CPython Developers
2024-04-23 12:45:14 +09:00
committed by Jeong YunWon
parent 153ec2823d
commit a5f8d42d34
2 changed files with 609 additions and 265 deletions

180
Lib/os.py vendored
View File

@@ -288,7 +288,8 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
dirpath, dirnames, filenames
dirpath is a string, the path to the directory. dirnames is a list of
the names of the subdirectories in dirpath (excluding '.' and '..').
the names of the subdirectories in dirpath (including symlinks to directories,
and excluding '.' and '..').
filenames is a list of the names of the non-directory files in dirpath.
Note that the names in the lists are just names, with no path components.
To get a full path (which begins with top) to a file or directory in
@@ -331,97 +332,103 @@ def walk(top, topdown=True, onerror=None, followlinks=False):
import os
from os.path import join, getsize
for root, dirs, files in os.walk('python/Lib/email'):
print(root, "consumes", end="")
print(sum(getsize(join(root, name)) for name in files), end="")
print(root, "consumes ")
print(sum(getsize(join(root, name)) for name in files), end=" ")
print("bytes in", len(files), "non-directory files")
if 'CVS' in dirs:
dirs.remove('CVS') # don't visit CVS directories
"""
sys.audit("os.walk", top, topdown, onerror, followlinks)
return _walk(fspath(top), topdown, onerror, followlinks)
def _walk(top, topdown, onerror, followlinks):
dirs = []
nondirs = []
walk_dirs = []
stack = [fspath(top)]
islink, join = path.islink, path.join
while stack:
top = stack.pop()
if isinstance(top, tuple):
yield top
continue
# We may not have read permission for top, in which case we can't
# get a list of the files the directory contains. os.walk
# always suppressed the exception then, rather than blow up for a
# minor reason when (say) a thousand readable directories are still
# left to visit. That logic is copied here.
try:
# Note that scandir is global in this module due
# to earlier import-*.
scandir_it = scandir(top)
except OSError as error:
if onerror is not None:
onerror(error)
return
dirs = []
nondirs = []
walk_dirs = []
with scandir_it:
while True:
try:
# We may not have read permission for top, in which case we can't
# get a list of the files the directory contains.
# We suppress the exception here, rather than blow up for a
# minor reason when (say) a thousand readable directories are still
# left to visit.
try:
scandir_it = scandir(top)
except OSError as error:
if onerror is not None:
onerror(error)
continue
cont = False
with scandir_it:
while True:
try:
entry = next(scandir_it)
except StopIteration:
break
except OSError as error:
if onerror is not None:
onerror(error)
return
try:
is_dir = entry.is_dir()
except OSError:
# If is_dir() raises an OSError, consider that the entry is not
# a directory, same behaviour than os.path.isdir().
is_dir = False
if is_dir:
dirs.append(entry.name)
else:
nondirs.append(entry.name)
if not topdown and is_dir:
# Bottom-up: recurse into sub-directory, but exclude symlinks to
# directories if followlinks is False
if followlinks:
walk_into = True
else:
try:
is_symlink = entry.is_symlink()
except OSError:
# If is_symlink() raises an OSError, consider that the
# entry is not a symbolic link, same behaviour than
# os.path.islink().
is_symlink = False
walk_into = not is_symlink
entry = next(scandir_it)
except StopIteration:
break
except OSError as error:
if onerror is not None:
onerror(error)
cont = True
break
if walk_into:
walk_dirs.append(entry.path)
try:
is_dir = entry.is_dir()
except OSError:
# If is_dir() raises an OSError, consider the entry not to
# be a directory, same behaviour as os.path.isdir().
is_dir = False
# Yield before recursion if going top down
if topdown:
yield top, dirs, nondirs
if is_dir:
dirs.append(entry.name)
else:
nondirs.append(entry.name)
# Recurse into sub-directories
islink, join = path.islink, path.join
for dirname in dirs:
new_path = join(top, dirname)
# Issue #23605: os.path.islink() is used instead of caching
# entry.is_symlink() result during the loop on os.scandir() because
# the caller can replace the directory entry during the "yield"
# above.
if followlinks or not islink(new_path):
yield from _walk(new_path, topdown, onerror, followlinks)
else:
# Recurse into sub-directories
for new_path in walk_dirs:
yield from _walk(new_path, topdown, onerror, followlinks)
# Yield after recursion if going bottom up
yield top, dirs, nondirs
if not topdown and is_dir:
# Bottom-up: traverse into sub-directory, but exclude
# symlinks to directories if followlinks is False
if followlinks:
walk_into = True
else:
try:
is_symlink = entry.is_symlink()
except OSError:
# If is_symlink() raises an OSError, consider the
# entry not to be a symbolic link, same behaviour
# as os.path.islink().
is_symlink = False
walk_into = not is_symlink
if walk_into:
walk_dirs.append(entry.path)
if cont:
continue
if topdown:
# Yield before sub-directory traversal if going top down
yield top, dirs, nondirs
# Traverse into sub-directories
for dirname in reversed(dirs):
new_path = join(top, dirname)
# bpo-23605: os.path.islink() is used instead of caching
# entry.is_symlink() result during the loop on os.scandir() because
# the caller can replace the directory entry during the "yield"
# above.
if followlinks or not islink(new_path):
stack.append(new_path)
else:
# Yield after sub-directory traversal if going bottom up
stack.append((top, dirs, nondirs))
# Traverse into sub-directories
for new_path in reversed(walk_dirs):
stack.append(new_path)
__all__.append("walk")
@@ -461,13 +468,12 @@ if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
dirs.remove('CVS') # don't visit CVS directories
"""
sys.audit("os.fwalk", top, topdown, onerror, follow_symlinks, dir_fd)
if not isinstance(top, int) or not hasattr(top, '__index__'):
top = fspath(top)
top = fspath(top)
# Note: To guard against symlink races, we use the standard
# lstat()/open()/fstat() trick.
if not follow_symlinks:
orig_st = stat(top, follow_symlinks=False, dir_fd=dir_fd)
topfd = open(top, O_RDONLY, dir_fd=dir_fd)
topfd = open(top, O_RDONLY | O_NONBLOCK, dir_fd=dir_fd)
try:
if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
path.samestat(orig_st, stat(topfd)))):
@@ -516,7 +522,7 @@ if {open, stat} <= supports_dir_fd and {scandir, stat} <= supports_fd:
assert entries is not None
name, entry = name
orig_st = entry.stat(follow_symlinks=False)
dirfd = open(name, O_RDONLY, dir_fd=topfd)
dirfd = open(name, O_RDONLY | O_NONBLOCK, dir_fd=topfd)
except OSError as err:
if onerror is not None:
onerror(err)
@@ -704,9 +710,11 @@ class _Environ(MutableMapping):
return len(self._data)
def __repr__(self):
return 'environ({{{}}})'.format(', '.join(
('{!r}: {!r}'.format(self.decodekey(key), self.decodevalue(value))
for key, value in self._data.items())))
formatted_items = ", ".join(
f"{self.decodekey(key)!r}: {self.decodevalue(value)!r}"
for key, value in self._data.items()
)
return f"environ({{{formatted_items}}})"
def copy(self):
return dict(self)
@@ -980,7 +988,7 @@ if sys.platform != 'vxworks':
raise ValueError("invalid mode %r" % mode)
if buffering == 0 or buffering is None:
raise ValueError("popen() does not support unbuffered streams")
import subprocess, io
import subprocess
if mode == "r":
proc = subprocess.Popen(cmd,
shell=True, text=True,

694
Lib/test/test_os.py vendored

File diff suppressed because it is too large Load Diff