Files
2026-05-18 00:23:34 +09:00

1725 lines
50 KiB
Python

"""
Dependency resolution for library updates.
Handles:
- Irregular library paths (e.g., libregrtest at Lib/test/libregrtest/)
- Library dependencies (e.g., datetime requires _pydatetime)
- Test dependencies (auto-detected from 'from test import ...')
"""
import ast
import difflib
import functools
import pathlib
import shelve
import subprocess
from update_lib.file_utils import (
_dircmp_is_same,
compare_dir_contents,
compare_file_contents,
compare_paths,
construct_lib_path,
cpython_to_local_path,
read_python_files,
resolve_module_path,
resolve_test_path,
safe_parse_ast,
safe_read_text,
)
# === Import parsing utilities ===
class ImportVisitor(ast.NodeVisitor):
def __init__(self) -> None:
self.__imports = set()
@property
def test_imports(self) -> set[str]:
imports = set()
for module in self.__imports:
if not module.startswith("test."):
continue
name = module.removeprefix("test.")
if name == "support" or name.startswith("support."):
continue
imports.add(name)
return imports
@property
def lib_imports(self) -> set[str]:
return {module for module in self.__imports if not module.startswith("test.")}
def visit_Import(self, node):
for alias in node.names:
self.__imports.add(alias.name)
def visit_ImportFrom(self, node):
try:
module = node.module
except AttributeError:
# Ignore `from . import my_internal_module`
return
if module is None: # Ignore `from . import my_internal_module`
return
for alias in node.names:
# We only care about what we import if it was from the "test" module
if module == "test":
name = f"{module}.{alias.name}"
else:
name = module
self.__imports.add(name)
def visit_Call(self, node) -> None:
"""
In test files, there's sometimes use of:
```python
import test.support
from test.support import script_helper
script = support.findfile("_test_atexit.py")
script_helper.run_test_script(script)
```
This imports "_test_atexit.py" but does not show as an import node.
"""
func = node.func
if not isinstance(func, ast.Attribute):
return
value = func.value
if not isinstance(value, ast.Name):
return
if (value.id != "support") or (func.attr != "findfile"):
return
arg = node.args[0]
if not isinstance(arg, ast.Constant):
return
target = arg.value
if not target.endswith(".py"):
return
target = target.removesuffix(".py")
self.__imports.add(f"test.{target}")
def parse_test_imports(content: str) -> set[str]:
    """Parse test file content and extract test package dependencies.

    Args:
        content: Python source text of a test file

    Returns:
        Set of names imported from the ``test`` package (``test.`` prefix
        removed, ``support`` excluded); empty if the source cannot be parsed
    """
    tree = safe_parse_ast(content)
    if not tree:
        return set()
    visitor = ImportVisitor()
    visitor.visit(tree)
    return visitor.test_imports
def parse_lib_imports(content: str) -> set[str]:
    """Parse library file and extract all imported module names.

    Args:
        content: Python source text

    Returns:
        Mutable set of imported module names that do not start with
        ``test.``; empty if the source cannot be parsed. Callers in this
        file modify the returned set in place, so it must stay a ``set``.
    """
    tree = safe_parse_ast(content)
    if not tree:
        return set()
    visitor = ImportVisitor()
    visitor.visit(tree)
    return visitor.lib_imports
# === TODO marker utilities ===
TODO_MARKER = "TODO: RUSTPYTHON"


def filter_rustpython_todo(content: str) -> str:
    """Return *content* without the lines that mention the RustPython TODO marker.

    Line endings of the surviving lines are preserved.
    """
    kept = []
    for line in content.splitlines(keepends=True):
        if TODO_MARKER in line:
            continue
        kept.append(line)
    return "".join(kept)
def count_rustpython_todo(content: str) -> int:
    """Count occurrences of the RustPython TODO marker in *content*.

    Every occurrence counts, so a line carrying the marker twice adds two.
    """
    # The marker contains no line break, so per-line counts add up to the
    # count over the whole text.
    return sum(line.count(TODO_MARKER) for line in content.splitlines())
def count_todo_in_path(path: pathlib.Path) -> int:
    """Count RustPython TODO markers in a single file or in a tree of .py files.

    Args:
        path: File, or directory handed to ``read_python_files``

    Returns:
        Total marker count (0 for a file with no readable content)
    """
    if not path.is_file():
        return sum(count_rustpython_todo(text) for _, text in read_python_files(path))
    text = safe_read_text(path)
    if not text:
        return 0
    return count_rustpython_todo(text)
# === Test utilities ===
def _get_cpython_test_path(test_name: str, cpython_prefix: str) -> pathlib.Path | None:
    """Resolve a test name inside the CPython tree (directory form preferred).

    Returns:
        The resolved path, or None when it does not exist on disk
    """
    candidate = resolve_test_path(test_name, cpython_prefix, prefer="dir")
    if not candidate.exists():
        return None
    return candidate
def _get_local_test_path(
    cpython_test_path: pathlib.Path, lib_prefix: str
) -> pathlib.Path:
    """Map a CPython test path to ``<lib_prefix>/test/<same final component>``."""
    return pathlib.Path(lib_prefix, "test", cpython_test_path.name)
def is_test_tracked(test_name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Check whether the local Lib/test has a copy of a CPython test.

    A test that is missing from CPython is reported as tracked (True).
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    if upstream is None:
        return True
    return _get_local_test_path(upstream, lib_prefix).exists()
def is_test_up_to_date(test_name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Check if the local test matches CPython, ignoring RustPython TODO lines.

    Returns:
        True when the test is missing from CPython or the contents compare
        equal after filtering the local side; False when the local copy is
        missing or differs
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    if upstream is None:
        return True
    local = _get_local_test_path(upstream, lib_prefix)
    if not local.exists():
        return False
    # Single files and test packages use different comparison helpers.
    comparer = compare_file_contents if upstream.is_file() else compare_dir_contents
    return comparer(upstream, local, local_filter=filter_rustpython_todo)
def count_test_todos(test_name: str, lib_prefix: str) -> int:
    """Count RustPython TODO markers for a local test.

    The directory form ``test/<test_name>`` is tried before the file form
    ``test/<test_name>.py``; 0 is returned when neither exists.
    """
    test_root = pathlib.Path(lib_prefix) / "test"
    for candidate in (test_root / test_name, test_root / f"{test_name}.py"):
        if candidate.exists():
            return count_todo_in_path(candidate)
    return 0
# === Cross-process cache using shelve ===
def _get_cpython_version(cpython_prefix: str) -> str:
    """Get CPython version from git tag for cache namespace.

    Runs ``git describe --tags --abbrev=0`` inside *cpython_prefix*.

    Args:
        cpython_prefix: Directory of the CPython checkout

    Returns:
        The tag name, or "unknown" when git cannot be run, times out,
        exits with an error, or prints nothing
    """
    try:
        result = subprocess.run(
            ["git", "describe", "--tags", "--abbrev=0"],
            cwd=cpython_prefix,
            capture_output=True,
            text=True,
            timeout=30,  # never let a stuck git block the caller
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # Missing git binary, missing directory, undecodable output, timeout.
        return "unknown"
    if result.returncode != 0:
        return "unknown"
    # An empty tag would produce a meaningless cache namespace.
    return result.stdout.strip() or "unknown"
def _get_cache_path() -> str:
    """Return the shelve base path inside a ``.cache`` directory next to this file.

    The directory is created if needed. No extension is included; shelve
    adds its own.
    """
    cache_dir = pathlib.Path(__file__).parent.joinpath(".cache")
    cache_dir.mkdir(parents=True, exist_ok=True)
    return str(cache_dir.joinpath("import_graph_cache"))
def clear_import_graph_caches() -> None:
    """Empty the in-process import graph caches (for testing).

    Caches that have not been defined in this module yet are skipped.
    """
    module_globals = globals()
    for cache_name in ("_test_import_graph_cache", "_lib_import_graph_cache"):
        if cache_name in module_globals:
            module_globals[cache_name].clear()
# Manual dependency table for irregular cases
# Format: "name" -> {"lib": [...], "test": [...], "data": [...], "hard_deps": [...]}
# All keys are optional.
# - lib: override default path (default: name.py or name/); an empty list
#   means the module has no library file of its own
# - test: override default test path (default: test/test_<name>); entries are
#   relative to Lib/test/
# - data: extra data directories, relative to Lib/ (not read anywhere in this
#   part of the file -- presumably copied as-is; verify against the caller)
# - hard_deps: additional files to copy alongside the main module
DEPENDENCIES = {
    # regrtest is in Lib/test/libregrtest/, not Lib/libregrtest/
    "regrtest": {
        "lib": ["test/libregrtest"],
        "test": ["test_regrtest"],
        "data": ["test/regrtestdata"],
    },
    # Rust-implemented modules (no lib file, only test)
    "int": {
        "lib": [],
        "hard_deps": ["_pylong.py"],
        "test": [
            "test_int.py",
            "test_long.py",
            "test_int_literal.py",
        ],
    },
    "exception": {
        "lib": [],
        "test": [
            "test_exceptions.py",
            "test_baseexception.py",
            "test_except_star.py",
            "test_exception_group.py",
            "test_exception_hierarchy.py",
            "test_exception_variations.py",
        ],
    },
    "dict": {
        "lib": [],
        "test": [
            "test_dict.py",
            "test_dictcomps.py",
            "test_dictviews.py",
            "test_userdict.py",
            "mapping_tests.py",
        ],
    },
    "list": {
        "lib": [],
        "test": [
            "test_list.py",
            "test_listcomps.py",
            "test_userlist.py",
        ],
    },
    "__future__": {
        "test": [
            "test___future__.py",
            "test_future_stmt.py",
        ],
    },
    "site": {
        "hard_deps": ["_sitebuiltins.py"],
    },
    "opcode": {
        "hard_deps": ["_opcode_metadata.py"],
        "test": [
            "test_opcode.py",
            "test__opcode.py",
            "test_opcodes.py",
        ],
    },
    "pickle": {
        "hard_deps": ["_compat_pickle.py"],
        "test": [
            "picklecommon.py",
            "test_pickle.py",
            "test_picklebuffer.py",
            "test_pickletools.py",
            "test_xpickle.py",
            "xpickle_worker.py",
        ],
    },
    "re": {
        "hard_deps": ["sre_compile.py", "sre_constants.py", "sre_parse.py"],
        "test": [
            "test_re.py",
            "re_tests.py",
        ],
    },
    "weakref": {
        "hard_deps": ["_weakrefset.py"],
        "test": [
            "test_weakref.py",
            "test_weakset.py",
        ],
    },
    "codecs": {
        "test": [
            "test_charmapcodec.py",
            "test_codeccallbacks.py",
            "test_codecencodings_cn.py",
            "test_codecencodings_hk.py",
            "test_codecencodings_iso2022.py",
            "test_codecencodings_jp.py",
            "test_codecencodings_kr.py",
            "test_codecencodings_tw.py",
            "test_codecmaps_cn.py",
            "test_codecmaps_hk.py",
            "test_codecmaps_jp.py",
            "test_codecmaps_kr.py",
            "test_codecmaps_tw.py",
            "test_codecs.py",
            "test_multibytecodec.py",
            "testcodec.py",
        ],
    },
    # Non-pattern hard_deps (can't be auto-detected)
    "ast": {
        "hard_deps": ["_ast_unparse.py"],
        "test": [
            "test_ast.py",
            "test_unparse.py",
            "test_type_comments.py",
        ],
    },
    # Data directories
    "pydoc": {
        "hard_deps": ["pydoc_data"],
    },
    "turtle": {
        "hard_deps": ["turtledemo"],
    },
    "sysconfig": {
        "hard_deps": ["_aix_support.py", "_osx_support.py"],
        "test": [
            "test_sysconfig.py",
            "test__osx_support.py",
        ],
    },
    "tkinter": {
        "test": [
            "test_tkinter",
            "test_ttk",
            "test_ttk_textonly.py",
            "test_tcl.py",
            "test_idle",
        ],
    },
    # Test support library (like regrtest)
    "support": {
        "lib": ["test/support"],
        "data": ["test/wheeldata"],
        "test": [
            "test_support.py",
            "test_script_helper.py",
        ],
    },
    # test_htmlparser tests html.parser
    "html": {
        "hard_deps": ["_markupbase.py"],
        "test": ["test_html.py", "test_htmlparser.py"],
    },
    "xml": {
        "test": [
            "test_xml_etree.py",
            "test_xml_etree_c.py",
            "test_minidom.py",
            "test_pulldom.py",
            "test_pyexpat.py",
            "test_sax.py",
            "test_xml_dom_minicompat.py",
            "test_xml_dom_xmlbuilder.py",
        ],
    },
    "multiprocessing": {
        "test": [
            "test_multiprocessing_fork",
            "test_multiprocessing_forkserver",
            "test_multiprocessing_spawn",
            "test_multiprocessing_main_handling.py",
            "_test_multiprocessing.py",
        ],
    },
    "urllib": {
        "test": [
            "test_urllib.py",
            "test_urllib2.py",
            "test_urllib2_localnet.py",
            "test_urllib2net.py",
            "test_urllibnet.py",
            "test_urlparse.py",
            "test_urllib_response.py",
            "test_robotparser.py",
        ],
    },
    "collections": {
        "hard_deps": ["_collections_abc.py"],
        "test": [
            "test_collections.py",
            "test_deque.py",
            "test_defaultdict.py",
            "test_ordered_dict.py",
        ],
    },
    "http": {
        "test": [
            "test_httplib.py",
            "test_http_cookiejar.py",
            "test_http_cookies.py",
            "test_httpservers.py",
        ],
    },
    "unicode": {
        "lib": [],
        "test": [
            "test_unicodedata.py",
            "test_unicode_file.py",
            "test_unicode_file_functions.py",
            "test_unicode_identifiers.py",
            "test_ucn.py",
        ],
    },
    "typing": {
        "test": [
            "test_typing.py",
            "test_type_aliases.py",
            "test_type_annotations.py",
            "test_type_params.py",
            "test_genericalias.py",
        ],
    },
    "unpack": {
        "lib": [],
        "test": [
            "test_unpack.py",
            "test_unpack_ex.py",
        ],
    },
    "zipimport": {
        "test": [
            "test_zipimport.py",
            "test_zipimport_support.py",
        ],
    },
    "time": {
        "lib": [],
        "test": [
            "test_time.py",
            "test_strftime.py",
        ],
    },
    "sys": {
        "lib": [],
        "test": [
            "test_sys.py",
            "test_syslog.py",
            "test_sys_setprofile.py",
            "test_sys_settrace.py",
            "test_audit.py",
            "audit-tests.py",
        ],
    },
    "str": {
        "lib": [],
        "test": [
            "test_str.py",
            "test_fstring.py",
            "test_string_literals.py",
        ],
    },
    "thread": {
        "lib": [],
        "test": [
            "test_thread.py",
            "test_thread_local_bytecode.py",
            "test_threadsignals.py",
        ],
    },
    "threading": {
        "hard_deps": ["_threading_local.py"],
        "test": [
            "test_threading.py",
            "test_threadedtempfile.py",
            "test_threading_local.py",
        ],
    },
    "class": {
        "lib": [],
        "test": [
            "test_class.py",
            "test_genericclass.py",
            "test_subclassinit.py",
        ],
    },
    "generator": {
        "lib": [],
        "test": [
            "test_generators.py",
            "test_genexps.py",
            "test_generator_stop.py",
            "test_yield_from.py",
        ],
    },
    "descr": {
        "lib": [],
        "test": [
            "test_descr.py",
            "test_descrtut.py",
        ],
    },
    "code": {
        "test": [
            "test_code_module.py",
        ],
    },
    "contextlib": {
        "test": [
            "test_contextlib.py",
            "test_contextlib_async.py",
        ],
    },
    "io": {
        "hard_deps": ["_pyio.py"],
        "test": [
            "test_io.py",
            "test_bufio.py",
            "test_fileio.py",
            "test_memoryio.py",
        ],
    },
    "dbm": {
        "test": [
            "test_dbm.py",
            "test_dbm_dumb.py",
            "test_dbm_gnu.py",
            "test_dbm_ndbm.py",
            "test_dbm_sqlite3.py",
        ],
    },
    "datetime": {
        "hard_deps": ["_strptime.py"],
        "test": [
            "test_datetime.py",
            "test_strptime.py",
        ],
    },
    "locale": {
        "test": [
            "test_locale.py",
            "test__locale.py",
        ],
    },
    "numbers": {
        "test": [
            "test_numbers.py",
            "test_abstract_numbers.py",
        ],
    },
    "file": {
        "lib": [],
        "test": [
            "test_file.py",
            "test_largefile.py",
        ],
    },
    "fcntl": {
        "lib": [],
        "test": [
            "test_fcntl.py",
            "test_ioctl.py",
        ],
    },
    "select": {
        "lib": [],
        "test": [
            "test_select.py",
            "test_poll.py",
        ],
    },
    "xmlrpc": {
        "test": [
            "test_xmlrpc.py",
            "test_docxmlrpc.py",
        ],
    },
    "ctypes": {
        "test": [
            "test_ctypes",
            "test_stable_abi_ctypes.py",
        ],
    },
    # Grouped tests for modules without custom lib paths
    "compile": {
        "lib": [],
        "test": [
            "test_compile.py",
            "test_compiler_assemble.py",
            "test_compiler_codegen.py",
            "test_peepholer.py",
        ],
    },
    "math": {
        "lib": [],
        "test": [
            "test_math.py",
            "test_math_property.py",
        ],
    },
    "float": {
        "lib": [],
        "test": [
            "test_float.py",
            "test_strtod.py",
        ],
    },
    "zipfile": {
        "test": [
            "test_zipfile.py",
            "test_zipfile64.py",
        ],
    },
    "smtplib": {
        "test": [
            "test_smtplib.py",
            "test_smtpnet.py",
        ],
    },
    "profile": {
        "test": [
            "test_profile.py",
            "test_cprofile.py",
        ],
    },
    "string": {
        "test": [
            "test_string.py",
            "test_userstring.py",
        ],
    },
    "os": {
        "test": [
            "test_os.py",
            "test_popen.py",
        ],
    },
    "pyrepl": {
        "test": [
            "test_pyrepl",
            "test_repl.py",
        ],
    },
    "concurrent": {
        "test": [
            "test_concurrent_futures",
            "test_interpreters",
            "test__interpreters.py",
            "test__interpchannels.py",
            "test_crossinterp.py",
        ],
    },
    "atexit": {
        "test": [
            "test_atexit.py",
            "_test_atexit.py",
        ],
    },
    "eintr": {
        "test": [
            "test_eintr.py",
            "_test_eintr.py",
        ]
    },
    "curses": {
        "test": [
            "test_curses.py",
            "curses_tests.py",
        ],
    },
}
def resolve_hard_dep_parent(name: str, cpython_prefix: str) -> str | None:
    """Find the module that a hard_dep file belongs to.

    A parent is reported only when the file is actually tracked: either it
    is listed under some module's "hard_deps" in DEPENDENCIES, or its name
    follows the _py{module} / _py_{module} pattern and that module exists
    under <cpython_prefix>/Lib.

    Args:
        name: Module or file name (with or without .py extension)
        cpython_prefix: CPython directory prefix

    Returns:
        Parent module name if found and tracked, None otherwise
    """
    stem = name[:-3] if name.endswith(".py") else name

    # Explicit table entries win over pattern detection.
    for module_name, dep_info in DEPENDENCIES.items():
        listed_stems = (
            dep[:-3] if dep.endswith(".py") else dep
            for dep in dep_info.get("hard_deps", [])
        )
        if stem in listed_stems:
            return module_name

    if not stem.startswith("_py"):
        return None

    # _py_abc -> abc
    # _pydatetime -> datetime
    parent = stem.removeprefix("_py_").removeprefix("_py")
    lib_dir = pathlib.Path(cpython_prefix) / "Lib"
    package_dir = lib_dir / parent
    is_module = (lib_dir / f"{parent}.py").exists()
    is_package = package_dir.exists() and (package_dir / "__init__.py").exists()
    return parent if (is_module or is_package) else None
def resolve_test_to_lib(test_name: str) -> str | None:
    """Look up which DEPENDENCIES group lists a given test.

    Args:
        test_name: Test name with or without test_ prefix (e.g., "test_urllib2" or "urllib2")

    Returns:
        Library name if test belongs to a group, None otherwise
    """
    # A name without the test_ prefix always gets it added before matching.
    wanted = test_name if test_name.startswith("test_") else f"test_{test_name}"
    for lib_name, dep_info in DEPENDENCIES.items():
        # Entries look like "test_urllib2.py" or "test_multiprocessing_fork".
        if any(entry.removesuffix(".py") == wanted for entry in dep_info.get("test", [])):
            return lib_name
    return None
# Test-specific dependencies (only when auto-detection isn't enough)
# Keys are test or helper module names as found under Lib/test/ (no .py
# suffix); values may contain:
# - hard_deps: files to migrate (tightly coupled, must be migrated together)
# - data: directories to copy without migration
# All entries are resolved relative to the directory containing the test.
TEST_DEPENDENCIES = {
    # Audio tests
    "test_winsound": {
        "data": ["audiodata"],
    },
    "test_wave": {
        "data": ["audiodata"],
    },
    "audiotests": {
        "data": ["audiodata"],
    },
    # Archive tests
    "test_tarfile": {
        "data": ["archivetestdata"],
    },
    "test_zipfile": {
        "data": ["archivetestdata"],
    },
    # Config tests
    "test_configparser": {
        "data": ["configdata"],
    },
    "test_config": {
        "data": ["configdata"],
    },
    # Other data directories
    "test_decimal": {
        "data": ["decimaltestdata"],
    },
    "test_dtrace": {
        "data": ["dtracedata"],
    },
    "test_math": {
        "data": ["mathdata"],
    },
    "test_ssl": {
        "data": ["certdata"],
    },
    "test_subprocess": {
        "data": ["subprocessdata"],
    },
    "test_tkinter": {
        "data": ["tkinterdata"],
    },
    "test_tokenize": {
        "data": ["tokenizedata"],
    },
    "test_type_annotations": {
        "data": ["typinganndata"],
    },
    "test_zipimport": {
        "data": ["zipimport_data"],
    },
    # XML tests share xmltestdata
    "test_xml_etree": {
        "data": ["xmltestdata"],
    },
    "test_pulldom": {
        "data": ["xmltestdata"],
    },
    "test_sax": {
        "data": ["xmltestdata"],
    },
    "test_minidom": {
        "data": ["xmltestdata"],
    },
    # Multibytecodec support needs cjkencodings
    "multibytecodec_support": {
        "data": ["cjkencodings"],
    },
    # i18n
    "i18n_helper": {
        "data": ["translationdata"],
    },
    # wheeldata is used by test_makefile and support
    "test_makefile": {
        "data": ["wheeldata"],
    },
    # profilee is used by test_monitoring
    "test_monitoring": {
        "hard_deps": ["profilee"],
    },
}
@functools.cache
def get_lib_paths(name: str, cpython_prefix: str) -> tuple[pathlib.Path, ...]:
    """Collect every CPython path that makes up a module.

    The result holds the main path (a DEPENDENCIES "lib" override, or the
    default resolved with a preference for the file form), then the listed
    hard_deps, then any existing _py{name}.py / _py_{name}.py companion.

    Args:
        name: Module name (e.g., "datetime", "libregrtest")
        cpython_prefix: CPython directory prefix

    Returns:
        Tuple of paths to copy
    """
    dep_info = DEPENDENCIES.get(name, {})

    try:
        lib_entries = dep_info["lib"]
    except KeyError:
        # No override: fall back to the conventional location.
        paths = [resolve_module_path(name, cpython_prefix, prefer="file")]
    else:
        paths = [construct_lib_path(cpython_prefix, entry) for entry in lib_entries]

    paths.extend(
        construct_lib_path(cpython_prefix, dep)
        for dep in dep_info.get("hard_deps", [])
    )

    # Pure-Python companions are picked up only when present on disk.
    for companion in (f"_py{name}.py", f"_py_{name}.py"):
        candidate = construct_lib_path(cpython_prefix, companion)
        if candidate.exists() and candidate not in paths:
            paths.append(candidate)

    return tuple(paths)
def get_all_hard_deps(name: str, cpython_prefix: str) -> list[str]:
    """List a module's hard_deps: the explicit ones plus detected companions.

    Args:
        name: Module name (e.g., "decimal", "datetime")
        cpython_prefix: CPython directory prefix

    Returns:
        Sorted list of hard_dep names (without .py extension)
    """
    listed = DEPENDENCIES.get(name, {}).get("hard_deps", [])
    explicit = {hd[:-3] if hd.endswith(".py") else hd for hd in listed}

    # _py{module}.py / _py_{module}.py count only when the file exists.
    candidates = (
        construct_lib_path(cpython_prefix, f"{prefix}{name}.py")
        for prefix in ("_py", "_py_")
    )
    detected = {candidate.stem for candidate in candidates if candidate.exists()}

    return sorted(explicit | detected)
@functools.cache
def get_test_paths(name: str, cpython_prefix: str) -> tuple[pathlib.Path, ...]:
    """Collect the CPython test paths belonging to a module.

    Args:
        name: Module name (e.g., "datetime", "libregrtest")
        cpython_prefix: CPython directory prefix

    Returns:
        Tuple of test paths: the DEPENDENCIES "test" entries when the module
        has any, otherwise the single default test/test_<name> path
        (directory form preferred)
    """
    entry = DEPENDENCIES.get(name)
    if entry is None or "test" not in entry:
        default = resolve_module_path(f"test/test_{name}", cpython_prefix, prefer="dir")
        return (default,)
    return tuple(
        construct_lib_path(cpython_prefix, f"test/{test_entry}")
        for test_entry in entry["test"]
    )
@functools.cache
def get_all_imports(name: str, cpython_prefix: str) -> frozenset[str]:
    """Gather every module imported by the files that make up a library.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of all imported module names, with *name* itself removed
    """
    collected: set[str] = set()
    existing_paths = (p for p in get_lib_paths(name, cpython_prefix) if p.exists())
    for lib_path in existing_paths:
        for _, source in read_python_files(lib_path):
            collected.update(parse_lib_imports(source))
    # A module importing itself is not a dependency.
    collected.discard(name)
    return frozenset(collected)
@functools.cache
def get_soft_deps(name: str, cpython_prefix: str) -> frozenset[str]:
    """Get the imports of a library that resolve to an existing CPython path.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of imported stdlib module names (those that exist in cpython/Lib/)
    """
    return frozenset(
        imported
        for imported in get_all_imports(name, cpython_prefix)
        if resolve_module_path(imported, cpython_prefix).exists()
    )
@functools.cache
def get_rust_deps(name: str, cpython_prefix: str) -> frozenset[str]:
    """Get the imports of a library that have no file in cpython/Lib/.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of imported module names that are built-in or C extensions
        (everything in get_all_imports that is not in get_soft_deps)
    """
    imported = get_all_imports(name, cpython_prefix)
    pure_python = get_soft_deps(name, cpython_prefix)
    return frozenset(imported.difference(pure_python))
def is_path_synced(
    cpython_path: pathlib.Path,
    cpython_prefix: str,
    lib_prefix: str,
) -> bool:
    """Tell whether the local counterpart of a CPython path matches it.

    Args:
        cpython_path: Path in CPython directory
        cpython_prefix: CPython directory prefix
        lib_prefix: Local Lib directory prefix

    Returns:
        True if synced, False otherwise (including when the path has no
        local counterpart)
    """
    counterpart = cpython_to_local_path(cpython_path, cpython_prefix, lib_prefix)
    return counterpart is not None and compare_paths(cpython_path, counterpart)
@functools.cache
def is_up_to_date(name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Compare every existing CPython path of a module with its local copy.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix
        lib_prefix: Local Lib directory prefix

    Returns:
        True if all files match, False otherwise. When none of the module's
        CPython paths exist, the result is True only for modules whose
        DEPENDENCIES entry declares an empty "lib" list.
    """
    local_root = pathlib.Path(lib_prefix)
    found_any = False
    for cpython_path in get_lib_paths(name, cpython_prefix):
        if cpython_path.exists():
            found_any = True
            # cpython/Lib/foo.py -> Lib/foo.py
            relative = cpython_path.relative_to(cpython_prefix).relative_to("Lib")
            if not compare_paths(cpython_path, local_root / relative):
                return False
    if found_any:
        return True
    # Nothing to compare: fine only if the module has no lib file by design.
    return DEPENDENCIES.get(name, {}).get("lib") == []
def _count_file_diff(file_a: pathlib.Path, file_b: pathlib.Path) -> int:
    """Count changed lines between two text files using difflib.

    A changed line is one that appears only in *file_a* (removed) or only
    in *file_b* (added); a replaced line therefore counts twice.

    Args:
        file_a: First file
        file_b: Second file

    Returns:
        Number of added plus removed lines; 0 when either file cannot be
        read or the contents are identical
    """
    a_content = safe_read_text(file_a)
    b_content = safe_read_text(file_b)
    if a_content is None or b_content is None:
        return 0
    if a_content == b_content:
        return 0
    # Count straight from the opcodes instead of scanning unified-diff text:
    # there a changed line whose own content starts with "++" or "--" looks
    # like a "+++" / "---" file header and would be missed.
    matcher = difflib.SequenceMatcher(
        None, a_content.splitlines(), b_content.splitlines()
    )
    return sum(
        (a_end - a_start) + (b_end - b_start)
        for tag, a_start, a_end, b_start, b_end in matcher.get_opcodes()
        if tag != "equal"
    )
def _count_path_diff(path_a: pathlib.Path, path_b: pathlib.Path) -> int:
    """Count changed lines between two paths (file or directory, *.py only).

    For directories, files present on both sides are diffed and files
    present on one side only contribute their full line count. A
    file/directory mismatch yields 0.
    """
    if path_a.is_file() and path_b.is_file():
        return _count_file_diff(path_a, path_b)
    if not (path_a.is_dir() and path_b.is_dir()):
        return 0

    def line_count(path: pathlib.Path) -> int:
        # Unreadable or empty files add nothing.
        text = safe_read_text(path)
        return len(text.splitlines()) if text else 0

    a_files = {found.relative_to(path_a) for found in path_a.rglob("*.py")}
    b_files = {found.relative_to(path_b) for found in path_b.rglob("*.py")}

    shared = sum(_count_file_diff(path_a / rel, path_b / rel) for rel in a_files & b_files)
    only_a = sum(line_count(path_a / rel) for rel in a_files - b_files)
    only_b = sum(line_count(path_b / rel) for rel in b_files - a_files)
    return shared + only_a + only_b
@functools.cache
def _bulk_last_updated() -> dict[str, str]:
    """Get last git commit dates for all paths under Lib/ in one git call.

    Runs ``git log`` in the current working directory. Keys are Lib/-relative
    paths (e.g. "re/__init__.py", "test/test_os.py", "os.py"), plus directory
    rollups (e.g. "re", "test/test_zoneinfo") carrying the newest date found
    below them.

    Returns:
        Dict mapping Lib/-relative path to date string; empty when git
        fails or cannot be run.
    """
    newest_by_file: dict[str, str] = {}
    command = ["git", "log", "--format=%cd", "--date=short", "--name-only", "--", "Lib/"]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=30)
    except Exception:
        return newest_by_file
    if proc.returncode != 0:
        return newest_by_file

    commit_date = None
    for raw_line in proc.stdout.splitlines():
        entry = raw_line.strip()
        if not entry:
            continue
        # Date lines are YYYY-MM-DD format
        if len(entry) == 10 and entry[4] == "-" and entry[7] == "-":
            commit_date = entry
            continue
        if not commit_date or not entry.startswith("Lib/"):
            continue
        rel = entry[4:]  # drop the "Lib/" prefix
        if rel:
            # Only the first date seen for a path is kept.
            newest_by_file.setdefault(rel, commit_date)

    # Every ancestor directory gets the latest date of anything beneath it.
    rollups: dict[str, str] = {}
    for filepath, date in newest_by_file.items():
        parts = filepath.split("/")
        for depth in range(1, len(parts)):
            ancestor = "/".join(parts[:depth])
            known = rollups.get(ancestor)
            if known is None or date > known:
                rollups[ancestor] = date
    rollups.update(newest_by_file)
    return rollups
@functools.cache
def _lib_prefix_stripped(lib_prefix: str) -> str:
    """Normalize *lib_prefix* into the string to strip from paths, ending in "/".

    e.g. "Lib" -> "Lib/", "./Lib" -> "Lib/", "../Lib" -> "../Lib/"
    """
    normalized = pathlib.Path(lib_prefix).as_posix()
    without_trailing_slash = normalized.rstrip("/")
    return f"{without_trailing_slash}/"
def _lookup_last_updated(paths: list[str], lib_prefix: str) -> str | None:
    """Return the newest date the bulk cache knows for any of *paths*.

    Each path is reduced to its Lib/-relative key first, e.g.
    "Lib/test/test_os.py" -> "test/test_os.py" and "../Lib/re" -> "re".
    Paths that do not start with the prefix are looked up unchanged.

    Returns:
        The latest date string, or None when no path has a cached date
    """
    cache = _bulk_last_updated()
    prefix = _lib_prefix_stripped(lib_prefix)
    dates = []
    for raw_path in paths:
        key = pathlib.Path(raw_path).as_posix().removeprefix(prefix)
        date = cache.get(key)
        if date:
            dates.append(date)
    return max(dates) if dates else None
def get_module_last_updated(
    name: str, cpython_prefix: str, lib_prefix: str
) -> str | None:
    """Get the last git commit date for a module's local Lib files.

    Only CPython paths that exist and whose local counterpart also exists
    are considered.

    Returns:
        Newest date among them, or None when there is nothing to look up
    """
    local_root = pathlib.Path(lib_prefix)
    local_paths = []
    for cpython_path in get_lib_paths(name, cpython_prefix):
        if not cpython_path.exists():
            continue
        try:
            relative = cpython_path.relative_to(cpython_prefix).relative_to("Lib")
            local_path = local_root / relative
            is_present = local_path.exists()
        except ValueError:
            # Path is not under <cpython_prefix>/Lib; nothing to map it to.
            continue
        if is_present:
            local_paths.append(str(local_path))
    return _lookup_last_updated(local_paths, lib_prefix) if local_paths else None
def get_module_diff_stat(name: str, cpython_prefix: str, lib_prefix: str) -> int:
    """Sum the changed-line counts between CPython and local Lib for a module.

    Paths missing on either side, or not located under <cpython_prefix>/Lib,
    contribute nothing.
    """
    local_root = pathlib.Path(lib_prefix)
    changed = 0
    for cpython_path in get_lib_paths(name, cpython_prefix):
        if not cpython_path.exists():
            continue
        try:
            relative = cpython_path.relative_to(cpython_prefix).relative_to("Lib")
        except ValueError:
            continue
        local_path = local_root / relative
        if local_path.exists():
            changed += _count_path_diff(cpython_path, local_path)
    return changed
def get_test_last_updated(
    test_name: str, cpython_prefix: str, lib_prefix: str
) -> str | None:
    """Get the last git commit date for a test's local files.

    Returns:
        The date, or None when the test is missing from CPython, missing
        locally, or has no cached date
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    if upstream is None:
        return None
    local = _get_local_test_path(upstream, lib_prefix)
    if local.exists():
        return _lookup_last_updated([str(local)], lib_prefix)
    return None
def get_test_dependencies(
    test_path: pathlib.Path,
) -> dict[str, list[pathlib.Path]]:
    """Get test dependencies by parsing imports.

    Dependencies come from the test's own ``test`` package imports plus the
    manual TEST_DEPENDENCIES table. Imports are processed in sorted order so
    the returned lists are the same on every run.

    Args:
        test_path: Path to test file or directory

    Returns:
        Dict with "hard_deps" (files to migrate) and "data" (dirs to copy);
        both lists hold existing paths only, without duplicates
    """
    result: dict[str, list[pathlib.Path]] = {"hard_deps": [], "data": []}
    if not test_path.exists():
        return result

    # Parse all files for imports (auto-detect deps)
    all_imports: set[str] = set()
    for _, content in read_python_files(test_path):
        all_imports.update(parse_test_imports(content))

    # Also add manual dependencies from TEST_DEPENDENCIES
    test_name = test_path.stem if test_path.is_file() else test_path.name
    manual_deps = TEST_DEPENDENCIES.get(test_name, {})
    all_imports.update(manual_deps.get("hard_deps", []))

    test_dir = test_path.parent
    # Set iteration order of strings differs between interpreter runs.
    ordered_imports = sorted(all_imports)

    # Convert imports to paths (deps)
    for imp in ordered_imports:
        # Skip other test modules (test_*) - they are independently managed
        # via their own update_lib entry. Only support/helper modules
        # (e.g., string_tests, mapping_tests) should be treated as hard deps.
        if imp.startswith("test_"):
            continue
        dep_path = test_dir / f"{imp}.py"
        if not dep_path.exists():
            dep_path = test_dir / imp
        if dep_path.exists() and dep_path not in result["hard_deps"]:
            result["hard_deps"].append(dep_path)

    # Data names: first those of the test itself, then those of every import
    # e.g., test_codecencodings_kr -> multibytecodec_support -> cjkencodings
    data_names = list(manual_deps.get("data", []))
    for imp in ordered_imports:
        data_names.extend(TEST_DEPENDENCIES.get(imp, {}).get("data", []))

    for data_name in data_names:
        data_path = test_dir / data_name
        if data_path.exists() and data_path not in result["data"]:
            result["data"].append(data_path)

    return result
def _parse_test_submodule_imports(content: str) -> dict[str, set[str]]:
    """Collect the names pulled in by 'from test.X import Y' statements.

    Args:
        content: Python file content

    Returns:
        Dict mapping submodule (e.g., "test_bar") -> set of imported names
        (e.g., {"helper"}); "support" and "__init__" are skipped, and an
        empty dict is returned when the content cannot be parsed
    """
    tree = safe_parse_ast(content)
    if tree is None:
        return {}

    found: dict[str, set[str]] = {}
    for node in ast.walk(tree):
        if not isinstance(node, ast.ImportFrom):
            continue
        module = node.module
        if not module or not module.startswith("test."):
            continue
        # from test.test_bar import helper -> test_bar: {helper}
        submodule = module.split(".")[1]
        if submodule in ("support", "__init__"):
            continue
        found.setdefault(submodule, set()).update(alias.name for alias in node.names)
    return found
# In-process cache for _build_test_import_graph.
# Key: str(test_dir). Value: (import_graph, lib_imports_graph), the same
# pair that function returns.
_test_import_graph_cache: dict[
    str, tuple[dict[str, set[str]], dict[str, set[str]]]
] = {}
def _is_standard_lib_path(path: str) -> bool:
    """Check if path is the standard Lib or Lib/test directory (not a temp dir).

    Anything containing "/tmp" or "/var/folders" (case-insensitive) is
    rejected before the suffix check.
    """
    lowered = path.lower()
    if any(marker in lowered for marker in ("/tmp", "/var/folders")):
        return False
    return path in ("Lib/test", "Lib") or path.endswith(("/Lib/test", "/Lib"))
def _build_test_import_graph(
    test_dir: pathlib.Path,
) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
    """Build import graphs for files within test directory (recursive).

    Results are memoized in-process per test_dir. For the standard Lib/test
    directory they are also stored in a shelve file, keyed by the CPython
    version read from the "cpython" checkout.

    Args:
        test_dir: Path to Lib/test/ directory

    Returns:
        Tuple of:
        - Dict mapping relative path (without .py) -> set of test modules it imports
        - Dict mapping relative path (without .py) -> set of all lib imports
    """
    cache_key = str(test_dir)
    if cache_key in _test_import_graph_cache:
        return _test_import_graph_cache[cache_key]

    # The shelve cache is used only for the real tree, never for temp dirs.
    use_file_cache = _is_standard_lib_path(cache_key)
    if use_file_cache:
        shelve_key = "test_import_graph:" + _get_cpython_version("cpython")
        try:
            with shelve.open(_get_cache_path()) as db:
                if shelve_key in db:
                    import_graph, lib_imports_graph = db[shelve_key]
                    _test_import_graph_cache[cache_key] = (
                        import_graph,
                        lib_imports_graph,
                    )
                    return import_graph, lib_imports_graph
        except Exception:
            # An unreadable cache just means rebuilding below.
            pass

    import_graph: dict[str, set[str]] = {}
    lib_imports_graph: dict[str, set[str]] = {}
    for py_file in test_dir.glob("**/*.py"):
        content = safe_read_text(py_file)
        if content is None:
            continue

        imports = set(parse_test_imports(content))
        all_imports = parse_lib_imports(content)

        # A plain import also counts as a test import when a module of that
        # name sits beside the file or at the top of the test directory.
        for imp in all_imports:
            module_file = f"{imp}.py"
            if (py_file.parent / module_file).exists():
                imports.add(imp)
            if (test_dir / module_file).exists():
                imports.add(imp)

        # `from test.<pkg> import name` counts when <pkg>/name.py exists.
        for submodule, imported_names in _parse_test_submodule_imports(content).items():
            submodule_dir = test_dir / submodule
            if not submodule_dir.is_dir():
                continue
            for name in imported_names:
                if (submodule_dir / f"{name}.py").exists():
                    imports.add(name)

        key = str(py_file.relative_to(test_dir).with_suffix(""))
        import_graph[key] = imports
        lib_imports_graph[key] = all_imports

    if use_file_cache:
        try:
            with shelve.open(_get_cache_path()) as db:
                db[shelve_key] = (import_graph, lib_imports_graph)
        except Exception:
            # Failing to persist is not fatal; the in-process cache still works.
            pass

    built = (import_graph, lib_imports_graph)
    _test_import_graph_cache[cache_key] = built
    return import_graph, lib_imports_graph
_lib_import_graph_cache: dict[str, dict[str, set[str]]] = {}
def _build_lib_import_graph(lib_prefix: str) -> dict[str, set[str]]:
    """Build import graph for Lib modules (full module paths like urllib.request).

    Results are memoized in-process per ``lib_prefix``. When
    ``_is_standard_lib_path(lib_prefix)`` is true, a cross-process shelve cache
    keyed by the CPython version is consulted before scanning and refreshed
    after a rebuild.

    Top-level entries whose name starts with "_" or "." and the "test" entry
    are skipped. Plain ``.py`` files are keyed by their stem; packages
    (directories containing ``__init__.py``) contribute one key per ``.py``
    file found recursively, with ``__init__.py`` keyed by the package path.
    Files that cannot be read, or that are empty, are left out of the graph.

    Args:
        lib_prefix: RustPython Lib directory

    Returns:
        Dict mapping full_module_path -> set of modules it imports. An empty
        dict (not cached) is returned when ``lib_prefix`` does not exist.
    """
    # In-process cache
    if lib_prefix in _lib_import_graph_cache:
        return _lib_import_graph_cache[lib_prefix]

    # Cross-process cache (only for standard Lib directory)
    use_file_cache = _is_standard_lib_path(lib_prefix)
    if use_file_cache:
        version = _get_cpython_version("cpython")
        shelve_key = f"lib_import_graph:{version}"
        try:
            with shelve.open(_get_cache_path()) as db:
                if shelve_key in db:
                    import_graph = db[shelve_key]
                    _lib_import_graph_cache[lib_prefix] = import_graph
                    return import_graph
        except Exception:
            # Best-effort: an unreadable or corrupt cache must never block
            # the scan below, so fall through and rebuild.
            pass

    # Build from scratch
    lib_dir = pathlib.Path(lib_prefix)
    if not lib_dir.exists():
        return {}

    import_graph: dict[str, set[str]] = {}
    for entry in lib_dir.iterdir():
        if entry.name.startswith(("_", ".")):
            continue
        if entry.name == "test":
            continue

        if entry.is_file() and entry.suffix == ".py":
            content = safe_read_text(entry)
            if content:
                imports = parse_lib_imports(content)
                # A module importing itself is not a dependency edge.
                imports.discard(entry.stem)
                import_graph[entry.stem] = imports
        elif entry.is_dir() and (entry / "__init__.py").exists():
            for py_file in entry.glob("**/*.py"):
                content = safe_read_text(py_file)
                if not content:
                    continue
                imports = parse_lib_imports(content)
                # Join path components rather than replacing "/" in the string
                # form, so the dotted name is also correct where the path
                # separator is "\\" (Windows).
                module_parts = py_file.relative_to(lib_dir).with_suffix("").parts
                if module_parts[-1] == "__init__":
                    # The package's __init__.py is keyed by the package itself.
                    module_parts = module_parts[:-1]
                full_name = ".".join(module_parts)
                # Imports of the module's own top-level package are dropped.
                imports.discard(full_name.split(".")[0])
                import_graph[full_name] = imports

    # Save to cross-process cache
    if use_file_cache:
        try:
            with shelve.open(_get_cache_path()) as db:
                db[shelve_key] = import_graph
        except Exception:
            # Best-effort: failing to persist only costs a rebuild next run.
            pass

    _lib_import_graph_cache[lib_prefix] = import_graph
    return import_graph
def _get_lib_modules_importing(
module_name: str, lib_import_graph: dict[str, set[str]]
) -> set[str]:
"""Find Lib modules (full paths) that import module_name or any of its submodules."""
importers: set[str] = set()
target_top = module_name.split(".")[0]
for full_path, imports in lib_import_graph.items():
if full_path.split(".")[0] == target_top:
continue # Skip same package
# Match if module imports target OR any submodule of target
# e.g., for "xml": match imports of "xml", "xml.parsers", "xml.etree.ElementTree"
matches = any(
imp == module_name or imp.startswith(module_name + ".") for imp in imports
)
if matches:
importers.add(full_path)
return importers
def _consolidate_submodules(
modules: set[str], threshold: int = 3
) -> dict[str, set[str]]:
"""Consolidate submodules if count exceeds threshold.
Args:
modules: Set of full module paths (e.g., {"urllib.request", "urllib.parse", "xml.dom", "xml.sax"})
threshold: If submodules > threshold, consolidate to parent
Returns:
Dict mapping display_name -> set of original module paths
e.g., {"urllib.request": {"urllib.request"}, "xml": {"xml.dom", "xml.sax", "xml.etree", "xml.parsers"}}
"""
# Group by top-level package
by_package: dict[str, set[str]] = {}
for mod in modules:
parts = mod.split(".")
top = parts[0]
if top not in by_package:
by_package[top] = set()
by_package[top].add(mod)
result: dict[str, set[str]] = {}
for top, submods in by_package.items():
if len(submods) > threshold:
# Consolidate to top-level
result[top] = submods
else:
# Keep individual
for mod in submods:
result[mod] = {mod}
return result
# Modules that are used everywhere - show but don't expand their dependents
_BLOCKLIST_MODULES = frozenset(
{
"unittest",
"test.support",
"support",
"doctest",
"typing",
"abc",
"collections.abc",
"functools",
"itertools",
"operator",
"contextlib",
"warnings",
"types",
"enum",
"re",
"io",
"os",
"sys",
}
)
def find_dependent_tests_tree(
    module_name: str,
    lib_prefix: str,
    max_depth: int = 1,
    _depth: int = 0,
    _visited_tests: set[str] | None = None,
    _visited_modules: set[str] | None = None,
) -> dict:
    """Build a tree of tests that depend on a module, directly or via Lib modules.

    The node for ``module_name`` lists the test files importing it (or any of
    its submodules). Its children are the same kind of node for each Lib
    module that imports ``module_name``, down to ``max_depth`` levels. A test
    is reported at most once across the whole tree, and children with neither
    tests nor children of their own are dropped.

    Args:
        module_name: Module to search for (e.g., "ftplib")
        lib_prefix: RustPython Lib directory
        max_depth: Maximum depth to recurse (default 1 = show direct + 1 level of Lib deps)
        _depth: Current recursion depth (internal)
        _visited_tests: Test file keys already reported; shared across the recursion (internal)
        _visited_modules: Module names already expanded; shared across the recursion (internal)

    Returns:
        Dict with structure:
        {
            "module": "ftplib",
            "tests": ["test_ftplib", "test_urllib2"],  # Direct importers
            "children": [
                {"module": "urllib.request", "tests": [...], "children": []},
                ...
            ]
        }
    """
    if _visited_tests is None:
        _visited_tests = set()
    if _visited_modules is None:
        _visited_modules = set()

    # Only the lib-imports half of the test graph is needed here.
    _, lib_imports_by_test = _build_test_import_graph(
        pathlib.Path(lib_prefix) / "test"
    )
    lib_graph = _build_lib_import_graph(lib_prefix)

    top_package = module_name.split(".")[0]
    child_prefix = module_name + "."

    # Collect not-yet-reported test files that import the module itself or
    # anything below it ("xml" matches "xml", "xml.parsers", ...).
    found_tests: set[str] = set()
    for file_key, imported in lib_imports_by_test.items():
        if file_key in _visited_tests:
            continue
        if not pathlib.Path(file_key).name.startswith("test_"):
            continue  # helper module, not a test file
        if any(
            name == module_name or name.startswith(child_prefix) for name in imported
        ):
            found_tests.add(file_key)
            _visited_tests.add(file_key)

    # test_sqlite3/test_dbapi -> test_sqlite3
    test_names = sorted({_consolidate_file_key(key) for key in found_tests})

    # Record this module and its package so deeper levels do not revisit them.
    _visited_modules.update((module_name, top_package))

    children = []
    blocked = module_name in _BLOCKLIST_MODULES or top_package in _BLOCKLIST_MODULES
    if _depth < max_depth and not blocked:
        expandable: set[str] = set()
        for importer in _get_lib_modules_importing(module_name, lib_graph):
            importer_top = importer.split(".")[0]
            if importer in _visited_modules or importer_top in _visited_modules:
                continue  # already expanded elsewhere in the tree
            if importer in _BLOCKLIST_MODULES or importer_top in _BLOCKLIST_MODULES:
                continue  # imported everywhere; expanding it adds noise
            expandable.add(importer)

        # xml.dom, xml.sax, xml.etree, ... -> xml once a package has > 3 importers
        for display_name in sorted(_consolidate_submodules(expandable, threshold=3)):
            subtree = find_dependent_tests_tree(
                display_name,
                lib_prefix,
                max_depth,
                _depth + 1,
                _visited_tests,
                _visited_modules,
            )
            if subtree["tests"] or subtree["children"]:
                children.append(subtree)

    return {
        "module": module_name,
        "tests": test_names,
        "children": children,
    }
def _consolidate_file_key(file_key: str) -> str:
"""Consolidate file_key to test name.
Args:
file_key: Relative path without .py (e.g., "test_foo", "test_bar/test_sub")
Returns:
Consolidated test name:
- "test_foo" for "test_foo"
- "test_sqlite3" for "test_sqlite3/test_dbapi"
"""
parts = pathlib.Path(file_key).parts
if len(parts) == 1:
return parts[0]
return parts[0]