Merge pull request #5070 from dvermd/compileall_311

Update compileall to CPython 3.11.5
This commit is contained in:
Jeong, YunWon
2023-10-01 20:50:08 +09:00
committed by GitHub

286
Lib/compileall.py vendored
View File

@@ -4,7 +4,7 @@ When called as a script with arguments, this compiles the directories
given as arguments recursively; the -l option prevents it from
recursing into directories.
Without arguments, if compiles all modules on sys.path, without
Without arguments, it compiles all modules on sys.path, without
recursing into subdirectories. (Even though it should do so for
packages -- for now, you'll have to deal with packages separately.)
@@ -15,16 +15,14 @@ import sys
import importlib.util
import py_compile
import struct
import filecmp
try:
from concurrent.futures import ProcessPoolExecutor
except ImportError:
ProcessPoolExecutor = None
from functools import partial
from pathlib import Path
__all__ = ["compile_dir","compile_file","compile_path"]
def _walk_dir(dir, ddir=None, maxlevels=10, quiet=0):
def _walk_dir(dir, maxlevels, quiet=0):
if quiet < 2 and isinstance(dir, os.PathLike):
dir = os.fspath(dir)
if not quiet:
@@ -40,59 +38,94 @@ def _walk_dir(dir, ddir=None, maxlevels=10, quiet=0):
if name == '__pycache__':
continue
fullname = os.path.join(dir, name)
if ddir is not None:
dfile = os.path.join(ddir, name)
else:
dfile = None
if not os.path.isdir(fullname):
yield fullname
elif (maxlevels > 0 and name != os.curdir and name != os.pardir and
os.path.isdir(fullname) and not os.path.islink(fullname)):
yield from _walk_dir(fullname, ddir=dfile,
maxlevels=maxlevels - 1, quiet=quiet)
yield from _walk_dir(fullname, maxlevels=maxlevels - 1,
quiet=quiet)
def compile_dir(dir, maxlevels=10, ddir=None, force=False, rx=None,
quiet=0, legacy=False, optimize=-1, workers=1):
def compile_dir(dir, maxlevels=None, ddir=None, force=False,
rx=None, quiet=0, legacy=False, optimize=-1, workers=1,
invalidation_mode=None, *, stripdir=None,
prependdir=None, limit_sl_dest=None, hardlink_dupes=False):
"""Byte-compile all modules in the given directory tree.
Arguments (only dir is required):
dir: the directory to byte-compile
maxlevels: maximum recursion level (default 10)
maxlevels: maximum recursion level (default `sys.getrecursionlimit()`)
ddir: the directory that will be prepended to the path to the
file as it is compiled into each byte-code file.
force: if True, force compilation, even if timestamps are up-to-date
quiet: full output with False or 0, errors only with 1,
no output with 2
legacy: if True, produce legacy pyc paths instead of PEP 3147 paths
optimize: optimization level or -1 for level of the interpreter
optimize: int or list of optimization levels or -1 for level of
the interpreter. Multiple levels leads to multiple compiled
files each with one optimization level.
workers: maximum number of parallel workers
invalidation_mode: how the up-to-dateness of the pyc will be checked
stripdir: part of path to left-strip from source file path
prependdir: path to prepend to beginning of original file path, applied
after stripdir
limit_sl_dest: ignore symlinks if they are pointing outside of
the defined path
hardlink_dupes: hardlink duplicated pyc files
"""
if workers is not None and workers < 0:
ProcessPoolExecutor = None
if ddir is not None and (stripdir is not None or prependdir is not None):
raise ValueError(("Destination dir (ddir) cannot be used "
"in combination with stripdir or prependdir"))
if ddir is not None:
stripdir = dir
prependdir = ddir
ddir = None
if workers < 0:
raise ValueError('workers must be greater or equal to 0')
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels,
ddir=ddir)
if workers != 1:
# Check if this is a system where ProcessPoolExecutor can function.
from concurrent.futures.process import _check_system_limits
try:
_check_system_limits()
except NotImplementedError:
workers = 1
else:
from concurrent.futures import ProcessPoolExecutor
if maxlevels is None:
maxlevels = sys.getrecursionlimit()
files = _walk_dir(dir, quiet=quiet, maxlevels=maxlevels)
success = True
if workers is not None and workers != 1 and ProcessPoolExecutor is not None:
if workers != 1 and ProcessPoolExecutor is not None:
# If workers == 0, let ProcessPoolExecutor choose
workers = workers or None
with ProcessPoolExecutor(max_workers=workers) as executor:
results = executor.map(partial(compile_file,
ddir=ddir, force=force,
rx=rx, quiet=quiet,
legacy=legacy,
optimize=optimize),
optimize=optimize,
invalidation_mode=invalidation_mode,
stripdir=stripdir,
prependdir=prependdir,
limit_sl_dest=limit_sl_dest,
hardlink_dupes=hardlink_dupes),
files)
success = min(results, default=True)
else:
for file in files:
if not compile_file(file, ddir, force, rx, quiet,
legacy, optimize):
legacy, optimize, invalidation_mode,
stripdir=stripdir, prependdir=prependdir,
limit_sl_dest=limit_sl_dest,
hardlink_dupes=hardlink_dupes):
success = False
return success
def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
legacy=False, optimize=-1):
legacy=False, optimize=-1,
invalidation_mode=None, *, stripdir=None, prependdir=None,
limit_sl_dest=None, hardlink_dupes=False):
"""Byte-compile one file.
Arguments (only fullname is required):
@@ -104,49 +137,114 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
quiet: full output with False or 0, errors only with 1,
no output with 2
legacy: if True, produce legacy pyc paths instead of PEP 3147 paths
optimize: optimization level or -1 for level of the interpreter
optimize: int or list of optimization levels or -1 for level of
the interpreter. Multiple levels leads to multiple compiled
files each with one optimization level.
invalidation_mode: how the up-to-dateness of the pyc will be checked
stripdir: part of path to left-strip from source file path
prependdir: path to prepend to beginning of original file path, applied
after stripdir
limit_sl_dest: ignore symlinks if they are pointing outside of
the defined path.
hardlink_dupes: hardlink duplicated pyc files
"""
if ddir is not None and (stripdir is not None or prependdir is not None):
raise ValueError(("Destination dir (ddir) cannot be used "
"in combination with stripdir or prependdir"))
success = True
if quiet < 2 and isinstance(fullname, os.PathLike):
fullname = os.fspath(fullname)
fullname = os.fspath(fullname)
stripdir = os.fspath(stripdir) if stripdir is not None else None
name = os.path.basename(fullname)
dfile = None
if ddir is not None:
dfile = os.path.join(ddir, name)
else:
dfile = None
if stripdir is not None:
fullname_parts = fullname.split(os.path.sep)
stripdir_parts = stripdir.split(os.path.sep)
ddir_parts = list(fullname_parts)
for spart, opart in zip(stripdir_parts, fullname_parts):
if spart == opart:
ddir_parts.remove(spart)
dfile = os.path.join(*ddir_parts)
if prependdir is not None:
if dfile is None:
dfile = os.path.join(prependdir, fullname)
else:
dfile = os.path.join(prependdir, dfile)
if isinstance(optimize, int):
optimize = [optimize]
# Use set() to remove duplicates.
# Use sorted() to create pyc files in a deterministic order.
optimize = sorted(set(optimize))
if hardlink_dupes and len(optimize) < 2:
raise ValueError("Hardlinking of duplicated bytecode makes sense "
"only for more than one optimization level")
if rx is not None:
mo = rx.search(fullname)
if mo:
return success
if limit_sl_dest is not None and os.path.islink(fullname):
if Path(limit_sl_dest).resolve() not in Path(fullname).resolve().parents:
return success
opt_cfiles = {}
if os.path.isfile(fullname):
if legacy:
cfile = fullname + 'c'
else:
if optimize >= 0:
opt = optimize if optimize >= 1 else ''
cfile = importlib.util.cache_from_source(
fullname, optimization=opt)
for opt_level in optimize:
if legacy:
opt_cfiles[opt_level] = fullname + 'c'
else:
cfile = importlib.util.cache_from_source(fullname)
cache_dir = os.path.dirname(cfile)
if opt_level >= 0:
opt = opt_level if opt_level >= 1 else ''
cfile = (importlib.util.cache_from_source(
fullname, optimization=opt))
opt_cfiles[opt_level] = cfile
else:
cfile = importlib.util.cache_from_source(fullname)
opt_cfiles[opt_level] = cfile
head, tail = name[:-3], name[-3:]
if tail == '.py':
if not force:
try:
mtime = int(os.stat(fullname).st_mtime)
expect = struct.pack('<4sl', importlib.util.MAGIC_NUMBER,
mtime)
with open(cfile, 'rb') as chandle:
actual = chandle.read(8)
if expect == actual:
expect = struct.pack('<4sLL', importlib.util.MAGIC_NUMBER,
0, mtime & 0xFFFF_FFFF)
for cfile in opt_cfiles.values():
with open(cfile, 'rb') as chandle:
actual = chandle.read(12)
if expect != actual:
break
else:
return success
except OSError:
pass
if not quiet:
print('Compiling {!r}...'.format(fullname))
try:
ok = py_compile.compile(fullname, cfile, dfile, True,
optimize=optimize)
for index, opt_level in enumerate(optimize):
cfile = opt_cfiles[opt_level]
ok = py_compile.compile(fullname, cfile, dfile, True,
optimize=opt_level,
invalidation_mode=invalidation_mode)
if index > 0 and hardlink_dupes:
previous_cfile = opt_cfiles[optimize[index - 1]]
if filecmp.cmp(cfile, previous_cfile, shallow=False):
os.unlink(cfile)
os.link(previous_cfile, cfile)
except py_compile.PyCompileError as err:
success = False
if quiet >= 2:
@@ -156,9 +254,8 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
else:
print('*** ', end='')
# escape non-printable characters in msg
msg = err.msg.encode(sys.stdout.encoding,
errors='backslashreplace')
msg = msg.decode(sys.stdout.encoding)
encoding = sys.stdout.encoding or sys.getdefaultencoding()
msg = err.msg.encode(encoding, errors='backslashreplace').decode(encoding)
print(msg)
except (SyntaxError, UnicodeError, OSError) as e:
success = False
@@ -175,7 +272,8 @@ def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
return success
def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
legacy=False, optimize=-1):
legacy=False, optimize=-1,
invalidation_mode=None):
"""Byte-compile all module on sys.path.
Arguments (all optional):
@@ -186,6 +284,7 @@ def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
quiet: as for compile_dir() (default 0)
legacy: as for compile_dir() (default False)
optimize: as for compile_dir() (default -1)
invalidation_mode: as for compiler_dir()
"""
success = True
for dir in sys.path:
@@ -193,9 +292,16 @@ def compile_path(skip_curdir=1, maxlevels=0, force=False, quiet=0,
if quiet < 2:
print('Skipping current directory')
else:
success = success and compile_dir(dir, maxlevels, None,
force, quiet=quiet,
legacy=legacy, optimize=optimize)
success = success and compile_dir(
dir,
maxlevels,
None,
force,
quiet=quiet,
legacy=legacy,
optimize=optimize,
invalidation_mode=invalidation_mode,
)
return success
@@ -206,7 +312,7 @@ def main():
parser = argparse.ArgumentParser(
description='Utilities to support installing Python libraries.')
parser.add_argument('-l', action='store_const', const=0,
default=10, dest='maxlevels',
default=None, dest='maxlevels',
help="don't recurse into subdirectories")
parser.add_argument('-r', type=int, dest='recursion',
help=('control the maximum recursion level. '
@@ -224,6 +330,20 @@ def main():
'compile-time tracebacks and in runtime '
'tracebacks in cases where the source file is '
'unavailable'))
parser.add_argument('-s', metavar='STRIPDIR', dest='stripdir',
default=None,
help=('part of path to left-strip from path '
'to source file - for example buildroot. '
'`-d` and `-s` options cannot be '
'specified together.'))
parser.add_argument('-p', metavar='PREPENDDIR', dest='prependdir',
default=None,
help=('path to add as prefix to path '
'to source file - for example / to make '
'it absolute when some part is removed '
'by `-s` option. '
'`-d` and `-p` options cannot be '
'specified together.'))
parser.add_argument('-x', metavar='REGEXP', dest='rx', default=None,
help=('skip files matching the regular expression; '
'the regexp is searched for in the full path '
@@ -238,6 +358,23 @@ def main():
'to the equivalent of -l sys.path'))
parser.add_argument('-j', '--workers', default=1,
type=int, help='Run compileall concurrently')
invalidation_modes = [mode.name.lower().replace('_', '-')
for mode in py_compile.PycInvalidationMode]
parser.add_argument('--invalidation-mode',
choices=sorted(invalidation_modes),
help=('set .pyc invalidation mode; defaults to '
'"checked-hash" if the SOURCE_DATE_EPOCH '
'environment variable is set, and '
'"timestamp" otherwise.'))
parser.add_argument('-o', action='append', type=int, dest='opt_levels',
help=('Optimization levels to run compilation with. '
'Default is -1 which uses the optimization level '
'of the Python interpreter itself (see -O).'))
parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest',
help='Ignore symlinks pointing outsite of the DIR')
parser.add_argument('--hardlink-dupes', action='store_true',
dest='hardlink_dupes',
help='Hardlink duplicated pyc files')
args = parser.parse_args()
compile_dests = args.compile_dest
@@ -246,16 +383,31 @@ def main():
import re
args.rx = re.compile(args.rx)
if args.limit_sl_dest == "":
args.limit_sl_dest = None
if args.recursion is not None:
maxlevels = args.recursion
else:
maxlevels = args.maxlevels
if args.opt_levels is None:
args.opt_levels = [-1]
if len(args.opt_levels) == 1 and args.hardlink_dupes:
parser.error(("Hardlinking of duplicated bytecode makes sense "
"only for more than one optimization level."))
if args.ddir is not None and (
args.stripdir is not None or args.prependdir is not None
):
parser.error("-d cannot be used in combination with -s or -p")
# if flist is provided then load it
if args.flist:
try:
with (sys.stdin if args.flist=='-' else open(args.flist)) as f:
with (sys.stdin if args.flist=='-' else
open(args.flist, encoding="utf-8")) as f:
for line in f:
compile_dests.append(line.strip())
except OSError:
@@ -263,8 +415,11 @@ def main():
print("Error reading file list {}".format(args.flist))
return False
if args.workers is not None:
args.workers = args.workers or None
if args.invalidation_mode:
ivl_mode = args.invalidation_mode.replace('-', '_').upper()
invalidation_mode = py_compile.PycInvalidationMode[ivl_mode]
else:
invalidation_mode = None
success = True
try:
@@ -272,17 +427,30 @@ def main():
for dest in compile_dests:
if os.path.isfile(dest):
if not compile_file(dest, args.ddir, args.force, args.rx,
args.quiet, args.legacy):
args.quiet, args.legacy,
invalidation_mode=invalidation_mode,
stripdir=args.stripdir,
prependdir=args.prependdir,
optimize=args.opt_levels,
limit_sl_dest=args.limit_sl_dest,
hardlink_dupes=args.hardlink_dupes):
success = False
else:
if not compile_dir(dest, maxlevels, args.ddir,
args.force, args.rx, args.quiet,
args.legacy, workers=args.workers):
args.legacy, workers=args.workers,
invalidation_mode=invalidation_mode,
stripdir=args.stripdir,
prependdir=args.prependdir,
optimize=args.opt_levels,
limit_sl_dest=args.limit_sl_dest,
hardlink_dupes=args.hardlink_dupes):
success = False
return success
else:
return compile_path(legacy=args.legacy, force=args.force,
quiet=args.quiet)
quiet=args.quiet,
invalidation_mode=invalidation_mode)
except KeyboardInterrupt:
if args.quiet < 2:
print("\n[interrupted]")