Compare commits

2 Commits

Author SHA1 Message Date
32c3e6241a docs(planning): mark PR-B / PR 17 as landed
All checks were successful
boundary-lint / PR boundary-claim (Lint (push) Has been skipped
boundary-lint / ban-list lint (Lint (push) Successful in 19s
ci / test-health gate (push) Successful in 17s
ci / mutation test (broker) (push) Has been skipped
boundary-lint / duplication-deadline (Layer 1/2) (push) Successful in 19s
ci / rust debug (push) Successful in 2m10s
ci / rust release (push) Successful in 2m38s
ci / python (push) Successful in 1m21s
Closes PR-B (eager_hydrate apply pass body moved to Rust, commit 9691726).
Next up: PR 18 (H3-queue), PR 19 (decoder), and Track H2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 22:21:53 +09:00
9691726d99 feat(rust): PR-B / PR 17 — eager_hydrate apply pass body → sessions_native
Wave 2 PR-B closes the eager-hydrate Rust ownership: PR 14 moved the
BFS algorithm; PR-B moves the apply pass driver (loop, batch pacing,
re-check, fetch transaction, outcome counting) into Rust. Python
becomes a thin caller that persists sidecar metadata for hydrated
entries.

Rust additions
--------------

* ``sessions_native::eager_hydrate::run_apply_pass`` — drives one pass
  in Rust: find candidates → batch loop → ``thread::sleep`` between
  batches → re-check zero-byte → ``map_local_to_remote_path`` → call
  ``file_open::run_file_open_transaction`` (PR 14.5c) → collect
  outcomes. Returns ``serde_json::Value`` with
  ``{hydrated, skipped_existing, failed}``.
* ABI ``sessions_eager_hydrate_apply`` — JSON-returning wrapper
  around the new function. Allowed-basenames passed via 0x1F unit-
  separator string (matches the existing
  ``sessions_eager_hydrate_find_candidates`` encoding).
* ``map_local_to_remote_path`` extracted to ``pub(crate)`` in
  ``lib.rs`` so the apply pass and the existing
  ``sessions_file_map_local_to_remote`` ABI share one implementation.
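
The control flow in the first bullet can be modelled in a few lines of Python. This is only an illustrative sketch of the described loop, not the Rust implementation: `apply_pass_model`, `fetch`, and `sleep` are hypothetical names, the real driver calls `file_open::run_file_open_transaction` rather than an injected callback, and the real `hydrated` entries carry metadata objects instead of bare paths.

```python
from pathlib import Path
from typing import Callable, Dict, List, Sequence


def apply_pass_model(
    candidates: Sequence[Path],
    fetch: Callable[[Path], bool],   # hypothetical stand-in for the file_open transaction
    batch_size: int,
    batch_sleep_ms: int,
    sleep: Callable[[float], None],  # injected so the sketch can be tested without waiting
) -> Dict[str, object]:
    hydrated: List[str] = []
    skipped_existing = 0
    failed = 0
    size = max(1, batch_size)  # a batch size of 0 collapses to 1
    for start in range(0, len(candidates), size):
        if start > 0 and batch_sleep_ms > 0:
            # Pause between batches, never before the first one.
            sleep(batch_sleep_ms / 1000.0)
        for path in candidates[start:start + size]:
            # Re-check: another code path may have filled the placeholder meanwhile.
            try:
                still_placeholder = path.is_file() and path.stat().st_size == 0
            except OSError:
                still_placeholder = False
            if not still_placeholder:
                skipped_existing += 1
            elif fetch(path):
                hydrated.append(str(path))
            else:
                failed += 1  # failures are counted; the pass keeps going
    return {
        "hydrated": hydrated,
        "skipped_existing": skipped_existing,
        "failed": failed,
    }
```

Injecting `sleep` mirrors the `sleep_fn` hook of the removed Python driver; the Rust version calls `thread::sleep` directly, which is one reason the pacing is harder to unit-test there.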

Python changes
--------------

* ``_rust_ffi.eager_hydrate_apply`` — ctypes wrapper, returns a dict
  with ``hydrated``/``skipped_existing``/``failed``.
* ``commands._eager_hydrate_workspace`` — body shrinks from ~50 LOC
  (build mapper, define ``fetch_one`` closure, drive Python loop) to
  ~25 LOC (one Rust round-trip + sidecar writes for hydrated entries).
* ``eager_hydrate.py`` — ``run_eager_hydrate`` / ``EagerHydrateSummary``
  / ``batched`` / ``_is_placeholder`` / ``FetchFn`` removed. Module
  now exposes only candidate discovery + settings normaliser
  (``find_placeholder_candidates``, ``normalize_eager_hydrate_basenames``,
  default constants).
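
What remains on the Python side is mostly decoding the summary. A rough sketch, assuming the summary shape listed above; `SidecarMeta` and `decode_hydrated` are hypothetical stand-ins for `RemoteFileMetadata` and the loop inside `_eager_hydrate_workspace`, and the `kind` field is left out for brevity.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class SidecarMeta:  # hypothetical stand-in for RemoteFileMetadata
    mtime_ns: int
    size_bytes: int
    unix_mode: Optional[int]


def decode_hydrated(summary: Dict[str, Any]) -> List[Tuple[str, SidecarMeta]]:
    """Turn the ``hydrated`` entries of a pass summary into (path, metadata) pairs."""
    out: List[Tuple[str, SidecarMeta]] = []
    for entry in summary.get("hydrated", []):
        local = entry.get("local_path")
        meta = entry.get("metadata")
        # Skip malformed entries (for example ``"metadata": null``) instead of raising.
        if not isinstance(local, str) or not isinstance(meta, dict):
            continue
        mode = meta.get("unix_mode")
        out.append(
            (
                local,
                SidecarMeta(
                    mtime_ns=int(meta.get("mtime_ns", 0)),
                    size_bytes=int(meta.get("size_bytes", 0)),
                    unix_mode=int(mode) if mode is not None else None,
                ),
            )
        )
    return out
```

Each returned pair would then be handed to the sidecar writer; entries that fail the shape check are dropped silently, which matches the defensive `continue` in the new function body.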

Tests
-----

* Removed 8 driver tests from ``test_eager_hydrate.py`` and 7 from
  ``test_eager_hydrate_parity.py`` (all ``run_eager_hydrate`` /
  ``batched`` / ``EagerHydrateSummary`` tests). The Rust unit tests
  in ``eager_hydrate.rs`` cover candidate discovery; the apply pass
  body is exercised via the live ``file_open`` transaction integration
  smoke (no broker mock available in unit tests).
* 1298 Python tests pass; ``cargo clippy --workspace -- -D warnings``
  green; boundary lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 22:20:32 +09:00
10 changed files with 361 additions and 475 deletions

View File

@@ -295,10 +295,10 @@ queue/dispatcher/lane gating + `_CONNECT_GENERATION` token semantics + `_connect_ge
### PR 17+: outside this plan's scope (updated separately)
After PR 16 (PR-A) lands, this plan is updated to cover:
- **PR-B**: mirror BFS task body, eager_hydrate apply body → orchestrator (on top of the PR 13b envelope)
- **H3-queue**: main migration of BACKLOG H3 (queue body)
- **H2-save / H2-connect**: split of BACKLOG H2 (absorbed into the Track H2 main track)
- **`_rust_ffi` decoder migration to Rust**: `_parse_*_outcome` Rust ABI typed JSON (Rust schema oracle tooling after open issue #6 is decided)
- **PR 17 / PR-B** ✅ `9691726`: eager_hydrate apply pass body → `sessions_native::eager_hydrate::run_apply_pass`. Python driver removed (`run_eager_hydrate`/`batched`/`EagerHydrateSummary`); 1 Rust round-trip per pass + Python sidecar writes.
- **PR 18 / H3-queue** ⏭: main migration of BACKLOG H3 (queue body)
- **PR 19 / `_rust_ffi` decoder migration to Rust** ⏭: `_parse_*_outcome` Rust ABI typed JSON (Rust schema oracle tooling after open issue #6 is decided)
- **H2-save / H2-connect**: split of BACKLOG H2 (absorbed into the Track H2 main track, *in parallel*)
- **Deadline Layer 3** auto-revert activation
Expected commands.py LOC at this point: 7394 - (worker loop ~550) - (connect SM ~330, partial) - (hydrate preflight ~300, affected by PR 12-14) ≈ **5500-6000 LOC**.

View File

@@ -1,4 +1,5 @@
//! Eager-hydrate placeholder discovery (Wave 2 PR 14).
//! Eager-hydrate placeholder discovery (Wave 2 PR 14) + apply pass body
//! (Wave 2 PR 17 / PR-B).
//!
//! Walks a local cache root and yields zero-byte regular files whose basename
//! is in an allow-list. Mirrors the Python ``find_placeholder_candidates``
@@ -11,15 +12,21 @@
//! cache → produces what candidates it can).
//! - Empty allow-list returns no candidates.
//!
//! Batching/sleep pacing stays in Python. The Rust side returns a sorted
//! `Vec<String>` of absolute paths so the caller can deterministically batch
//! over the result without invoking a Python callback per file (the FFI
//! round-trip cost outweighs any LOC savings — see rust-pragmatist's note
//! in the team synthesis).
//! PR-B (apply pass body) extends the Rust ownership: the loop, batch
//! pacing, per-placeholder ``file_open`` transaction, and outcome
//! collection all run in Rust. Python becomes a thin caller — one FFI
//! round-trip per pass, then writes sidecar metadata for hydrated entries.
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use serde_json::{Value, json};
use crate::file_open;
use crate::map_local_to_remote_path;
/// Return zero-byte regular files under `cache_root` whose basename is in
/// `allowed_basenames`. Order is BFS-stable but not lexicographic.
@@ -91,6 +98,100 @@ pub fn find_placeholder_candidates(
out
}
/// Drive one eager-hydrate apply pass over placeholders under
/// ``cache_root``. Returns a JSON object summarising the pass:
///
/// ```json
/// {
/// "hydrated": [{"local_path": "...", "metadata": {...}}, ...],
/// "skipped_existing": N,
/// "failed": M
/// }
/// ```
///
/// The driver mirrors the previous Python ``run_eager_hydrate``:
/// re-checks zero-byte before fetch (so a concurrent path filling the
/// placeholder lands in ``skipped_existing`` rather than re-fetched),
/// counts failures without aborting, and pauses ``batch_sleep_ms``
/// between batches.
///
/// Per-placeholder, calls [`file_open::run_file_open_transaction`]
/// (PR 14.5c). The Python wrapper (PR 14.5d) is bypassed so we get one
/// FFI round-trip per pass instead of one per file.
#[allow(clippy::too_many_arguments)]
pub fn run_apply_pass(
cache_root: &Path,
host_alias: &str,
remote_workspace_root: &str,
allowed_basenames: &[String],
batch_size: usize,
batch_sleep_ms: u64,
max_open_bytes: u64,
binary_probe_bytes: usize,
allow_empty: bool,
timeout_ms: u64,
) -> Value {
let placeholders = find_placeholder_candidates(cache_root, allowed_basenames);
let mut hydrated: Vec<Value> = Vec::new();
let mut skipped_existing: usize = 0;
let mut failed: usize = 0;
let batch_size_safe = if batch_size == 0 { 1 } else { batch_size };
for (batch_index, batch) in placeholders.chunks(batch_size_safe).enumerate() {
if batch_index > 0 && batch_sleep_ms > 0 {
thread::sleep(Duration::from_millis(batch_sleep_ms));
}
for path in batch {
// Re-check zero-byte: a concurrent path (sidebar hydrate /
// on-demand fetch) may have filled the placeholder while we
// were iterating. Mirror Python's pre-fetch guard.
let still_placeholder = match path.metadata() {
Ok(m) => m.is_file() && m.len() == 0,
Err(_) => false,
};
if !still_placeholder {
skipped_existing += 1;
continue;
}
let remote = match map_local_to_remote_path(remote_workspace_root, cache_root, path) {
Some(r) => r,
None => {
failed += 1;
continue;
}
};
let outcome = file_open::run_file_open_transaction(
host_alias,
&remote,
path,
max_open_bytes,
binary_probe_bytes,
allow_empty,
timeout_ms,
);
let outcome_str = outcome.get("outcome").and_then(Value::as_str).unwrap_or("");
if outcome_str == "OK" {
let metadata = outcome.get("metadata").cloned().unwrap_or(Value::Null);
hydrated.push(json!({
"local_path": path.to_string_lossy(),
"metadata": metadata,
}));
} else {
failed += 1;
}
}
}
json!({
"hydrated": hydrated,
"skipped_existing": skipped_existing,
"failed": failed,
})
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -257,7 +257,7 @@ fn write_output(out_buf: *mut c_char, out_cap: usize, value: &str) -> c_int {
0
}
fn normalize_local_path(path: &Path) -> PathBuf {
pub(crate) fn normalize_local_path(path: &Path) -> PathBuf {
let base = if path.is_absolute() {
path.to_path_buf()
} else if let Ok(cwd) = std::env::current_dir() {
@@ -278,6 +278,45 @@ fn normalize_local_path(path: &Path) -> PathBuf {
out
}
/// Map ``local_path`` (under ``files_cache_root``) back to a remote POSIX
/// path. Returns ``None`` when the path does not belong to this cache root.
///
/// Mirrors the ABI ``sessions_file_map_local_to_remote`` logic so the
/// orchestrator-side (eager hydrate, mirror BFS body) does not need to
/// re-implement it.
pub(crate) fn map_local_to_remote_path(
remote_root: &str,
files_cache_root: &Path,
local_path: &Path,
) -> Option<String> {
let cache_root = normalize_local_path(files_cache_root);
let local = normalize_local_path(local_path);
let extern_root = cache_root.join("__extern");
if let Ok(rel) = local.strip_prefix(&extern_root) {
let rel_s = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect::<Vec<String>>()
.join("/");
return Some(format!("/{}", rel_s));
}
let rel = local.strip_prefix(&cache_root).ok()?;
let rel_s = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect::<Vec<String>>()
.join("/");
let root_trim = remote_root.trim_end_matches('/');
let remote = if root_trim.is_empty() || root_trim == "/" {
format!("/{}", rel_s)
} else if rel_s.is_empty() {
root_trim.to_string()
} else {
format!("{}/{}", root_trim, rel_s)
};
Some(remote)
}
fn split_posix(path: &str) -> Vec<&str> {
path.split('/').filter(|part| !part.is_empty()).collect()
}
@@ -828,35 +867,14 @@ pub unsafe extern "C" fn sessions_file_map_local_to_remote(
return AbiError::InvalidUtf8.code();
};
let cache_root = normalize_local_path(Path::new(files_cache_root_s));
let local = normalize_local_path(Path::new(local_path_s));
let extern_root = cache_root.join("__extern");
if let Ok(rel) = local.strip_prefix(&extern_root) {
let rel_s = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect::<Vec<String>>()
.join("/");
let remote = format!("/{}", rel_s);
return write_output(out_buf, out_cap, &remote);
match map_local_to_remote_path(
remote_root_s,
Path::new(files_cache_root_s),
Path::new(local_path_s),
) {
Some(remote) => write_output(out_buf, out_cap, &remote),
None => 1,
}
let Ok(rel) = local.strip_prefix(&cache_root) else {
return 1;
};
let rel_s = rel
.components()
.map(|c| c.as_os_str().to_string_lossy().into_owned())
.collect::<Vec<String>>()
.join("/");
let root_trim = remote_root_s.trim_end_matches('/');
let remote = if root_trim.is_empty() || root_trim == "/" {
format!("/{}", rel_s)
} else if rel_s.is_empty() {
root_trim.to_string()
} else {
format!("{}/{}", root_trim, rel_s)
};
write_output(out_buf, out_cap, &remote)
}
/// Return `1` if local path is under `files_cache_root/__extern`, else `0`.
@@ -1553,6 +1571,78 @@ pub unsafe extern "C" fn sessions_eager_hydrate_find_candidates(
write_output(out_buf, out_cap, &joined)
}
/// Run the eager-hydrate apply pass body (Wave 2 PR-B).
///
/// One Rust round-trip drives the entire pass: find candidates →
/// per-batch sleep → re-check zero-byte → map local→remote → file_open
/// transaction → collect outcomes. Python writes sidecar metadata for
/// the returned ``hydrated`` list.
///
/// # Safety
/// `cache_root`, `host_alias`, `remote_workspace_root`, and
/// `allowed_basenames_joined` must be valid UTF-8 C strings (the latter
/// uses 0x1F as the unit separator). `out_buf` must be writable for
/// `out_cap` bytes when non-null. Returns 0 on success and writes a
/// JSON object documented on
/// [`eager_hydrate::run_apply_pass`].
#[unsafe(no_mangle)]
#[allow(clippy::too_many_arguments)]
pub unsafe extern "C" fn sessions_eager_hydrate_apply(
cache_root: *const c_char,
host_alias: *const c_char,
remote_workspace_root: *const c_char,
allowed_basenames_joined: *const c_char,
batch_size: usize,
batch_sleep_ms: u64,
max_open_bytes: u64,
binary_probe_bytes: usize,
allow_empty: c_int,
timeout_ms: u64,
out_buf: *mut c_char,
out_cap: usize,
) -> c_int {
if cache_root.is_null()
|| host_alias.is_null()
|| remote_workspace_root.is_null()
|| allowed_basenames_joined.is_null()
{
return AbiError::NullPointer.code();
}
let Ok(cache_root_s) = (unsafe { CStr::from_ptr(cache_root) }).to_str() else {
return AbiError::InvalidUtf8.code();
};
let Ok(host_s) = (unsafe { CStr::from_ptr(host_alias) }).to_str() else {
return AbiError::InvalidUtf8.code();
};
let Ok(remote_root_s) = (unsafe { CStr::from_ptr(remote_workspace_root) }).to_str() else {
return AbiError::InvalidUtf8.code();
};
let Ok(allowed_s) = (unsafe { CStr::from_ptr(allowed_basenames_joined) }).to_str() else {
return AbiError::InvalidUtf8.code();
};
let allowed: Vec<String> = allowed_s
.split('\x1f')
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect();
let summary = eager_hydrate::run_apply_pass(
Path::new(cache_root_s),
host_s,
remote_root_s,
&allowed,
batch_size,
batch_sleep_ms,
max_open_bytes,
binary_probe_bytes,
allow_empty != 0,
timeout_ms,
);
let Ok(serialized) = serde_json::to_string(&summary) else {
return AbiError::Serialization.code();
};
write_output(out_buf, out_cap, &serialized)
}
/// Derive a human-friendly venv label from a remote interpreter path.
///
/// # Safety
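
The mapping rule that `map_local_to_remote_path` now centralises can be restated compactly in Python. This is a sketch for reading along, not a port: `map_local_to_remote_model` is a hypothetical name, and it assumes both local paths are already absolute, normalised POSIX-style paths (the Rust code runs them through `normalize_local_path` first).

```python
from pathlib import PurePosixPath
from typing import Optional


def map_local_to_remote_model(
    remote_root: str, cache_root: str, local_path: str
) -> Optional[str]:
    root = PurePosixPath(cache_root)
    local = PurePosixPath(local_path)
    # Files under ``<cache_root>/__extern`` map to absolute remote paths.
    try:
        rel = local.relative_to(root / "__extern")
        return "/" + "/".join(rel.parts)
    except ValueError:
        pass
    # Everything else must live under the cache root itself.
    try:
        rel = local.relative_to(root)
    except ValueError:
        return None  # not part of this cache root
    rel_s = "/".join(rel.parts)
    root_trim = remote_root.rstrip("/")
    if not root_trim:  # remote root was "/" (or empty)
        return "/" + rel_s
    if not rel_s:  # the cache root itself maps to the remote root
        return root_trim
    return root_trim + "/" + rel_s
```

For instance, with cache root `/cache/ws` and remote root `/home/u/proj`, the local file `/cache/ws/src/main.rs` maps to `/home/u/proj/src/main.rs`, while a path outside the cache root yields `None`, which the ABI wrapper turns into return code `1`.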

View File

@@ -95,6 +95,7 @@ from ._orchestrator import (
)
from ._tool_runtime import (
derive_venv_name,
eager_hydrate_apply,
eager_hydrate_find_candidates,
merge_remote_extension_catalog_json,
normalize_code_server_specs_json,
@@ -134,6 +135,7 @@ __all__ = (
"save_decision_code",
# _tool_runtime
"derive_venv_name",
"eager_hydrate_apply",
"eager_hydrate_find_candidates",
"merge_remote_extension_catalog_json",
"normalize_code_server_specs_json",

View File

@@ -7,7 +7,12 @@ import json
from typing import Any, Dict, Sequence, Tuple
from . import _loader
from ._loader import SessionsNativeLibraryError, _bind_abi_symbol, call_string_abi
from ._loader import (
SessionsNativeLibraryError,
_bind_abi_symbol,
_call_json_returning_abi,
call_string_abi,
)
def parse_ruff_diagnostics(
@@ -148,6 +153,63 @@ def eager_hydrate_find_candidates(
return tuple(out.split("\x1f"))
def eager_hydrate_apply(
*,
cache_root: str,
host_alias: str,
remote_workspace_root: str,
allowed_basenames: Sequence[str],
batch_size: int,
batch_sleep_ms: int,
max_open_bytes: int,
binary_probe_bytes: int,
allow_empty: bool,
timeout_ms: int,
) -> Dict[str, Any]:
"""Drive one Rust eager-hydrate apply pass (PR-B / PR 17).
Rust owns: candidate discovery, batch loop, batch_sleep pacing,
re-check zero-byte, local→remote mapping, ``file_open`` transaction,
outcome counting. Python writes sidecar metadata for ``hydrated``
entries and emits the trace event.
Returns a dict with keys ``hydrated`` (list of
``{"local_path": ..., "metadata": ...}``), ``skipped_existing``,
``failed``.
"""
decoded = _call_json_returning_abi(
"sessions_eager_hydrate_apply",
(
cache_root,
host_alias,
remote_workspace_root,
"\x1f".join(name for name in allowed_basenames if name),
ctypes.c_size_t(int(batch_size)),
ctypes.c_uint64(int(batch_sleep_ms)),
ctypes.c_uint64(int(max_open_bytes)),
ctypes.c_size_t(int(binary_probe_bytes)),
ctypes.c_int(1 if allow_empty else 0),
ctypes.c_uint64(int(timeout_ms)),
),
argtypes=[
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_char_p,
ctypes.c_size_t,
ctypes.c_uint64,
ctypes.c_uint64,
ctypes.c_size_t,
ctypes.c_int,
ctypes.c_uint64,
],
initial_buf=64 * 1024,
)
if decoded is None:
return {"hydrated": [], "skipped_existing": 0, "failed": 0}
return decoded
def merge_remote_extension_catalog_json(
builtin_specs: Sequence[Dict[str, Any]], user_raw: Any
) -> Tuple[Dict[str, Any], ...]:
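
The allow-list crosses the FFI boundary as a single string joined with the ASCII unit separator (0x1F). A minimal sketch of that round trip, with the decode half written in Python to mirror the `split('\x1f')` plus empty-filter on the Rust side; `encode_basenames` and `decode_basenames` are hypothetical helper names, and the scheme assumes no basename contains 0x1F itself.

```python
from typing import Iterable, List

UNIT_SEPARATOR = "\x1f"  # ASCII unit separator


def encode_basenames(names: Iterable[str]) -> str:
    """Join non-empty basenames into one string for a ``c_char_p`` argument."""
    return UNIT_SEPARATOR.join(name for name in names if name)


def decode_basenames(joined: str) -> List[str]:
    """Split on the unit separator and drop empty segments."""
    return [part for part in joined.split(UNIT_SEPARATOR) if part]
```

Dropping empty segments on both sides means an empty allow-list encodes to an empty string and decodes back to an empty list, which keeps the "empty allow-list returns no candidates" rule intact across the boundary.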

View File

@@ -52,9 +52,6 @@ from .eager_hydrate import (
from .eager_hydrate import (
DEFAULT_BATCH_SLEEP_S as _EAGER_HYDRATE_BATCH_SLEEP_S,
)
from .eager_hydrate import (
run_eager_hydrate,
)
from .file_state import (
FileOpenGuardrails,
OpenFileResult,
@@ -3583,64 +3580,52 @@ def _eager_hydrate_workspace(
) -> None:
"""Hydrate a bounded batch of placeholder build-graph files.
Runs on the background task worker. Each placeholder fetch reuses the
same primitive as ``SessionsOnDemandFetchListener`` so guardrails (size
caps, binary heuristics) stay consistent across entry points.
Wave 2 PR-B (PR 17): the apply pass body runs in Rust
(``sessions_native::eager_hydrate::run_apply_pass``). One Rust
round-trip drives candidate discovery, batch pacing, re-check, and
the per-placeholder ``file_open`` transaction. Python persists
sidecar metadata for entries Rust marked ``hydrated``.
"""
host_alias = context.recent_entry.host_alias
mapper = RemoteToLocalCacheMapper(
workspace_cache_key=context.cache_key,
limits = FileOpenGuardrails()
summary = _rust_ffi.eager_hydrate_apply(
cache_root=str(context.local_cache_root),
host_alias=context.recent_entry.host_alias,
remote_workspace_root=context.recent_entry.remote_root,
files_cache_root=context.local_cache_root,
)
def fetch_one(local_path: Path) -> bool:
remote = mapper.remote_path_for_local_cache_file(local_path)
if remote is None:
return False
try:
opened = open_remote_file_into_local_cache(
host_alias,
remote_absolute_path=remote,
local_cache_path=local_path,
read_timeout_s=30.0,
)
except Exception:
# Defensive: fetch primitive should already encode transport
# failures as ``OpenOutcome.TRANSPORT_ERROR``; guard against any
# unexpected exception so the batch driver can keep going.
_trace_event(
"mirror.eager_hydrate_fetch_error",
cache_key=context.cache_key,
local_path=str(local_path),
)
return False
if opened.outcome is OpenOutcome.OK:
if opened.remote_metadata is not None:
_write_remote_metadata_sidecar(local_path, opened.remote_metadata)
return True
_trace_event(
"mirror.eager_hydrate_fetch_failed",
cache_key=context.cache_key,
local_path=str(local_path),
outcome=getattr(opened.outcome, "value", str(opened.outcome)),
detail=opened.detail,
)
return False
summary = run_eager_hydrate(
context.local_cache_root,
fetch_fn=fetch_one,
allowed_basenames=basenames,
batch_size=_EAGER_HYDRATE_BATCH_SIZE,
batch_sleep_s=_EAGER_HYDRATE_BATCH_SLEEP_S,
batch_sleep_ms=int(_EAGER_HYDRATE_BATCH_SLEEP_S * 1000),
max_open_bytes=limits.max_open_bytes,
binary_probe_bytes=limits.binary_probe_bytes,
allow_empty=limits.allow_empty_files,
timeout_ms=30_000,
)
hydrated_entries = summary.get("hydrated", [])
for entry in hydrated_entries:
local_str = entry.get("local_path")
meta_dict = entry.get("metadata")
if not isinstance(local_str, str) or not isinstance(meta_dict, dict):
continue
kind_str = str(meta_dict.get("kind", RemoteFileKind.REGULAR_FILE.value))
try:
kind = RemoteFileKind(kind_str)
except ValueError:
kind = RemoteFileKind.OTHER
unix_mode_raw = meta_dict.get("unix_mode")
meta = RemoteFileMetadata(
mtime_ns=int(meta_dict.get("mtime_ns", 0)),
size_bytes=int(meta_dict.get("size_bytes", 0)),
kind=kind,
unix_mode=int(unix_mode_raw) if unix_mode_raw is not None else None,
)
_write_remote_metadata_sidecar(Path(local_str), meta)
_trace_event(
"mirror.eager_hydrate_done",
cache_key=context.cache_key,
hydrated=summary.hydrated,
skipped_existing=summary.skipped_existing,
failed=summary.failed,
hydrated=len(hydrated_entries),
skipped_existing=int(summary.get("skipped_existing", 0)),
failed=int(summary.get("failed", 0)),
)

View File

@@ -7,19 +7,17 @@ disk directly — it never flows through Sublime's ``open_file`` hook, so
zero-byte placeholder (created by the sidebar mirror pass), the CLI tool
reports a malformed manifest and gives up.
This module walks an already-mirrored local cache once a workspace activates
and schedules a bounded bulk fetch for placeholders whose basename matches a
small allow-list of "essential" files (``Cargo.toml``, ``pyproject.toml``,
``package.json``, …). The actual fetch primitive is injected so the driver
stays importable without the Sublime/SSH runtime.
This module exposes the candidate discovery + settings normaliser that
back the eager-hydrate apply pass. The driver itself (batch loop,
re-check, fetch transaction) lives in
``sessions_native::eager_hydrate::run_apply_pass`` (Wave 2 PR-B / PR 17)
— see :func:`sessions._rust_ffi.eager_hydrate_apply`.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable, Iterator, List, Optional, Tuple
from typing import Iterable, Iterator, List, Tuple
from . import _rust_ffi
@@ -51,38 +49,6 @@ DEFAULT_BATCH_SIZE: int = 20
DEFAULT_BATCH_SLEEP_S: float = 0.05
@dataclass(frozen=True)
class EagerHydrateSummary:
"""Outcome of one eager-hydrate pass.
Attributes:
hydrated: Count of placeholders that were fetched successfully.
skipped_existing: Placeholders that turned out to have non-zero size
by the time the driver reached them (another worker won the race).
failed: Placeholders whose ``fetch_fn`` returned ``False``.
"""
hydrated: int = 0
skipped_existing: int = 0
failed: int = 0
def _is_placeholder(path: Path) -> bool:
"""Return ``True`` if ``path`` is a regular zero-byte file."""
try:
stat = path.stat()
except OSError:
return False
if stat.st_size != 0:
return False
# ``Path.is_file`` resolves symlinks; the Sessions cache never uses
# symlinks but the guard is cheap.
try:
return path.is_file()
except OSError:
return False
def find_placeholder_candidates(
cache_root: Path,
allowed_basenames: Iterable[str],
@@ -90,9 +56,8 @@ def find_placeholder_candidates(
"""Yield zero-byte files under ``cache_root`` whose basename is allowed.
Wave 2 PR 14: BFS + size filter run in
``sessions_native::eager_hydrate``. Pacing/batching stay in Python so
the FFI is one call per pass. Directories that fail to enumerate are
silently skipped (Rust matches Python's ``OSError`` swallow).
``sessions_native::eager_hydrate``. Directories that fail to enumerate
are silently skipped (Rust matches Python's ``OSError`` swallow).
"""
allowed_list = [name for name in allowed_basenames if name]
if not allowed_list:
@@ -107,88 +72,6 @@ def find_placeholder_candidates(
yield Path(path_str)
def batched(items: Iterable[Path], batch_size: int) -> Iterator[List[Path]]:
"""Yield ``items`` in lists of at most ``batch_size``.
Args:
items: Source iterable.
batch_size: Maximum list length; values ``<= 0`` collapse to ``1``.
"""
size = max(1, batch_size)
bucket: List[Path] = []
for item in items:
bucket.append(item)
if len(bucket) >= size:
yield bucket
bucket = []
if bucket:
yield bucket
FetchFn = Callable[[Path], bool]
"""Hydrate one placeholder. Returns ``True`` on success, ``False`` otherwise."""
def run_eager_hydrate(
cache_root: Path,
*,
fetch_fn: FetchFn,
allowed_basenames: Iterable[str] = DEFAULT_EAGER_HYDRATE_BASENAMES,
batch_size: int = DEFAULT_BATCH_SIZE,
batch_sleep_s: float = DEFAULT_BATCH_SLEEP_S,
sleep_fn: Optional[Callable[[float], None]] = None,
) -> EagerHydrateSummary:
"""Drive one hydrate pass over placeholders under ``cache_root``.
The driver is deliberately dumb: no retries, no per-file concurrency,
no global state. Failures are counted but do not abort the pass — the
next placeholder still gets its chance.
Args:
cache_root: Local cache root to walk.
fetch_fn: Callable invoked for each placeholder. Return ``True`` on
successful hydration. Must not raise; failures should be encoded
as ``False`` so the pass can continue.
allowed_basenames: Override for the default allow-list.
batch_size: Placeholders per batch before pausing.
batch_sleep_s: Pause between batches, in seconds.
sleep_fn: Injection point for tests; defaults to :func:`time.sleep`.
Returns:
An :class:`EagerHydrateSummary` with per-outcome counts.
"""
sleeper = sleep_fn if sleep_fn is not None else time.sleep
hydrated = 0
skipped_existing = 0
failed = 0
placeholders = find_placeholder_candidates(cache_root, allowed_basenames)
for batch_index, batch in enumerate(batched(placeholders, batch_size)):
if batch_index > 0 and batch_sleep_s > 0:
sleeper(batch_sleep_s)
for path in batch:
# Re-check size right before fetching: a different code path
# (``SessionsOnDemandFetchListener`` / sidebar hydrate) may have
# filled the placeholder while we were iterating.
if not _is_placeholder(path):
skipped_existing += 1
continue
try:
ok = bool(fetch_fn(path))
except Exception:
ok = False
if ok:
hydrated += 1
else:
failed += 1
return EagerHydrateSummary(
hydrated=hydrated,
skipped_existing=skipped_existing,
failed=failed,
)
def normalize_eager_hydrate_basenames(
raw: object,
default: Tuple[str, ...] = DEFAULT_EAGER_HYDRATE_BASENAMES,

View File

@@ -1,19 +1,24 @@
"""Unit tests for :mod:`sessions.eager_hydrate`."""
"""Unit tests for :mod:`sessions.eager_hydrate`.
Driver tests (``run_eager_hydrate``, ``batched``, ``EagerHydrateSummary``)
were dropped at PR-B / PR 17 — the apply pass body now runs entirely in
``sessions_native::eager_hydrate::run_apply_pass`` and is exercised by
the Rust unit tests + integration smoke against
``sessions_eager_hydrate_apply``. The Python side keeps the candidate
discovery wrapper + settings normaliser, which are still tested below.
"""
from __future__ import annotations
from pathlib import Path
from typing import List, Tuple
from typing import Tuple
import pytest
from sessions.eager_hydrate import (
DEFAULT_BATCH_SIZE,
DEFAULT_EAGER_HYDRATE_BASENAMES,
EagerHydrateSummary,
batched,
find_placeholder_candidates,
normalize_eager_hydrate_basenames,
run_eager_hydrate,
)
@@ -67,169 +72,6 @@ def test_find_placeholder_candidates_returns_empty_when_allow_list_empty(
assert out == []
def test_batched_yields_in_order_and_respects_size() -> None:
items = [Path("a"), Path("b"), Path("c"), Path("d"), Path("e")]
batches = list(batched(items, 2))
assert batches == [
[Path("a"), Path("b")],
[Path("c"), Path("d")],
[Path("e")],
]
def test_batched_collapses_nonpositive_size_to_one() -> None:
items = [Path("a"), Path("b")]
assert list(batched(items, 0)) == [[Path("a")], [Path("b")]]
assert list(batched(items, -5)) == [[Path("a")], [Path("b")]]
def test_run_eager_hydrate_fetches_all_placeholders(tmp_path: Path) -> None:
_make_placeholder(tmp_path / "Cargo.toml")
_make_placeholder(tmp_path / "sub" / "Cargo.lock")
calls: List[Path] = []
def fetch_fn(path: Path) -> bool:
calls.append(path)
path.write_bytes(b"content")
return True
summary = run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml", "Cargo.lock"),
sleep_fn=lambda _s: None,
)
assert summary == EagerHydrateSummary(hydrated=2, skipped_existing=0, failed=0)
assert sorted(calls) == sorted(
[tmp_path / "Cargo.toml", tmp_path / "sub" / "Cargo.lock"]
)
def test_run_eager_hydrate_counts_failures_without_aborting(tmp_path: Path) -> None:
good = tmp_path / "Cargo.toml"
bad = tmp_path / "pyproject.toml"
_make_placeholder(good)
_make_placeholder(bad)
def fetch_fn(path: Path) -> bool:
if path == bad:
return False
path.write_bytes(b"ok")
return True
summary = run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml", "pyproject.toml"),
sleep_fn=lambda _s: None,
)
assert summary.hydrated == 1
assert summary.failed == 1
assert summary.skipped_existing == 0
def test_run_eager_hydrate_counts_raising_fetch_as_failure(tmp_path: Path) -> None:
_make_placeholder(tmp_path / "Cargo.toml")
def fetch_fn(_path: Path) -> bool:
raise RuntimeError("boom")
summary = run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml",),
sleep_fn=lambda _s: None,
)
assert summary == EagerHydrateSummary(hydrated=0, skipped_existing=0, failed=1)
def test_run_eager_hydrate_skips_when_placeholder_already_filled(
tmp_path: Path,
) -> None:
# Two placeholders at enumeration time; while hydrating one, a concurrent
# code path fills the other. Recheck inside ``run_eager_hydrate`` must
# treat the now-non-empty peer as ``skipped_existing`` rather than
# failing or re-fetching.
first = tmp_path / "a" / "Cargo.toml"
second = tmp_path / "b" / "Cargo.toml"
_make_placeholder(first)
_make_placeholder(second)
def fetch_fn(path: Path) -> bool:
# Whichever placeholder runs first, clobber its sibling so the
# sibling's recheck trips the ``skipped_existing`` branch regardless
# of filesystem ordering.
peer = second if path == first else first
path.write_bytes(b"fetched body")
peer.write_bytes(b"concurrent body")
return True
# Batch size 8 forces both placeholders into one batch, so enumeration
# completes before any fetch runs.
summary = run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml",),
batch_size=8,
batch_sleep_s=0,
sleep_fn=lambda _s: None,
)
assert summary.hydrated == 1
assert summary.skipped_existing == 1
assert summary.failed == 0
def test_run_eager_hydrate_sleeps_between_batches_but_not_before_first(
tmp_path: Path,
) -> None:
for i in range(5):
_make_placeholder(tmp_path / "pkg{}".format(i) / "Cargo.toml")
sleeps: List[float] = []
def fetch_fn(path: Path) -> bool:
path.write_bytes(b"x")
return True
summary = run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml",),
batch_size=2,
batch_sleep_s=0.123,
sleep_fn=lambda s: sleeps.append(s),
)
assert summary.hydrated == 5
# 5 items in batches of 2 => batches [2, 2, 1]; sleep fires before
# batches 2 and 3, i.e. twice.
assert sleeps == [0.123, 0.123]
def test_run_eager_hydrate_skips_sleep_when_interval_zero(tmp_path: Path) -> None:
for i in range(3):
_make_placeholder(tmp_path / "pkg{}".format(i) / "Cargo.toml")
sleeps: List[float] = []
def fetch_fn(path: Path) -> bool:
path.write_bytes(b"x")
return True
run_eager_hydrate(
tmp_path,
fetch_fn=fetch_fn,
allowed_basenames=("Cargo.toml",),
batch_size=1,
batch_sleep_s=0.0,
sleep_fn=lambda s: sleeps.append(s),
)
assert sleeps == []
def test_default_batch_size_is_capped_low_enough_for_edr() -> None:
# Documented batch size is 20 per spec; guard against silent bumps.
assert DEFAULT_BATCH_SIZE == 20

View File

@@ -1,58 +1,28 @@
"""Parity baseline for ``eager_hydrate`` BFS + batching + sleep pacing.
"""Parity baseline for ``eager_hydrate`` BFS + apply pass.
Wave 1.5 amend §D paired parity test PR: baseline for PR 14 (BFS moved to
Rust after the envelope landed, integrated into ``local_bridge::remote_cache_mirror``).
Keeps the 14 existing ``test_eager_hydrate.py`` scenarios and adds 12 more:
- batched edge cases (empty / exact / single).
- find_placeholder_candidates 추가 boundary (size>0 ignored, basename
Wave 1.5 amend §D paired parity test: baseline for PR 14 (BFS moved to Rust) +
PR-B / PR 17 (apply pass body moved to Rust). After PR-B the
batched/run_eager_hydrate driver lives entirely in
``sessions_native::eager_hydrate::run_apply_pass`` (Rust unit-tested
side); the Python parity baseline now pins:
- ``find_placeholder_candidates`` boundary (size>0 ignored, basename
case-sensitivity, nested traversal, cache_root is file).
- run_eager_hydrate call order / fetch_fn argument checks / batch boundary.
- normalize_eager_hydrate_basenames edge cases.
- ``normalize_eager_hydrate_basenames`` edge cases.
- Default constants invariants used by Python wrappers.
"""
from __future__ import annotations
from pathlib import Path
from typing import List
from sessions.eager_hydrate import (
DEFAULT_BATCH_SIZE,
DEFAULT_BATCH_SLEEP_S,
DEFAULT_EAGER_HYDRATE_BASENAMES,
EagerHydrateSummary,
batched,
find_placeholder_candidates,
normalize_eager_hydrate_basenames,
run_eager_hydrate,
)
# ---------------------------------------------------------------------------
# batched edge cases
# ---------------------------------------------------------------------------
def test_batched_empty_iterable_yields_nothing() -> None:
assert list(batched(iter([]), 5)) == []
def test_batched_single_item_yields_single_batch() -> None:
items = [Path("/x")]
assert list(batched(iter(items), 5)) == [[Path("/x")]]
def test_batched_exact_multiple_no_trailing_partial() -> None:
items = [Path(str(i)) for i in range(6)]
out = list(batched(iter(items), 3))
assert len(out) == 2
assert all(len(b) == 3 for b in out)
def test_batched_partial_trailing_batch() -> None:
items = [Path(str(i)) for i in range(7)]
out = list(batched(iter(items), 3))
assert [len(b) for b in out] == [3, 3, 1]
# ---------------------------------------------------------------------------
# find_placeholder_candidates boundaries
# ---------------------------------------------------------------------------
@@ -94,55 +64,6 @@ def test_find_placeholder_root_is_file_not_dir(tmp_path: Path) -> None:
assert out == []
# ---------------------------------------------------------------------------
# run_eager_hydrate behaviour pinning
# ---------------------------------------------------------------------------
def test_run_eager_hydrate_passes_path_to_fetch_fn(tmp_path: Path) -> None:
target = tmp_path / "Cargo.toml"
_touch(target, size=0)
seen: List[Path] = []
def fetch(path: Path) -> bool:
seen.append(path)
# Simulate hydration: write content so the post-fetch check sees it.
path.write_text("[package]\n")
return True
summary = run_eager_hydrate(
tmp_path, fetch_fn=fetch, batch_sleep_s=0.0, sleep_fn=lambda _s: None
)
assert seen == [target]
assert summary.hydrated == 1
def test_run_eager_hydrate_returns_zero_summary_when_no_candidates(
tmp_path: Path,
) -> None:
summary = run_eager_hydrate(
tmp_path,
fetch_fn=lambda _p: True,
batch_sleep_s=0.0,
sleep_fn=lambda _s: None,
)
assert summary == EagerHydrateSummary(hydrated=0, skipped_existing=0, failed=0)
def test_run_eager_hydrate_disabled_when_basenames_empty(tmp_path: Path) -> None:
_touch(tmp_path / "Cargo.toml", size=0)
seen: List[Path] = []
summary = run_eager_hydrate(
tmp_path,
fetch_fn=lambda p: seen.append(p) or True,
allowed_basenames=(),
batch_sleep_s=0.0,
sleep_fn=lambda _s: None,
)
assert seen == []
assert summary.hydrated == 0
# ---------------------------------------------------------------------------
# normalize_eager_hydrate_basenames edge cases
# ---------------------------------------------------------------------------

2
uv.lock generated
View File

@@ -854,7 +854,7 @@ wheels = [
[[package]]
name = "sessions-sublime"
version = "0.7.25"
version = "0.7.26"
source = { virtual = "." }
[package.dev-dependencies]