mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
1725 lines
50 KiB
Python
1725 lines
50 KiB
Python
"""
|
|
Dependency resolution for library updates.
|
|
|
|
Handles:
|
|
- Irregular library paths (e.g., libregrtest at Lib/test/libregrtest/)
|
|
- Library dependencies (e.g., datetime requires _pydatetime)
|
|
- Test dependencies (auto-detected from 'from test import ...')
|
|
"""
|
|
|
|
import ast
|
|
import difflib
|
|
import functools
|
|
import pathlib
|
|
import shelve
|
|
import subprocess
|
|
|
|
from update_lib.file_utils import (
|
|
_dircmp_is_same,
|
|
compare_dir_contents,
|
|
compare_file_contents,
|
|
compare_paths,
|
|
construct_lib_path,
|
|
cpython_to_local_path,
|
|
read_python_files,
|
|
resolve_module_path,
|
|
resolve_test_path,
|
|
safe_parse_ast,
|
|
safe_read_text,
|
|
)
|
|
|
|
# === Import parsing utilities ===
|
|
|
|
|
|
class ImportVisitor(ast.NodeVisitor):
    """AST visitor that records the module names a source file imports.

    Besides ``import`` / ``from ... import`` statements, calls of the form
    ``support.findfile("<name>.py")`` are recorded as an import of
    ``test.<name>``, because such scripts are loaded at runtime without an
    import statement.
    """

    def __init__(self) -> None:
        # Raw dotted names exactly as collected by the visit_* methods.
        self.__imports: set[str] = set()

    @property
    def test_imports(self) -> set[str]:
        """Names imported from the ``test`` package, without the ``test.`` prefix.

        ``test.support`` and everything below it is excluded.
        """
        imports = set()
        for module in self.__imports:
            if not module.startswith("test."):
                continue
            name = module.removeprefix("test.")

            if name == "support" or name.startswith("support."):
                continue

            imports.add(name)

        return imports

    @property
    def lib_imports(self) -> set[str]:
        """All recorded names that do not start with ``test.``."""
        return {module for module in self.__imports if not module.startswith("test.")}

    def visit_Import(self, node: ast.Import) -> None:
        """Record every module named in an ``import a, b.c`` statement."""
        for alias in node.names:
            self.__imports.add(alias.name)

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Record the source module of a ``from X import ...`` statement."""
        module = node.module
        if module is None:  # Ignore `from . import my_internal_module`
            return

        for alias in node.names:
            # We only care about what we import if it was from the "test" module
            if module == "test":
                name = f"{module}.{alias.name}"
            else:
                name = module

            self.__imports.add(name)

    def visit_Call(self, node: ast.Call) -> None:
        """Record scripts located through ``support.findfile("<name>.py")``.

        In test files, there's sometimes use of:

        ```python
        import test.support
        from test.support import script_helper

        script = support.findfile("_test_atexit.py")
        script_helper.run_test_script(script)
        ```

        This imports "_test_atexit.py" but does not show as an import node.
        """
        # Descend first so that a findfile() call nested inside another call
        # (e.g. ``run_test_script(support.findfile("x.py"))``) is also seen.
        self.generic_visit(node)

        func = node.func
        if not isinstance(func, ast.Attribute):
            return

        value = func.value
        if not isinstance(value, ast.Name):
            return

        if (value.id != "support") or (func.attr != "findfile"):
            return

        # findfile() may be called without positional arguments
        # (keyword-only or malformed call); there is nothing to record then.
        if not node.args:
            return

        arg = node.args[0]
        # Only literal string file names can be resolved statically.
        if not isinstance(arg, ast.Constant) or not isinstance(arg.value, str):
            return

        target = arg.value
        if not target.endswith(".py"):
            return

        target = target.removesuffix(".py")
        self.__imports.add(f"test.{target}")
|
|
|
|
|
|
def parse_test_imports(content: str) -> frozenset[str]:
    """Return the ``test`` package modules that *content* depends on.

    The source is parsed with ``safe_parse_ast``; when that yields a falsy
    result an empty set is returned. Otherwise the result is
    ``ImportVisitor.test_imports`` for the parsed tree.

    NOTE(review): the annotation says ``frozenset`` but a plain ``set`` is
    what is actually returned.
    """
    tree = safe_parse_ast(content)
    if not tree:
        return set()

    collector = ImportVisitor()
    collector.visit(tree)
    return collector.test_imports
|
|
|
|
|
|
def parse_lib_imports(content: str) -> frozenset[str]:
    """Return every non-``test.`` module name imported by *content*.

    The source is parsed with ``safe_parse_ast``; when that yields a falsy
    result an empty set is returned. Otherwise the result is
    ``ImportVisitor.lib_imports`` for the parsed tree.

    NOTE(review): the annotation says ``frozenset`` but a plain ``set`` is
    what is actually returned.
    """
    tree = safe_parse_ast(content)
    if not tree:
        return set()

    collector = ImportVisitor()
    collector.visit(tree)
    return collector.lib_imports
|
|
|
|
|
|
# === TODO marker utilities ===
|
|
|
|
# Marker text searched for by the TODO helper functions.
TODO_MARKER = "TODO: RUSTPYTHON"


def filter_rustpython_todo(content: str) -> str:
    """Return *content* with every line that contains ``TODO_MARKER`` dropped.

    Line endings of the remaining lines are preserved as-is.
    """
    kept = []
    for line in content.splitlines(keepends=True):
        if TODO_MARKER in line:
            continue
        kept.append(line)
    return "".join(kept)
|
|
|
|
|
|
def count_rustpython_todo(content: str) -> int:
    """Return how many times ``TODO_MARKER`` occurs in *content*.

    Occurrences are counted, not lines: a line holding the marker twice
    contributes two.
    """
    marker_hits = content.count(TODO_MARKER)
    return marker_hits
|
|
|
|
|
|
def count_todo_in_path(path: pathlib.Path) -> int:
    """Count RustPython TODO markers under *path*.

    For a regular file the file's text is counted (0 when it cannot be read
    or is empty). For anything else the counts of all files yielded by
    ``read_python_files(path)`` are summed.
    """
    if not path.is_file():
        return sum(
            count_rustpython_todo(text) for _, text in read_python_files(path)
        )

    text = safe_read_text(path)
    if not text:
        return 0
    return count_rustpython_todo(text)
|
|
|
|
|
|
# === Test utilities ===
|
|
|
|
|
|
def _get_cpython_test_path(test_name: str, cpython_prefix: str) -> pathlib.Path | None:
    """Resolve *test_name* inside the CPython tree.

    Returns the path produced by ``resolve_test_path`` (directory preferred)
    when it exists on disk, otherwise ``None``.
    """
    candidate = resolve_test_path(test_name, cpython_prefix, prefer="dir")
    if candidate.exists():
        return candidate
    return None
|
|
|
|
|
|
def _get_local_test_path(
    cpython_test_path: pathlib.Path, lib_prefix: str
) -> pathlib.Path:
    """Map a CPython test path to ``<lib_prefix>/test/<same final name>``.

    Only the last component of *cpython_test_path* is used.
    """
    test_root = pathlib.Path(lib_prefix, "test")
    return test_root / cpython_test_path.name
|
|
|
|
|
|
def is_test_tracked(test_name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Tell whether the test has a counterpart in the local ``Lib/test``.

    A test that does not exist in the CPython tree is reported as tracked
    (``True``), since there is nothing to copy.
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    return upstream is None or _get_local_test_path(upstream, lib_prefix).exists()
|
|
|
|
|
|
def is_test_up_to_date(test_name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Compare a local test against CPython's copy, ignoring TODO-marker lines.

    Returns ``True`` when the test is absent from CPython, ``False`` when the
    local copy is missing, and otherwise the result of the file or directory
    comparison with ``filter_rustpython_todo`` applied to the local side.
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    if upstream is None:
        return True

    local = _get_local_test_path(upstream, lib_prefix)
    if not local.exists():
        return False

    # Pick the comparison helper matching the kind of upstream path.
    compare = compare_file_contents if upstream.is_file() else compare_dir_contents
    return compare(upstream, local, local_filter=filter_rustpython_todo)
|
|
|
|
|
|
def count_test_todos(test_name: str, lib_prefix: str) -> int:
    """Count RustPython TODO markers for a local test.

    ``<lib_prefix>/test/<test_name>`` is tried first, then
    ``<lib_prefix>/test/<test_name>.py``; 0 is returned when neither exists.
    """
    test_root = pathlib.Path(lib_prefix) / "test"
    for candidate in (test_root / test_name, test_root / f"{test_name}.py"):
        if candidate.exists():
            return count_todo_in_path(candidate)
    return 0
|
|
|
|
|
|
# === Cross-process cache using shelve ===
|
|
|
|
|
|
def _get_cpython_version(cpython_prefix: str) -> str:
    """Return the nearest git tag of the CPython checkout, or ``"unknown"``.

    Runs ``git describe --tags --abbrev=0`` inside *cpython_prefix*. Any
    failure (exception while launching git, or a non-zero exit status) is
    deliberately swallowed and reported as ``"unknown"``.
    """
    fallback = "unknown"
    try:
        completed = subprocess.run(
            ["git", "describe", "--tags", "--abbrev=0"],
            cwd=cpython_prefix,
            capture_output=True,
            text=True,
        )
    except Exception:
        # Best effort only: the caller just needs some label.
        return fallback

    if completed.returncode != 0:
        return fallback
    return completed.stdout.strip()
|
|
|
|
|
|
def _get_cache_path() -> str:
    """Return the shelve base path ``<this dir>/.cache/import_graph_cache``.

    The ``.cache`` directory next to this module is created if missing. No
    extension is included because shelve appends its own.
    """
    cache_dir = pathlib.Path(__file__).parent.joinpath(".cache")
    cache_dir.mkdir(parents=True, exist_ok=True)
    return str(cache_dir.joinpath("import_graph_cache"))
|
|
|
|
|
|
def clear_import_graph_caches() -> None:
    """Empty the in-process import graph caches (intended for tests).

    The two module globals are cleared only if they have been defined.
    """
    module_globals = globals()
    for cache_name in ("_test_import_graph_cache", "_lib_import_graph_cache"):
        if cache_name in module_globals:
            module_globals[cache_name].clear()
|
|
|
|
|
|
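# Manual dependency table for irregular cases.
# Format: "name" -> {"lib": [...], "test": [...], "data": [...], "hard_deps": [...]}
# All keys are optional:
# - lib: paths relative to Lib/ that override the default module path
#   (default: name.py or name/); an empty list means there is no Lib file
# - test: test files/directories relative to Lib/test/ (default: test_<name>)
# - data: additional data paths (e.g. "test/regrtestdata")
# - hard_deps: additional files to copy alongside the main module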
|
|
DEPENDENCIES = {
|
|
# regrtest is in Lib/test/libregrtest/, not Lib/libregrtest/
|
|
"regrtest": {
|
|
"lib": ["test/libregrtest"],
|
|
"test": ["test_regrtest"],
|
|
"data": ["test/regrtestdata"],
|
|
},
|
|
# Rust-implemented modules (no lib file, only test)
|
|
"int": {
|
|
"lib": [],
|
|
"hard_deps": ["_pylong.py"],
|
|
"test": [
|
|
"test_int.py",
|
|
"test_long.py",
|
|
"test_int_literal.py",
|
|
],
|
|
},
|
|
"exception": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_exceptions.py",
|
|
"test_baseexception.py",
|
|
"test_except_star.py",
|
|
"test_exception_group.py",
|
|
"test_exception_hierarchy.py",
|
|
"test_exception_variations.py",
|
|
],
|
|
},
|
|
"dict": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_dict.py",
|
|
"test_dictcomps.py",
|
|
"test_dictviews.py",
|
|
"test_userdict.py",
|
|
"mapping_tests.py",
|
|
],
|
|
},
|
|
"list": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_list.py",
|
|
"test_listcomps.py",
|
|
"test_userlist.py",
|
|
],
|
|
},
|
|
"__future__": {
|
|
"test": [
|
|
"test___future__.py",
|
|
"test_future_stmt.py",
|
|
],
|
|
},
|
|
"site": {
|
|
"hard_deps": ["_sitebuiltins.py"],
|
|
},
|
|
"opcode": {
|
|
"hard_deps": ["_opcode_metadata.py"],
|
|
"test": [
|
|
"test_opcode.py",
|
|
"test__opcode.py",
|
|
"test_opcodes.py",
|
|
],
|
|
},
|
|
"pickle": {
|
|
"hard_deps": ["_compat_pickle.py"],
|
|
"test": [
|
|
"picklecommon.py",
|
|
"test_pickle.py",
|
|
"test_picklebuffer.py",
|
|
"test_pickletools.py",
|
|
"test_xpickle.py",
|
|
"xpickle_worker.py",
|
|
],
|
|
},
|
|
"re": {
|
|
"hard_deps": ["sre_compile.py", "sre_constants.py", "sre_parse.py"],
|
|
"test": [
|
|
"test_re.py",
|
|
"re_tests.py",
|
|
],
|
|
},
|
|
"weakref": {
|
|
"hard_deps": ["_weakrefset.py"],
|
|
"test": [
|
|
"test_weakref.py",
|
|
"test_weakset.py",
|
|
],
|
|
},
|
|
"codecs": {
|
|
"test": [
|
|
"test_charmapcodec.py",
|
|
"test_codeccallbacks.py",
|
|
"test_codecencodings_cn.py",
|
|
"test_codecencodings_hk.py",
|
|
"test_codecencodings_iso2022.py",
|
|
"test_codecencodings_jp.py",
|
|
"test_codecencodings_kr.py",
|
|
"test_codecencodings_tw.py",
|
|
"test_codecmaps_cn.py",
|
|
"test_codecmaps_hk.py",
|
|
"test_codecmaps_jp.py",
|
|
"test_codecmaps_kr.py",
|
|
"test_codecmaps_tw.py",
|
|
"test_codecs.py",
|
|
"test_multibytecodec.py",
|
|
"testcodec.py",
|
|
],
|
|
},
|
|
# Non-pattern hard_deps (can't be auto-detected)
|
|
"ast": {
|
|
"hard_deps": ["_ast_unparse.py"],
|
|
"test": [
|
|
"test_ast.py",
|
|
"test_unparse.py",
|
|
"test_type_comments.py",
|
|
],
|
|
},
|
|
# Data directories
|
|
"pydoc": {
|
|
"hard_deps": ["pydoc_data"],
|
|
},
|
|
"turtle": {
|
|
"hard_deps": ["turtledemo"],
|
|
},
|
|
"sysconfig": {
|
|
"hard_deps": ["_aix_support.py", "_osx_support.py"],
|
|
"test": [
|
|
"test_sysconfig.py",
|
|
"test__osx_support.py",
|
|
],
|
|
},
|
|
"tkinter": {
|
|
"test": [
|
|
"test_tkinter",
|
|
"test_ttk",
|
|
"test_ttk_textonly.py",
|
|
"test_tcl.py",
|
|
"test_idle",
|
|
],
|
|
},
|
|
# Test support library (like regrtest)
|
|
"support": {
|
|
"lib": ["test/support"],
|
|
"data": ["test/wheeldata"],
|
|
"test": [
|
|
"test_support.py",
|
|
"test_script_helper.py",
|
|
],
|
|
},
|
|
# test_htmlparser tests html.parser
|
|
"html": {
|
|
"hard_deps": ["_markupbase.py"],
|
|
"test": ["test_html.py", "test_htmlparser.py"],
|
|
},
|
|
"xml": {
|
|
"test": [
|
|
"test_xml_etree.py",
|
|
"test_xml_etree_c.py",
|
|
"test_minidom.py",
|
|
"test_pulldom.py",
|
|
"test_pyexpat.py",
|
|
"test_sax.py",
|
|
"test_xml_dom_minicompat.py",
|
|
"test_xml_dom_xmlbuilder.py",
|
|
],
|
|
},
|
|
"multiprocessing": {
|
|
"test": [
|
|
"test_multiprocessing_fork",
|
|
"test_multiprocessing_forkserver",
|
|
"test_multiprocessing_spawn",
|
|
"test_multiprocessing_main_handling.py",
|
|
"_test_multiprocessing.py",
|
|
],
|
|
},
|
|
"urllib": {
|
|
"test": [
|
|
"test_urllib.py",
|
|
"test_urllib2.py",
|
|
"test_urllib2_localnet.py",
|
|
"test_urllib2net.py",
|
|
"test_urllibnet.py",
|
|
"test_urlparse.py",
|
|
"test_urllib_response.py",
|
|
"test_robotparser.py",
|
|
],
|
|
},
|
|
"collections": {
|
|
"hard_deps": ["_collections_abc.py"],
|
|
"test": [
|
|
"test_collections.py",
|
|
"test_deque.py",
|
|
"test_defaultdict.py",
|
|
"test_ordered_dict.py",
|
|
],
|
|
},
|
|
"http": {
|
|
"test": [
|
|
"test_httplib.py",
|
|
"test_http_cookiejar.py",
|
|
"test_http_cookies.py",
|
|
"test_httpservers.py",
|
|
],
|
|
},
|
|
"unicode": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_unicodedata.py",
|
|
"test_unicode_file.py",
|
|
"test_unicode_file_functions.py",
|
|
"test_unicode_identifiers.py",
|
|
"test_ucn.py",
|
|
],
|
|
},
|
|
"typing": {
|
|
"test": [
|
|
"test_typing.py",
|
|
"test_type_aliases.py",
|
|
"test_type_annotations.py",
|
|
"test_type_params.py",
|
|
"test_genericalias.py",
|
|
],
|
|
},
|
|
"unpack": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_unpack.py",
|
|
"test_unpack_ex.py",
|
|
],
|
|
},
|
|
"zipimport": {
|
|
"test": [
|
|
"test_zipimport.py",
|
|
"test_zipimport_support.py",
|
|
],
|
|
},
|
|
"time": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_time.py",
|
|
"test_strftime.py",
|
|
],
|
|
},
|
|
"sys": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_sys.py",
|
|
"test_syslog.py",
|
|
"test_sys_setprofile.py",
|
|
"test_sys_settrace.py",
|
|
"test_audit.py",
|
|
"audit-tests.py",
|
|
],
|
|
},
|
|
"str": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_str.py",
|
|
"test_fstring.py",
|
|
"test_string_literals.py",
|
|
],
|
|
},
|
|
"thread": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_thread.py",
|
|
"test_thread_local_bytecode.py",
|
|
"test_threadsignals.py",
|
|
],
|
|
},
|
|
"threading": {
|
|
"hard_deps": ["_threading_local.py"],
|
|
"test": [
|
|
"test_threading.py",
|
|
"test_threadedtempfile.py",
|
|
"test_threading_local.py",
|
|
],
|
|
},
|
|
"class": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_class.py",
|
|
"test_genericclass.py",
|
|
"test_subclassinit.py",
|
|
],
|
|
},
|
|
"generator": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_generators.py",
|
|
"test_genexps.py",
|
|
"test_generator_stop.py",
|
|
"test_yield_from.py",
|
|
],
|
|
},
|
|
"descr": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_descr.py",
|
|
"test_descrtut.py",
|
|
],
|
|
},
|
|
"code": {
|
|
"test": [
|
|
"test_code_module.py",
|
|
],
|
|
},
|
|
"contextlib": {
|
|
"test": [
|
|
"test_contextlib.py",
|
|
"test_contextlib_async.py",
|
|
],
|
|
},
|
|
"io": {
|
|
"hard_deps": ["_pyio.py"],
|
|
"test": [
|
|
"test_io.py",
|
|
"test_bufio.py",
|
|
"test_fileio.py",
|
|
"test_memoryio.py",
|
|
],
|
|
},
|
|
"dbm": {
|
|
"test": [
|
|
"test_dbm.py",
|
|
"test_dbm_dumb.py",
|
|
"test_dbm_gnu.py",
|
|
"test_dbm_ndbm.py",
|
|
"test_dbm_sqlite3.py",
|
|
],
|
|
},
|
|
"datetime": {
|
|
"hard_deps": ["_strptime.py"],
|
|
"test": [
|
|
"test_datetime.py",
|
|
"test_strptime.py",
|
|
],
|
|
},
|
|
"locale": {
|
|
"test": [
|
|
"test_locale.py",
|
|
"test__locale.py",
|
|
],
|
|
},
|
|
"numbers": {
|
|
"test": [
|
|
"test_numbers.py",
|
|
"test_abstract_numbers.py",
|
|
],
|
|
},
|
|
"file": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_file.py",
|
|
"test_largefile.py",
|
|
],
|
|
},
|
|
"fcntl": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_fcntl.py",
|
|
"test_ioctl.py",
|
|
],
|
|
},
|
|
"select": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_select.py",
|
|
"test_poll.py",
|
|
],
|
|
},
|
|
"xmlrpc": {
|
|
"test": [
|
|
"test_xmlrpc.py",
|
|
"test_docxmlrpc.py",
|
|
],
|
|
},
|
|
"ctypes": {
|
|
"test": [
|
|
"test_ctypes",
|
|
"test_stable_abi_ctypes.py",
|
|
],
|
|
},
|
|
# Grouped tests for modules without custom lib paths
|
|
"compile": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_compile.py",
|
|
"test_compiler_assemble.py",
|
|
"test_compiler_codegen.py",
|
|
"test_peepholer.py",
|
|
],
|
|
},
|
|
"math": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_math.py",
|
|
"test_math_property.py",
|
|
],
|
|
},
|
|
"float": {
|
|
"lib": [],
|
|
"test": [
|
|
"test_float.py",
|
|
"test_strtod.py",
|
|
],
|
|
},
|
|
"zipfile": {
|
|
"test": [
|
|
"test_zipfile.py",
|
|
"test_zipfile64.py",
|
|
],
|
|
},
|
|
"smtplib": {
|
|
"test": [
|
|
"test_smtplib.py",
|
|
"test_smtpnet.py",
|
|
],
|
|
},
|
|
"profile": {
|
|
"test": [
|
|
"test_profile.py",
|
|
"test_cprofile.py",
|
|
],
|
|
},
|
|
"string": {
|
|
"test": [
|
|
"test_string.py",
|
|
"test_userstring.py",
|
|
],
|
|
},
|
|
"os": {
|
|
"test": [
|
|
"test_os.py",
|
|
"test_popen.py",
|
|
],
|
|
},
|
|
"pyrepl": {
|
|
"test": [
|
|
"test_pyrepl",
|
|
"test_repl.py",
|
|
],
|
|
},
|
|
"concurrent": {
|
|
"test": [
|
|
"test_concurrent_futures",
|
|
"test_interpreters",
|
|
"test__interpreters.py",
|
|
"test__interpchannels.py",
|
|
"test_crossinterp.py",
|
|
],
|
|
},
|
|
"atexit": {
|
|
"test": [
|
|
"test_atexit.py",
|
|
"_test_atexit.py",
|
|
],
|
|
},
|
|
"eintr": {
|
|
"test": [
|
|
"test_eintr.py",
|
|
"_test_eintr.py",
|
|
]
|
|
},
|
|
"curses": {
|
|
"test": [
|
|
"test_curses.py",
|
|
"curses_tests.py",
|
|
],
|
|
},
|
|
}
|
|
|
|
|
|
def resolve_hard_dep_parent(name: str, cpython_prefix: str) -> str | None:
    """Find the module that owns a hard-dependency file.

    A parent is reported only when the file is tracked, i.e. either:
    - it is listed under ``hard_deps`` of some ``DEPENDENCIES`` entry, or
    - it follows the ``_py{module}`` / ``_py_{module}`` naming pattern and
      ``{module}.py`` or a ``{module}/__init__.py`` package exists in
      ``<cpython_prefix>/Lib``.

    Args:
        name: Module or file name (with or without .py extension)
        cpython_prefix: CPython directory prefix

    Returns:
        Parent module name if found and tracked, None otherwise
    """
    stem = name.removesuffix(".py")

    # Explicit table entries take precedence over pattern detection.
    for owner, info in DEPENDENCIES.items():
        if any(dep.removesuffix(".py") == stem for dep in info.get("hard_deps", [])):
            return owner

    if not stem.startswith("_py"):
        return None

    # _py_abc -> abc, _pydatetime -> datetime
    candidate = stem.removeprefix("_py_").removeprefix("_py")

    lib_root = pathlib.Path(cpython_prefix) / "Lib"
    if (lib_root / f"{candidate}.py").exists():
        return candidate

    package_dir = lib_root / candidate
    if package_dir.exists() and (package_dir / "__init__.py").exists():
        return candidate

    return None
|
|
|
|
|
|
def resolve_test_to_lib(test_name: str) -> str | None:
    """Look up which ``DEPENDENCIES`` group lists a given test.

    Args:
        test_name: Test name with or without test_ prefix (e.g., "test_urllib2" or "urllib2")

    Returns:
        Library name if test belongs to a group, None otherwise
    """
    # A missing "test_" prefix is always added before matching, so table
    # entries that lack the prefix (e.g. "re_tests.py") never match.
    wanted = test_name if test_name.startswith("test_") else f"test_{test_name}"

    for lib_name, dep_info in DEPENDENCIES.items():
        # Entries look like "test_urllib2.py" or "test_multiprocessing_fork".
        stems = (entry.removesuffix(".py") for entry in dep_info.get("test", []))
        if wanted in stems:
            return lib_name

    return None
|
|
|
|
|
|
# Test-specific dependencies (only when auto-detection isn't enough)
|
|
# - hard_deps: files to migrate (tightly coupled, must be migrated together)
|
|
# - data: directories to copy without migration
|
|
TEST_DEPENDENCIES = {
|
|
# Audio tests
|
|
"test_winsound": {
|
|
"data": ["audiodata"],
|
|
},
|
|
"test_wave": {
|
|
"data": ["audiodata"],
|
|
},
|
|
"audiotests": {
|
|
"data": ["audiodata"],
|
|
},
|
|
# Archive tests
|
|
"test_tarfile": {
|
|
"data": ["archivetestdata"],
|
|
},
|
|
"test_zipfile": {
|
|
"data": ["archivetestdata"],
|
|
},
|
|
# Config tests
|
|
"test_configparser": {
|
|
"data": ["configdata"],
|
|
},
|
|
"test_config": {
|
|
"data": ["configdata"],
|
|
},
|
|
# Other data directories
|
|
"test_decimal": {
|
|
"data": ["decimaltestdata"],
|
|
},
|
|
"test_dtrace": {
|
|
"data": ["dtracedata"],
|
|
},
|
|
"test_math": {
|
|
"data": ["mathdata"],
|
|
},
|
|
"test_ssl": {
|
|
"data": ["certdata"],
|
|
},
|
|
"test_subprocess": {
|
|
"data": ["subprocessdata"],
|
|
},
|
|
"test_tkinter": {
|
|
"data": ["tkinterdata"],
|
|
},
|
|
"test_tokenize": {
|
|
"data": ["tokenizedata"],
|
|
},
|
|
"test_type_annotations": {
|
|
"data": ["typinganndata"],
|
|
},
|
|
"test_zipimport": {
|
|
"data": ["zipimport_data"],
|
|
},
|
|
# XML tests share xmltestdata
|
|
"test_xml_etree": {
|
|
"data": ["xmltestdata"],
|
|
},
|
|
"test_pulldom": {
|
|
"data": ["xmltestdata"],
|
|
},
|
|
"test_sax": {
|
|
"data": ["xmltestdata"],
|
|
},
|
|
"test_minidom": {
|
|
"data": ["xmltestdata"],
|
|
},
|
|
# Multibytecodec support needs cjkencodings
|
|
"multibytecodec_support": {
|
|
"data": ["cjkencodings"],
|
|
},
|
|
# i18n
|
|
"i18n_helper": {
|
|
"data": ["translationdata"],
|
|
},
|
|
# wheeldata is used by test_makefile and support
|
|
"test_makefile": {
|
|
"data": ["wheeldata"],
|
|
},
|
|
# profilee is used by test_monitoring
|
|
"test_monitoring": {
|
|
"hard_deps": ["profilee"],
|
|
},
|
|
}
|
|
|
|
|
|
@functools.cache
def get_lib_paths(name: str, cpython_prefix: str) -> tuple[pathlib.Path, ...]:
    """Collect every CPython library path belonging to a module.

    The result is, in order: the main path(s) (the ``lib`` override from
    ``DEPENDENCIES`` or the default resolved module path, file preferred),
    the explicit ``hard_deps``, and any existing ``_py{name}.py`` /
    ``_py_{name}.py`` file not already listed. Results are memoized.

    Args:
        name: Module name (e.g., "datetime", "libregrtest")
        cpython_prefix: CPython directory prefix

    Returns:
        Tuple of paths to copy
    """
    dep_info = DEPENDENCIES.get(name, {})

    if "lib" in dep_info:
        collected = [construct_lib_path(cpython_prefix, rel) for rel in dep_info["lib"]]
    else:
        collected = [resolve_module_path(name, cpython_prefix, prefer="file")]

    collected.extend(
        construct_lib_path(cpython_prefix, dep) for dep in dep_info.get("hard_deps", [])
    )

    # Pure-Python companion modules are picked up automatically when present.
    for filename in (f"_py{name}.py", f"_py_{name}.py"):
        candidate = construct_lib_path(cpython_prefix, filename)
        if candidate.exists() and candidate not in collected:
            collected.append(candidate)

    return tuple(collected)
|
|
|
|
|
|
def get_all_hard_deps(name: str, cpython_prefix: str) -> list[str]:
    """List a module's hard dependencies, both declared and auto-detected.

    Args:
        name: Module name (e.g., "decimal", "datetime")
        cpython_prefix: CPython directory prefix

    Returns:
        Sorted list of hard_dep names (without .py extension)
    """
    declared = {
        dep.removesuffix(".py")
        for dep in DEPENDENCIES.get(name, {}).get("hard_deps", [])
    }

    # Companion files following the _py{module}.py / _py_{module}.py pattern.
    detected = set()
    for filename in (f"_py{name}.py", f"_py_{name}.py"):
        candidate = construct_lib_path(cpython_prefix, filename)
        if candidate.exists():
            detected.add(candidate.stem)

    return sorted(declared | detected)
|
|
|
|
|
|
@functools.cache
def get_test_paths(name: str, cpython_prefix: str) -> tuple[pathlib.Path, ...]:
    """Collect the CPython test paths belonging to a module.

    When ``DEPENDENCIES`` declares a ``test`` list for *name*, each entry is
    resolved below ``test/``. Otherwise the single default ``test/test_{name}``
    path is resolved, preferring a directory over a file. Results are memoized.

    Args:
        name: Module name (e.g., "datetime", "libregrtest")
        cpython_prefix: CPython directory prefix

    Returns:
        Tuple of test paths
    """
    dep_info = DEPENDENCIES.get(name, {})
    if "test" in dep_info:
        return tuple(
            [construct_lib_path(cpython_prefix, f"test/{entry}") for entry in dep_info["test"]]
        )

    default = resolve_module_path(f"test/test_{name}", cpython_prefix, prefer="dir")
    return (default,)
|
|
|
|
|
|
@functools.cache
def get_all_imports(name: str, cpython_prefix: str) -> frozenset[str]:
    """Gather the imports of every file that makes up a library module.

    All existing paths from ``get_lib_paths`` are read and parsed; the
    module's own name is removed from the result. Results are memoized.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of all imported module names
    """
    collected = set()
    for lib_path in get_lib_paths(name, cpython_prefix):
        if not lib_path.exists():
            continue
        for _, source in read_python_files(lib_path):
            collected.update(parse_lib_imports(source))

    # A module importing itself is not a dependency.
    collected.discard(name)
    return frozenset(collected)
|
|
|
|
|
|
@functools.cache
def get_soft_deps(name: str, cpython_prefix: str) -> frozenset[str]:
    """Return the module's imports that resolve to an existing CPython path.

    Each name from ``get_all_imports`` is kept only when the path returned by
    ``resolve_module_path`` exists. Results are memoized.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of imported stdlib module names (those that exist in cpython/Lib/)
    """
    return frozenset(
        imported
        for imported in get_all_imports(name, cpython_prefix)
        if resolve_module_path(imported, cpython_prefix).exists()
    )
|
|
|
|
|
|
@functools.cache
def get_rust_deps(name: str, cpython_prefix: str) -> frozenset[str]:
    """Return the module's imports that are not soft dependencies.

    This is ``get_all_imports`` minus ``get_soft_deps``, i.e. the imported
    names with no existing path in cpython/Lib/. Results are memoized.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix

    Returns:
        Frozenset of imported module names that are built-in or C extensions
    """
    every_import = get_all_imports(name, cpython_prefix)
    in_stdlib = get_soft_deps(name, cpython_prefix)
    return frozenset(every_import.difference(in_stdlib))
|
|
|
|
|
|
def is_path_synced(
    cpython_path: pathlib.Path,
    cpython_prefix: str,
    lib_prefix: str,
) -> bool:
    """Tell whether a CPython path matches its local counterpart.

    Args:
        cpython_path: Path in CPython directory
        cpython_prefix: CPython directory prefix
        lib_prefix: Local Lib directory prefix

    Returns:
        False when no local path can be derived, otherwise the result of
        ``compare_paths`` for the two paths
    """
    local = cpython_to_local_path(cpython_path, cpython_prefix, lib_prefix)
    return local is not None and compare_paths(cpython_path, local)
|
|
|
|
|
|
@functools.cache
def is_up_to_date(name: str, cpython_prefix: str, lib_prefix: str) -> bool:
    """Check whether every library file of a module matches CPython.

    Paths from ``get_lib_paths`` that do not exist in CPython are ignored.
    When none of them exists, the module counts as up to date only if its
    ``DEPENDENCIES`` entry declares an empty ``lib`` list. Results are
    memoized.

    Args:
        name: Module name
        cpython_prefix: CPython directory prefix
        lib_prefix: Local Lib directory prefix

    Returns:
        True if all files match, False otherwise
    """
    seen_existing = False
    for upstream in get_lib_paths(name, cpython_prefix):
        if upstream.exists():
            seen_existing = True

            # cpython/Lib/foo.py -> Lib/foo.py
            relative = upstream.relative_to(cpython_prefix).relative_to("Lib")
            mirrored = pathlib.Path(lib_prefix) / relative

            if not compare_paths(upstream, mirrored):
                return False

    if seen_existing:
        return True

    # Nothing to compare: fine only for modules declared as having no Lib file.
    return DEPENDENCIES.get(name, {}).get("lib") == []
|
|
|
|
|
|
def _count_file_diff(file_a: pathlib.Path, file_b: pathlib.Path) -> int:
    """Count changed lines between two text files using difflib.

    Every line that would be removed from *file_a* or added from *file_b*
    counts as one; a replaced line therefore counts twice (one removal plus
    one addition).

    Returns:
        Number of changed lines, or 0 when either file cannot be read or the
        contents are identical.
    """
    a_content = safe_read_text(file_a)
    b_content = safe_read_text(file_b)
    if a_content is None or b_content is None:
        return 0
    if a_content == b_content:
        return 0

    # Count from the opcodes rather than scanning unified-diff text: in the
    # textual form a changed line whose own content starts with "--" or "++"
    # (e.g. a "----" underline) renders as "---..."/"+++..." and would be
    # mistaken for a file header and skipped.
    matcher = difflib.SequenceMatcher(
        None, a_content.splitlines(), b_content.splitlines()
    )
    count = 0
    for tag, a_start, a_end, b_start, b_end in matcher.get_opcodes():
        if tag != "equal":
            # delete/insert have one empty side; replace contributes both.
            count += (a_end - a_start) + (b_end - b_start)
    return count
|
|
|
|
|
|
def _count_path_diff(path_a: pathlib.Path, path_b: pathlib.Path) -> int:
    """Count changed lines between two paths (file or directory, *.py only).

    Two files are diffed directly. For two directories, ``*.py`` files found
    recursively on both sides are diffed pairwise, and files present on only
    one side contribute their full line count. Any other combination (e.g. a
    file against a directory) yields 0.
    """
    if path_a.is_file() and path_b.is_file():
        return _count_file_diff(path_a, path_b)

    if not (path_a.is_dir() and path_b.is_dir()):
        return 0

    a_files = {found.relative_to(path_a) for found in path_a.rglob("*.py")}
    b_files = {found.relative_to(path_b) for found in path_b.rglob("*.py")}

    total = sum(
        _count_file_diff(path_a / rel, path_b / rel) for rel in a_files & b_files
    )

    # Files that exist on one side only count with all of their lines.
    for root, one_sided in ((path_a, a_files - b_files), (path_b, b_files - a_files)):
        for rel in one_sided:
            text = safe_read_text(root / rel)
            if text:
                total += len(text.splitlines())

    return total
|
|
|
|
|
|
@functools.cache
def _bulk_last_updated() -> dict[str, str]:
    """Get last git commit dates for all paths under Lib/ in one git call.

    ``git log`` is run in the current working directory; its output lists
    newest commits first, so the first date seen for a file is kept.
    Keys are Lib/-relative paths (e.g. "re/__init__.py", "test/test_os.py",
    "os.py"), plus directory rollups (e.g. "re", "test/test_zoneinfo") that
    carry the newest date of any file below them. The result is memoized.

    Returns:
        Dict mapping Lib/-relative path to date string; empty when git fails.
    """
    latest_by_file: dict[str, str] = {}
    try:
        completed = subprocess.run(
            ["git", "log", "--format=%cd", "--date=short", "--name-only", "--", "Lib/"],
            capture_output=True,
            text=True,
            timeout=30,
        )
    except Exception:
        return latest_by_file
    if completed.returncode != 0:
        return latest_by_file

    commit_date = None
    for raw in completed.stdout.splitlines():
        entry = raw.strip()
        if not entry:
            continue
        # Date lines are YYYY-MM-DD format
        if len(entry) == 10 and entry[4] == "-" and entry[7] == "-":
            commit_date = entry
            continue
        if commit_date and entry.startswith("Lib/"):
            # Strip "Lib/" prefix to get Lib-relative key
            rel = entry[4:]
            if rel:
                latest_by_file.setdefault(rel, commit_date)

    # Pre-compute directory rollups: each ancestor gets the newest file date.
    rollup: dict[str, str] = {}
    for filepath, date in latest_by_file.items():
        segments = filepath.split("/")
        for depth in range(1, len(segments)):
            folder = "/".join(segments[:depth])
            known = rollup.get(folder)
            if known is None or date > known:
                rollup[folder] = date

    # File entries win over a directory rollup with the same key.
    return {**rollup, **latest_by_file}
|
|
|
|
|
|
@functools.cache
def _lib_prefix_stripped(lib_prefix: str) -> str:
    """Return *lib_prefix* as a normalized POSIX path ending in exactly one "/".

    Examples: "Lib" -> "Lib/", "./Lib" -> "Lib/", "../Lib" -> "../Lib/".
    The result is memoized.
    """
    normalized = pathlib.Path(lib_prefix).as_posix()
    return f"{normalized.rstrip('/')}/"
|
|
|
|
|
|
def _lookup_last_updated(paths: list[str], lib_prefix: str) -> str | None:
    """Return the newest cached commit date among *paths*, or ``None``.

    Each path is normalized to POSIX form and the lib prefix is stripped to
    obtain the cache key, e.g. "Lib/test/test_os.py" -> "test/test_os.py"
    and "../Lib/re" -> "re". Paths without a cached date are ignored.
    """
    dates_by_key = _bulk_last_updated()
    strip = _lib_prefix_stripped(lib_prefix)

    found = []
    for path in paths:
        key = pathlib.Path(path).as_posix().removeprefix(strip)
        date = dates_by_key.get(key)
        if date:
            found.append(date)

    # YYYY-MM-DD strings order chronologically under plain string comparison.
    return max(found, default=None)
|
|
|
|
|
|
def get_module_last_updated(
    name: str, cpython_prefix: str, lib_prefix: str
) -> str | None:
    """Return the newest git commit date of a module's local Lib files.

    Only library paths that exist in CPython and also locally are looked up;
    ``None`` is returned when there are none or no date is known.
    """
    candidates = []
    for upstream in get_lib_paths(name, cpython_prefix):
        if not upstream.exists():
            continue
        try:
            relative = upstream.relative_to(cpython_prefix).relative_to("Lib")
            mirrored = pathlib.Path(lib_prefix) / relative
            present = mirrored.exists()
        except ValueError:
            # Path is not located under <cpython_prefix>/Lib.
            continue
        if present:
            candidates.append(str(mirrored))

    return _lookup_last_updated(candidates, lib_prefix) if candidates else None
|
|
|
|
|
|
def get_module_diff_stat(name: str, cpython_prefix: str, lib_prefix: str) -> int:
    """Sum the differing line counts between CPython and local Lib for a module.

    Library paths missing on either side, or not located under
    ``<cpython_prefix>/Lib``, contribute nothing.
    """
    changed = 0
    for upstream in get_lib_paths(name, cpython_prefix):
        if not upstream.exists():
            continue
        try:
            relative = upstream.relative_to(cpython_prefix).relative_to("Lib")
            mirrored = pathlib.Path(lib_prefix) / relative
        except ValueError:
            continue
        if mirrored.exists():
            changed += _count_path_diff(upstream, mirrored)
    return changed
|
|
|
|
|
|
def get_test_last_updated(
    test_name: str, cpython_prefix: str, lib_prefix: str
) -> str | None:
    """Return the cached last-updated date of a test's local file or directory.

    Args:
        test_name: Test name resolved through ``_get_cpython_test_path``.
        cpython_prefix: CPython checkout prefix.
        lib_prefix: Local Lib directory.

    Returns:
        The date reported by ``_lookup_last_updated``, or None when the test
        cannot be resolved on the CPython side or has no local counterpart.
    """
    upstream = _get_cpython_test_path(test_name, cpython_prefix)
    if upstream is not None:
        mirrored = _get_local_test_path(upstream, lib_prefix)
        if mirrored.exists():
            return _lookup_last_updated([str(mirrored)], lib_prefix)
    return None
|
|
|
|
|
|
def get_test_dependencies(
    test_path: pathlib.Path,
) -> dict[str, list[pathlib.Path]]:
    """Get test dependencies by parsing imports.

    Imports are auto-detected from every Python file under ``test_path`` and
    merged with the manual ``hard_deps`` entry of ``TEST_DEPENDENCIES`` for
    this test. Imports are processed in sorted order so that the returned
    lists are identical from run to run.

    Args:
        test_path: Path to test file or directory

    Returns:
        Dict with "hard_deps" (files to migrate) and "data" (dirs to copy).
        Only paths that exist next to ``test_path`` are included, each at
        most once. Both lists are empty when ``test_path`` does not exist.
    """
    result: dict[str, list[pathlib.Path]] = {"hard_deps": [], "data": []}

    if not test_path.exists():
        return result

    base_dir = test_path.parent

    def _add_existing(bucket: str, candidate: pathlib.Path) -> None:
        """Append ``candidate`` to ``result[bucket]`` if it exists and is new."""
        if candidate.exists() and candidate not in result[bucket]:
            result[bucket].append(candidate)

    # Parse all files for imports (auto-detect deps)
    all_imports: set[str] = set()
    for _, content in read_python_files(test_path):
        all_imports.update(parse_test_imports(content))

    # Also add manual dependencies from TEST_DEPENDENCIES
    test_name = test_path.stem if test_path.is_file() else test_path.name
    manual_deps = TEST_DEPENDENCIES.get(test_name, {})
    all_imports.update(manual_deps.get("hard_deps", ()))

    # A set of str has no stable iteration order across interpreter runs
    # (hash randomization); sort once so the output order is reproducible.
    ordered_imports = sorted(all_imports)

    # Convert imports to paths (deps)
    for imp in ordered_imports:
        # Skip other test modules (test_*) - they are independently managed
        # via their own update_lib entry. Only support/helper modules
        # (e.g., string_tests, mapping_tests) should be treated as hard deps.
        if imp.startswith("test_"):
            continue

        # Prefer a module file; fall back to a same-named directory/package.
        dep_path = base_dir / f"{imp}.py"
        if not dep_path.exists():
            dep_path = base_dir / imp
        _add_existing("hard_deps", dep_path)

    # Add data paths from manual table (for the test file itself)
    for data_name in manual_deps.get("data", ()):
        _add_existing("data", base_dir / data_name)

    # Also add data from auto-detected deps' TEST_DEPENDENCIES
    # e.g., test_codecencodings_kr -> multibytecodec_support -> cjkencodings
    for imp in ordered_imports:
        for data_name in TEST_DEPENDENCIES.get(imp, {}).get("data", ()):
            _add_existing("data", base_dir / data_name)

    return result
|
|
|
|
|
|
def _parse_test_submodule_imports(content: str) -> dict[str, set[str]]:
    """Collect the names imported by ``from test.X... import Y`` statements.

    Args:
        content: Python file content

    Returns:
        Dict keyed by the first component after ``test.`` (e.g., "test_bar"),
        each mapped to the set of names imported from it (e.g., {"helper"}).
        "support" and "__init__" are never used as keys. An empty dict is
        returned when ``safe_parse_ast`` yields None.
    """
    tree = safe_parse_ast(content)
    if tree is None:
        return {}

    skipped = ("support", "__init__")
    found: dict[str, set[str]] = {}
    for node in ast.walk(tree):
        if not isinstance(node, ast.ImportFrom):
            continue
        source = node.module
        if not source or not source.startswith("test."):
            continue
        # from test.test_bar import helper -> test_bar: {helper}
        pieces = source.split(".")
        if len(pieces) < 2:
            continue
        target = pieces[1]
        if target in skipped:
            continue
        found.setdefault(target, set()).update(alias.name for alias in node.names)

    return found
|
|
|
|
|
|
# In-process cache for _build_test_import_graph: maps str(test_dir) to the
# (import_graph, lib_imports_graph) pair produced for that directory.
_test_import_graph_cache: dict[
    str, tuple[dict[str, set[str]], dict[str, set[str]]]
] = {}
|
|
|
|
|
|
def _is_standard_lib_path(path: str) -> bool:
|
|
"""Check if path is the standard Lib directory (not a temp dir)."""
|
|
if "/tmp" in path.lower() or "/var/folders" in path.lower():
|
|
return False
|
|
return (
|
|
path == "Lib/test"
|
|
or path.endswith("/Lib/test")
|
|
or path == "Lib"
|
|
or path.endswith("/Lib")
|
|
)
|
|
|
|
|
|
def _build_test_import_graph(
    test_dir: pathlib.Path,
) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
    """Build import graphs for every ``*.py`` file below the test directory.

    Results are memoised per ``str(test_dir)`` in ``_test_import_graph_cache``.
    When the directory passes ``_is_standard_lib_path`` they are additionally
    read from / written to a shelve file under a key that embeds the value of
    ``_get_cpython_version("cpython")``.

    Args:
        test_dir: Path to Lib/test/ directory

    Returns:
        Tuple of:
        - Dict mapping relative path (without .py) -> set of test modules it imports
        - Dict mapping relative path (without .py) -> set of all lib imports
    """
    memo_key = str(test_dir)

    # In-process cache
    try:
        return _test_import_graph_cache[memo_key]
    except KeyError:
        pass

    # Cross-process cache (only for standard Lib/test directory)
    persist = _is_standard_lib_path(memo_key)
    if persist:
        version = _get_cpython_version("cpython")
        shelve_key = f"test_import_graph:{version}"
        try:
            with shelve.open(_get_cache_path()) as db:
                if shelve_key in db:
                    import_graph, lib_imports_graph = db[shelve_key]
                    graphs = (import_graph, lib_imports_graph)
                    _test_import_graph_cache[memo_key] = graphs
                    return graphs
        except Exception:
            # Best effort: an unreadable cache just means rebuilding below.
            pass

    # Build from scratch
    import_graph: dict[str, set[str]] = {}
    lib_imports_graph: dict[str, set[str]] = {}

    for source_file in test_dir.glob("**/*.py"):
        text = safe_read_text(source_file)
        if text is None:
            continue

        test_names = set(parse_test_imports(text))
        lib_names = parse_lib_imports(text)

        # A lib-style import that resolves to a file beside this one, or at
        # the top of the test directory, is also a test-module import.
        sibling_dir = source_file.parent
        for candidate in lib_names:
            module_file = f"{candidate}.py"
            if (sibling_dir / module_file).exists() or (
                test_dir / module_file
            ).exists():
                test_names.add(candidate)

        # "from test.<pkg> import name" counts when <pkg>/name.py exists.
        for package, names in _parse_test_submodule_imports(text).items():
            package_dir = test_dir / package
            if not package_dir.is_dir():
                continue
            test_names.update(
                name for name in names if (package_dir / f"{name}.py").exists()
            )

        graph_key = str(source_file.relative_to(test_dir).with_suffix(""))
        import_graph[graph_key] = test_names
        lib_imports_graph[graph_key] = lib_names

    graphs = (import_graph, lib_imports_graph)

    # Save to cross-process cache
    if persist:
        try:
            with shelve.open(_get_cache_path()) as db:
                db[shelve_key] = graphs
        except Exception:
            # Best effort: failing to persist must not fail the build.
            pass

    _test_import_graph_cache[memo_key] = graphs
    return graphs
|
|
|
|
|
|
# In-process cache for _build_lib_import_graph: maps the lib_prefix argument
# to the import graph built for it.
_lib_import_graph_cache: dict[str, dict[str, set[str]]] = {}
|
|
|
|
|
|
def _build_lib_import_graph(lib_prefix: str) -> dict[str, set[str]]:
    """Build import graph for Lib modules (full module paths like urllib.request).

    Results are memoised per ``lib_prefix`` in ``_lib_import_graph_cache``.
    When ``lib_prefix`` passes ``_is_standard_lib_path`` they are additionally
    read from / written to a shelve file under a key that embeds the value of
    ``_get_cpython_version("cpython")``.

    Top-level entries whose name starts with "_" or ".", and the "test"
    entry, are skipped. Files with empty or unreadable content are omitted.

    Args:
        lib_prefix: RustPython Lib directory

    Returns:
        Dict mapping full_module_path -> set of modules it imports. Empty
        (and not cached) when ``lib_prefix`` does not exist.
    """
    # In-process cache
    if lib_prefix in _lib_import_graph_cache:
        return _lib_import_graph_cache[lib_prefix]

    # Cross-process cache (only for standard Lib directory)
    use_file_cache = _is_standard_lib_path(lib_prefix)
    if use_file_cache:
        version = _get_cpython_version("cpython")
        shelve_key = f"lib_import_graph:{version}"
        try:
            with shelve.open(_get_cache_path()) as db:
                if shelve_key in db:
                    import_graph = db[shelve_key]
                    _lib_import_graph_cache[lib_prefix] = import_graph
                    return import_graph
        except Exception:
            # Best effort: an unreadable cache just means rebuilding below.
            pass

    # Build from scratch
    lib_dir = pathlib.Path(lib_prefix)
    if not lib_dir.exists():
        return {}

    import_graph: dict[str, set[str]] = {}

    for entry in lib_dir.iterdir():
        if entry.name.startswith(("_", ".")) or entry.name == "test":
            continue

        if entry.is_file() and entry.suffix == ".py":
            content = safe_read_text(entry)
            if content:
                imports = parse_lib_imports(content)
                imports.discard(entry.stem)  # ignore self-imports
                import_graph[entry.stem] = imports
        elif entry.is_dir() and (entry / "__init__.py").exists():
            for py_file in entry.glob("**/*.py"):
                content = safe_read_text(py_file)
                if not content:
                    continue
                imports = parse_lib_imports(content)
                rel_path = py_file.relative_to(lib_dir)
                # A package's __init__.py is named after its directory.
                if rel_path.name == "__init__.py":
                    module_path = rel_path.parent
                else:
                    module_path = rel_path.with_suffix("")
                # Join the path components rather than replacing "/" in the
                # string form, so the dotted name is also correct on
                # platforms whose path separator is "\".
                full_name = ".".join(module_path.parts)
                # Drop imports of the module's own top-level package.
                imports.discard(full_name.split(".")[0])
                import_graph[full_name] = imports

    # Save to cross-process cache
    if use_file_cache:
        try:
            with shelve.open(_get_cache_path()) as db:
                db[shelve_key] = import_graph
        except Exception:
            # Best effort: failing to persist must not fail the build.
            pass

    _lib_import_graph_cache[lib_prefix] = import_graph

    return import_graph
|
|
|
|
|
|
def _get_lib_modules_importing(
|
|
module_name: str, lib_import_graph: dict[str, set[str]]
|
|
) -> set[str]:
|
|
"""Find Lib modules (full paths) that import module_name or any of its submodules."""
|
|
importers: set[str] = set()
|
|
target_top = module_name.split(".")[0]
|
|
|
|
for full_path, imports in lib_import_graph.items():
|
|
if full_path.split(".")[0] == target_top:
|
|
continue # Skip same package
|
|
# Match if module imports target OR any submodule of target
|
|
# e.g., for "xml": match imports of "xml", "xml.parsers", "xml.etree.ElementTree"
|
|
matches = any(
|
|
imp == module_name or imp.startswith(module_name + ".") for imp in imports
|
|
)
|
|
if matches:
|
|
importers.add(full_path)
|
|
|
|
return importers
|
|
|
|
|
|
def _consolidate_submodules(
|
|
modules: set[str], threshold: int = 3
|
|
) -> dict[str, set[str]]:
|
|
"""Consolidate submodules if count exceeds threshold.
|
|
|
|
Args:
|
|
modules: Set of full module paths (e.g., {"urllib.request", "urllib.parse", "xml.dom", "xml.sax"})
|
|
threshold: If submodules > threshold, consolidate to parent
|
|
|
|
Returns:
|
|
Dict mapping display_name -> set of original module paths
|
|
e.g., {"urllib.request": {"urllib.request"}, "xml": {"xml.dom", "xml.sax", "xml.etree", "xml.parsers"}}
|
|
"""
|
|
# Group by top-level package
|
|
by_package: dict[str, set[str]] = {}
|
|
for mod in modules:
|
|
parts = mod.split(".")
|
|
top = parts[0]
|
|
if top not in by_package:
|
|
by_package[top] = set()
|
|
by_package[top].add(mod)
|
|
|
|
result: dict[str, set[str]] = {}
|
|
for top, submods in by_package.items():
|
|
if len(submods) > threshold:
|
|
# Consolidate to top-level
|
|
result[top] = submods
|
|
else:
|
|
# Keep individual
|
|
for mod in submods:
|
|
result[mod] = {mod}
|
|
|
|
return result
|
|
|
|
|
|
# Modules that are used everywhere - show but don't expand their dependents
|
|
_BLOCKLIST_MODULES = frozenset(
|
|
{
|
|
"unittest",
|
|
"test.support",
|
|
"support",
|
|
"doctest",
|
|
"typing",
|
|
"abc",
|
|
"collections.abc",
|
|
"functools",
|
|
"itertools",
|
|
"operator",
|
|
"contextlib",
|
|
"warnings",
|
|
"types",
|
|
"enum",
|
|
"re",
|
|
"io",
|
|
"os",
|
|
"sys",
|
|
}
|
|
)
|
|
|
|
|
|
def find_dependent_tests_tree(
    module_name: str,
    lib_prefix: str,
    max_depth: int = 1,
    _depth: int = 0,
    _visited_tests: set[str] | None = None,
    _visited_modules: set[str] | None = None,
) -> dict:
    """Build a tree of the tests that depend on a module.

    The root lists the tests importing ``module_name`` (or one of its
    submodules) directly. Each child repeats this for a Lib module that
    imports ``module_name``, one level deeper. A test is reported only once
    in the whole tree, and blocklisted or already visited modules are not
    expanded.

    Args:
        module_name: Module to search for (e.g., "ftplib")
        lib_prefix: RustPython Lib directory
        max_depth: Maximum depth to recurse (default 1 = show direct + 1 level of Lib deps)
        _depth: Current recursion depth (internal).
        _visited_tests: Test file keys already reported; shared with and
            mutated by the recursive calls (internal).
        _visited_modules: Module names already expanded; shared with and
            mutated by the recursive calls (internal).

    Returns:
        Dict with structure:
        {
            "module": "ftplib",
            "tests": ["test_ftplib", "test_urllib2"],  # Direct importers
            "children": [
                {"module": "urllib.request", "tests": [...], "children": []},
                ...
            ]
        }
    """
    seen_tests = set() if _visited_tests is None else _visited_tests
    seen_modules = set() if _visited_modules is None else _visited_modules

    # Build graphs
    lib_root = pathlib.Path(lib_prefix)
    _, lib_imports_by_test = _build_test_import_graph(lib_root / "test")
    lib_graph = _build_lib_import_graph(lib_prefix)

    top_level = module_name.split(".")[0]
    child_prefix = module_name + "."

    def _hits_target(imported_names: set[str]) -> bool:
        # Exact module OR any child submodule: "xml" matches "xml",
        # "xml.parsers" and "xml.etree.ElementTree", while
        # "collections._defaultdict" only matches itself (no children).
        return any(
            imported == module_name or imported.startswith(child_prefix)
            for imported in imported_names
        )

    # Not-yet-reported test files that import this module directly.
    matched = {
        file_key
        for file_key, imported_names in lib_imports_by_test.items()
        if file_key not in seen_tests
        and pathlib.Path(file_key).name.startswith("test_")
        and _hits_target(imported_names)
    }
    seen_tests.update(matched)

    # Mark this module as visited (cycle detection)
    seen_modules.update((module_name, top_level))

    subtrees = []
    blocked = module_name in _BLOCKLIST_MODULES or top_level in _BLOCKLIST_MODULES
    if _depth < max_depth and not blocked:
        # Lib modules importing this module, minus visited and blocklisted
        # ones (checked on both the full name and the top-level package).
        candidates = set()
        for importer in _get_lib_modules_importing(module_name, lib_graph):
            names = {importer, importer.split(".")[0]}
            if names.isdisjoint(seen_modules) and names.isdisjoint(_BLOCKLIST_MODULES):
                candidates.add(importer)

        # Consolidate submodules (xml.dom, xml.sax, xml.etree -> xml if > 3)
        for display_name in sorted(_consolidate_submodules(candidates, threshold=3)):
            subtree = find_dependent_tests_tree(
                display_name,
                lib_prefix,
                max_depth,
                _depth + 1,
                seen_tests,
                seen_modules,
            )
            # Keep only branches that contribute at least one test.
            if subtree["tests"] or subtree["children"]:
                subtrees.append(subtree)

    return {
        "module": module_name,
        # Consolidate test names (test_sqlite3/test_dbapi -> test_sqlite3)
        "tests": sorted({_consolidate_file_key(key) for key in matched}),
        "children": subtrees,
    }
|
|
|
|
|
|
def _consolidate_file_key(file_key: str) -> str:
|
|
"""Consolidate file_key to test name.
|
|
|
|
Args:
|
|
file_key: Relative path without .py (e.g., "test_foo", "test_bar/test_sub")
|
|
|
|
Returns:
|
|
Consolidated test name:
|
|
- "test_foo" for "test_foo"
|
|
- "test_sqlite3" for "test_sqlite3/test_dbapi"
|
|
"""
|
|
parts = pathlib.Path(file_key).parts
|
|
if len(parts) == 1:
|
|
return parts[0]
|
|
return parts[0]
|