Compare commits
2 Commits
b7189f9550
...
32c3e6241a
| Author | SHA1 | Date | |
|---|---|---|---|
| 32c3e6241a | |||
| 9691726d99 |
@@ -295,10 +295,10 @@ queue/dispatcher/lane gating + `_CONNECT_GENERATION` token 의미 + `_connect_ge
|
||||
### PR 17+ — 본 plan scope 밖 (별도 갱신)
|
||||
|
||||
PR 16(PR-A) land 후 본 plan을 갱신해서:
|
||||
- **PR-B**: mirror BFS task body, eager_hydrate apply 본체 → orchestrator (PR 13b envelope 위에서)
|
||||
- **H3-queue**: BACKLOG H3 본 이관 (queue 본체)
|
||||
- **H2-save / H2-connect**: BACKLOG H2 분할 (Track H2 main track 흡수)
|
||||
- **`_rust_ffi` 디코더 Rust 이관**: `_parse_*_outcome` Rust ABI typed JSON (Rust schema oracle 도구는 잔존 쟁점 #6 결정 후)
|
||||
- **PR 17 / PR-B** ✅ `9691726` — eager_hydrate apply pass body → `sessions_native::eager_hydrate::run_apply_pass`. Python driver 삭제(`run_eager_hydrate`/`batched`/`EagerHydrateSummary`); 1 Rust round-trip per pass + Python sidecar 쓰기.
|
||||
- **PR 18 / H3-queue** ⏭ — BACKLOG H3 본 이관 (queue 본체)
|
||||
- **PR 19 / `_rust_ffi` 디코더 Rust 이관** ⏭ — `_parse_*_outcome` Rust ABI typed JSON (Rust schema oracle 도구는 잔존 쟁점 #6 결정 후)
|
||||
- **H2-save / H2-connect**: BACKLOG H2 분할 (Track H2 main track 흡수, *병행*)
|
||||
- **데드라인 Layer 3** auto-revert 활성화
|
||||
|
||||
이 시점에 commands.py 예상 LOC: 7394 - (worker loop ~550) - (connect SM ~330 부분) - (hydrate preflight ~300, PR 12–14 영향) ≈ **5500–6000 LOC**.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
//! Eager-hydrate placeholder discovery (Wave 2 PR 14).
|
||||
//! Eager-hydrate placeholder discovery (Wave 2 PR 14) + apply pass body
|
||||
//! (Wave 2 PR 17 / PR-B).
|
||||
//!
|
||||
//! Walks a local cache root and yields zero-byte regular files whose basename
|
||||
//! is in an allow-list. Mirrors the Python ``find_placeholder_candidates``
|
||||
@@ -11,15 +12,21 @@
|
||||
//! cache → produces what candidates it can).
|
||||
//! - Empty allow-list returns no candidates.
|
||||
//!
|
||||
//! Batching/sleep pacing stays in Python. The Rust side returns a sorted
|
||||
//! `Vec<String>` of absolute paths so the caller can deterministically batch
|
||||
//! over the result without invoking a Python callback per file (the FFI
|
||||
//! round-trip cost outweighs any LOC savings — see rust-pragmatist's note
|
||||
//! in the team synthesis).
|
||||
//! PR-B (apply pass body) extends the Rust ownership: the loop, batch
|
||||
//! pacing, per-placeholder ``file_open`` transaction, and outcome
|
||||
//! collection all run in Rust. Python becomes a thin caller — one FFI
|
||||
//! round-trip per pass, then writes sidecar metadata for hydrated entries.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde_json::{Value, json};
|
||||
|
||||
use crate::file_open;
|
||||
use crate::map_local_to_remote_path;
|
||||
|
||||
/// Return zero-byte regular files under `cache_root` whose basename is in
|
||||
/// `allowed_basenames`. Order is BFS-stable but not lexicographic.
|
||||
@@ -91,6 +98,100 @@ pub fn find_placeholder_candidates(
|
||||
out
|
||||
}
|
||||
|
||||
/// Drive one eager-hydrate apply pass over placeholders under
|
||||
/// ``cache_root``. Returns a JSON object summarising the pass:
|
||||
///
|
||||
/// ```json
|
||||
/// {
|
||||
/// "hydrated": [{"local_path": "...", "metadata": {...}}, ...],
|
||||
/// "skipped_existing": N,
|
||||
/// "failed": M
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// The driver mirrors the previous Python ``run_eager_hydrate``:
|
||||
/// re-checks zero-byte before fetch (so a concurrent path filling the
|
||||
/// placeholder lands in ``skipped_existing`` rather than re-fetched),
|
||||
/// counts failures without aborting, and pauses ``batch_sleep_ms``
|
||||
/// between batches.
|
||||
///
|
||||
/// Per-placeholder, calls :func:`file_open::run_file_open_transaction`
|
||||
/// (PR 14.5c). The Python wrapper (PR 14.5d) is bypassed so we get one
|
||||
/// FFI round-trip per pass instead of one per file.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn run_apply_pass(
|
||||
cache_root: &Path,
|
||||
host_alias: &str,
|
||||
remote_workspace_root: &str,
|
||||
allowed_basenames: &[String],
|
||||
batch_size: usize,
|
||||
batch_sleep_ms: u64,
|
||||
max_open_bytes: u64,
|
||||
binary_probe_bytes: usize,
|
||||
allow_empty: bool,
|
||||
timeout_ms: u64,
|
||||
) -> Value {
|
||||
let placeholders = find_placeholder_candidates(cache_root, allowed_basenames);
|
||||
let mut hydrated: Vec<Value> = Vec::new();
|
||||
let mut skipped_existing: usize = 0;
|
||||
let mut failed: usize = 0;
|
||||
|
||||
let batch_size_safe = if batch_size == 0 { 1 } else { batch_size };
|
||||
|
||||
for (batch_index, batch) in placeholders.chunks(batch_size_safe).enumerate() {
|
||||
if batch_index > 0 && batch_sleep_ms > 0 {
|
||||
thread::sleep(Duration::from_millis(batch_sleep_ms));
|
||||
}
|
||||
for path in batch {
|
||||
// Re-check zero-byte: a concurrent path (sidebar hydrate /
|
||||
// on-demand fetch) may have filled the placeholder while we
|
||||
// were iterating. Mirror Python's pre-fetch guard.
|
||||
let still_placeholder = match path.metadata() {
|
||||
Ok(m) => m.is_file() && m.len() == 0,
|
||||
Err(_) => false,
|
||||
};
|
||||
if !still_placeholder {
|
||||
skipped_existing += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let remote = match map_local_to_remote_path(remote_workspace_root, cache_root, path) {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
failed += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let outcome = file_open::run_file_open_transaction(
|
||||
host_alias,
|
||||
&remote,
|
||||
path,
|
||||
max_open_bytes,
|
||||
binary_probe_bytes,
|
||||
allow_empty,
|
||||
timeout_ms,
|
||||
);
|
||||
let outcome_str = outcome.get("outcome").and_then(Value::as_str).unwrap_or("");
|
||||
if outcome_str == "OK" {
|
||||
let metadata = outcome.get("metadata").cloned().unwrap_or(Value::Null);
|
||||
hydrated.push(json!({
|
||||
"local_path": path.to_string_lossy(),
|
||||
"metadata": metadata,
|
||||
}));
|
||||
} else {
|
||||
failed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json!({
|
||||
"hydrated": hydrated,
|
||||
"skipped_existing": skipped_existing,
|
||||
"failed": failed,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -257,7 +257,7 @@ fn write_output(out_buf: *mut c_char, out_cap: usize, value: &str) -> c_int {
|
||||
0
|
||||
}
|
||||
|
||||
fn normalize_local_path(path: &Path) -> PathBuf {
|
||||
pub(crate) fn normalize_local_path(path: &Path) -> PathBuf {
|
||||
let base = if path.is_absolute() {
|
||||
path.to_path_buf()
|
||||
} else if let Ok(cwd) = std::env::current_dir() {
|
||||
@@ -278,6 +278,45 @@ fn normalize_local_path(path: &Path) -> PathBuf {
|
||||
out
|
||||
}
|
||||
|
||||
/// Map ``local_path`` (under ``files_cache_root``) back to a remote POSIX
|
||||
/// path. Returns ``None`` when the path does not belong to this cache root.
|
||||
///
|
||||
/// Mirrors the ABI ``sessions_file_map_local_to_remote`` logic so the
|
||||
/// orchestrator-side (eager hydrate, mirror BFS body) does not need to
|
||||
/// re-implement it.
|
||||
pub(crate) fn map_local_to_remote_path(
|
||||
remote_root: &str,
|
||||
files_cache_root: &Path,
|
||||
local_path: &Path,
|
||||
) -> Option<String> {
|
||||
let cache_root = normalize_local_path(files_cache_root);
|
||||
let local = normalize_local_path(local_path);
|
||||
let extern_root = cache_root.join("__extern");
|
||||
if let Ok(rel) = local.strip_prefix(&extern_root) {
|
||||
let rel_s = rel
|
||||
.components()
|
||||
.map(|c| c.as_os_str().to_string_lossy().into_owned())
|
||||
.collect::<Vec<String>>()
|
||||
.join("/");
|
||||
return Some(format!("/{}", rel_s));
|
||||
}
|
||||
let rel = local.strip_prefix(&cache_root).ok()?;
|
||||
let rel_s = rel
|
||||
.components()
|
||||
.map(|c| c.as_os_str().to_string_lossy().into_owned())
|
||||
.collect::<Vec<String>>()
|
||||
.join("/");
|
||||
let root_trim = remote_root.trim_end_matches('/');
|
||||
let remote = if root_trim.is_empty() || root_trim == "/" {
|
||||
format!("/{}", rel_s)
|
||||
} else if rel_s.is_empty() {
|
||||
root_trim.to_string()
|
||||
} else {
|
||||
format!("{}/{}", root_trim, rel_s)
|
||||
};
|
||||
Some(remote)
|
||||
}
|
||||
|
||||
fn split_posix(path: &str) -> Vec<&str> {
|
||||
path.split('/').filter(|part| !part.is_empty()).collect()
|
||||
}
|
||||
@@ -828,35 +867,14 @@ pub unsafe extern "C" fn sessions_file_map_local_to_remote(
|
||||
return AbiError::InvalidUtf8.code();
|
||||
};
|
||||
|
||||
let cache_root = normalize_local_path(Path::new(files_cache_root_s));
|
||||
let local = normalize_local_path(Path::new(local_path_s));
|
||||
let extern_root = cache_root.join("__extern");
|
||||
if let Ok(rel) = local.strip_prefix(&extern_root) {
|
||||
let rel_s = rel
|
||||
.components()
|
||||
.map(|c| c.as_os_str().to_string_lossy().into_owned())
|
||||
.collect::<Vec<String>>()
|
||||
.join("/");
|
||||
let remote = format!("/{}", rel_s);
|
||||
return write_output(out_buf, out_cap, &remote);
|
||||
match map_local_to_remote_path(
|
||||
remote_root_s,
|
||||
Path::new(files_cache_root_s),
|
||||
Path::new(local_path_s),
|
||||
) {
|
||||
Some(remote) => write_output(out_buf, out_cap, &remote),
|
||||
None => 1,
|
||||
}
|
||||
let Ok(rel) = local.strip_prefix(&cache_root) else {
|
||||
return 1;
|
||||
};
|
||||
let rel_s = rel
|
||||
.components()
|
||||
.map(|c| c.as_os_str().to_string_lossy().into_owned())
|
||||
.collect::<Vec<String>>()
|
||||
.join("/");
|
||||
let root_trim = remote_root_s.trim_end_matches('/');
|
||||
let remote = if root_trim.is_empty() || root_trim == "/" {
|
||||
format!("/{}", rel_s)
|
||||
} else if rel_s.is_empty() {
|
||||
root_trim.to_string()
|
||||
} else {
|
||||
format!("{}/{}", root_trim, rel_s)
|
||||
};
|
||||
write_output(out_buf, out_cap, &remote)
|
||||
}
|
||||
|
||||
/// Return `1` if local path is under `files_cache_root/__extern`, else `0`.
|
||||
@@ -1553,6 +1571,78 @@ pub unsafe extern "C" fn sessions_eager_hydrate_find_candidates(
|
||||
write_output(out_buf, out_cap, &joined)
|
||||
}
|
||||
|
||||
/// Run the eager-hydrate apply pass body (Wave 2 PR-B).
|
||||
///
|
||||
/// One Rust round-trip drives the entire pass: find candidates →
|
||||
/// per-batch sleep → re-check zero-byte → map local→remote → file_open
|
||||
/// transaction → collect outcomes. Python writes sidecar metadata for
|
||||
/// the returned ``hydrated`` list.
|
||||
///
|
||||
/// # Safety
|
||||
/// `cache_root`, `host_alias`, `remote_workspace_root`, and
|
||||
/// `allowed_basenames_joined` must be valid UTF-8 C strings (the latter
|
||||
/// uses 0x1F as the unit separator). `out_buf` must be writable for
|
||||
/// `out_cap` bytes when non-null. Returns 0 on success and writes a
|
||||
/// JSON object documented on
|
||||
/// :func:`eager_hydrate::run_apply_pass`.
|
||||
#[unsafe(no_mangle)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub unsafe extern "C" fn sessions_eager_hydrate_apply(
|
||||
cache_root: *const c_char,
|
||||
host_alias: *const c_char,
|
||||
remote_workspace_root: *const c_char,
|
||||
allowed_basenames_joined: *const c_char,
|
||||
batch_size: usize,
|
||||
batch_sleep_ms: u64,
|
||||
max_open_bytes: u64,
|
||||
binary_probe_bytes: usize,
|
||||
allow_empty: c_int,
|
||||
timeout_ms: u64,
|
||||
out_buf: *mut c_char,
|
||||
out_cap: usize,
|
||||
) -> c_int {
|
||||
if cache_root.is_null()
|
||||
|| host_alias.is_null()
|
||||
|| remote_workspace_root.is_null()
|
||||
|| allowed_basenames_joined.is_null()
|
||||
{
|
||||
return AbiError::NullPointer.code();
|
||||
}
|
||||
let Ok(cache_root_s) = (unsafe { CStr::from_ptr(cache_root) }).to_str() else {
|
||||
return AbiError::InvalidUtf8.code();
|
||||
};
|
||||
let Ok(host_s) = (unsafe { CStr::from_ptr(host_alias) }).to_str() else {
|
||||
return AbiError::InvalidUtf8.code();
|
||||
};
|
||||
let Ok(remote_root_s) = (unsafe { CStr::from_ptr(remote_workspace_root) }).to_str() else {
|
||||
return AbiError::InvalidUtf8.code();
|
||||
};
|
||||
let Ok(allowed_s) = (unsafe { CStr::from_ptr(allowed_basenames_joined) }).to_str() else {
|
||||
return AbiError::InvalidUtf8.code();
|
||||
};
|
||||
let allowed: Vec<String> = allowed_s
|
||||
.split('\x1f')
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
let summary = eager_hydrate::run_apply_pass(
|
||||
Path::new(cache_root_s),
|
||||
host_s,
|
||||
remote_root_s,
|
||||
&allowed,
|
||||
batch_size,
|
||||
batch_sleep_ms,
|
||||
max_open_bytes,
|
||||
binary_probe_bytes,
|
||||
allow_empty != 0,
|
||||
timeout_ms,
|
||||
);
|
||||
let Ok(serialized) = serde_json::to_string(&summary) else {
|
||||
return AbiError::Serialization.code();
|
||||
};
|
||||
write_output(out_buf, out_cap, &serialized)
|
||||
}
|
||||
|
||||
/// Derive a human-friendly venv label from a remote interpreter path.
|
||||
///
|
||||
/// # Safety
|
||||
|
||||
@@ -95,6 +95,7 @@ from ._orchestrator import (
|
||||
)
|
||||
from ._tool_runtime import (
|
||||
derive_venv_name,
|
||||
eager_hydrate_apply,
|
||||
eager_hydrate_find_candidates,
|
||||
merge_remote_extension_catalog_json,
|
||||
normalize_code_server_specs_json,
|
||||
@@ -134,6 +135,7 @@ __all__ = (
|
||||
"save_decision_code",
|
||||
# _tool_runtime
|
||||
"derive_venv_name",
|
||||
"eager_hydrate_apply",
|
||||
"eager_hydrate_find_candidates",
|
||||
"merge_remote_extension_catalog_json",
|
||||
"normalize_code_server_specs_json",
|
||||
|
||||
@@ -7,7 +7,12 @@ import json
|
||||
from typing import Any, Dict, Sequence, Tuple
|
||||
|
||||
from . import _loader
|
||||
from ._loader import SessionsNativeLibraryError, _bind_abi_symbol, call_string_abi
|
||||
from ._loader import (
|
||||
SessionsNativeLibraryError,
|
||||
_bind_abi_symbol,
|
||||
_call_json_returning_abi,
|
||||
call_string_abi,
|
||||
)
|
||||
|
||||
|
||||
def parse_ruff_diagnostics(
|
||||
@@ -148,6 +153,63 @@ def eager_hydrate_find_candidates(
|
||||
return tuple(out.split("\x1f"))
|
||||
|
||||
|
||||
def eager_hydrate_apply(
|
||||
*,
|
||||
cache_root: str,
|
||||
host_alias: str,
|
||||
remote_workspace_root: str,
|
||||
allowed_basenames: Sequence[str],
|
||||
batch_size: int,
|
||||
batch_sleep_ms: int,
|
||||
max_open_bytes: int,
|
||||
binary_probe_bytes: int,
|
||||
allow_empty: bool,
|
||||
timeout_ms: int,
|
||||
) -> Dict[str, Any]:
|
||||
"""Drive one Rust eager-hydrate apply pass (PR-B / PR 17).
|
||||
|
||||
Rust owns: candidate discovery, batch loop, batch_sleep pacing,
|
||||
re-check zero-byte, local→remote mapping, ``file_open`` transaction,
|
||||
outcome counting. Python writes sidecar metadata for ``hydrated``
|
||||
entries and emits the trace event.
|
||||
|
||||
Returns a dict with keys ``hydrated`` (list of
|
||||
``{"local_path": ..., "metadata": ...}``), ``skipped_existing``,
|
||||
``failed``.
|
||||
"""
|
||||
decoded = _call_json_returning_abi(
|
||||
"sessions_eager_hydrate_apply",
|
||||
(
|
||||
cache_root,
|
||||
host_alias,
|
||||
remote_workspace_root,
|
||||
"\x1f".join(name for name in allowed_basenames if name),
|
||||
ctypes.c_size_t(int(batch_size)),
|
||||
ctypes.c_uint64(int(batch_sleep_ms)),
|
||||
ctypes.c_uint64(int(max_open_bytes)),
|
||||
ctypes.c_size_t(int(binary_probe_bytes)),
|
||||
ctypes.c_int(1 if allow_empty else 0),
|
||||
ctypes.c_uint64(int(timeout_ms)),
|
||||
),
|
||||
argtypes=[
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_uint64,
|
||||
ctypes.c_uint64,
|
||||
ctypes.c_size_t,
|
||||
ctypes.c_int,
|
||||
ctypes.c_uint64,
|
||||
],
|
||||
initial_buf=64 * 1024,
|
||||
)
|
||||
if decoded is None:
|
||||
return {"hydrated": [], "skipped_existing": 0, "failed": 0}
|
||||
return decoded
|
||||
|
||||
|
||||
def merge_remote_extension_catalog_json(
|
||||
builtin_specs: Sequence[Dict[str, Any]], user_raw: Any
|
||||
) -> Tuple[Dict[str, Any], ...]:
|
||||
|
||||
@@ -52,9 +52,6 @@ from .eager_hydrate import (
|
||||
from .eager_hydrate import (
|
||||
DEFAULT_BATCH_SLEEP_S as _EAGER_HYDRATE_BATCH_SLEEP_S,
|
||||
)
|
||||
from .eager_hydrate import (
|
||||
run_eager_hydrate,
|
||||
)
|
||||
from .file_state import (
|
||||
FileOpenGuardrails,
|
||||
OpenFileResult,
|
||||
@@ -3583,64 +3580,52 @@ def _eager_hydrate_workspace(
|
||||
) -> None:
|
||||
"""Hydrate a bounded batch of placeholder build-graph files.
|
||||
|
||||
Runs on the background task worker. Each placeholder fetch reuses the
|
||||
same primitive as ``SessionsOnDemandFetchListener`` so guardrails (size
|
||||
caps, binary heuristics) stay consistent across entry points.
|
||||
Wave 2 PR-B (PR 17): the apply pass body runs in Rust
|
||||
(``sessions_native::eager_hydrate::run_apply_pass``). One Rust
|
||||
round-trip drives candidate discovery, batch pacing, re-check, and
|
||||
the per-placeholder ``file_open`` transaction. Python persists
|
||||
sidecar metadata for entries Rust marked ``hydrated``.
|
||||
"""
|
||||
host_alias = context.recent_entry.host_alias
|
||||
mapper = RemoteToLocalCacheMapper(
|
||||
workspace_cache_key=context.cache_key,
|
||||
limits = FileOpenGuardrails()
|
||||
summary = _rust_ffi.eager_hydrate_apply(
|
||||
cache_root=str(context.local_cache_root),
|
||||
host_alias=context.recent_entry.host_alias,
|
||||
remote_workspace_root=context.recent_entry.remote_root,
|
||||
files_cache_root=context.local_cache_root,
|
||||
)
|
||||
|
||||
def fetch_one(local_path: Path) -> bool:
|
||||
remote = mapper.remote_path_for_local_cache_file(local_path)
|
||||
if remote is None:
|
||||
return False
|
||||
try:
|
||||
opened = open_remote_file_into_local_cache(
|
||||
host_alias,
|
||||
remote_absolute_path=remote,
|
||||
local_cache_path=local_path,
|
||||
read_timeout_s=30.0,
|
||||
)
|
||||
except Exception:
|
||||
# Defensive: fetch primitive should already encode transport
|
||||
# failures as ``OpenOutcome.TRANSPORT_ERROR``; guard against any
|
||||
# unexpected exception so the batch driver can keep going.
|
||||
_trace_event(
|
||||
"mirror.eager_hydrate_fetch_error",
|
||||
cache_key=context.cache_key,
|
||||
local_path=str(local_path),
|
||||
)
|
||||
return False
|
||||
if opened.outcome is OpenOutcome.OK:
|
||||
if opened.remote_metadata is not None:
|
||||
_write_remote_metadata_sidecar(local_path, opened.remote_metadata)
|
||||
return True
|
||||
_trace_event(
|
||||
"mirror.eager_hydrate_fetch_failed",
|
||||
cache_key=context.cache_key,
|
||||
local_path=str(local_path),
|
||||
outcome=getattr(opened.outcome, "value", str(opened.outcome)),
|
||||
detail=opened.detail,
|
||||
)
|
||||
return False
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
context.local_cache_root,
|
||||
fetch_fn=fetch_one,
|
||||
allowed_basenames=basenames,
|
||||
batch_size=_EAGER_HYDRATE_BATCH_SIZE,
|
||||
batch_sleep_s=_EAGER_HYDRATE_BATCH_SLEEP_S,
|
||||
batch_sleep_ms=int(_EAGER_HYDRATE_BATCH_SLEEP_S * 1000),
|
||||
max_open_bytes=limits.max_open_bytes,
|
||||
binary_probe_bytes=limits.binary_probe_bytes,
|
||||
allow_empty=limits.allow_empty_files,
|
||||
timeout_ms=30_000,
|
||||
)
|
||||
|
||||
hydrated_entries = summary.get("hydrated", [])
|
||||
for entry in hydrated_entries:
|
||||
local_str = entry.get("local_path")
|
||||
meta_dict = entry.get("metadata")
|
||||
if not isinstance(local_str, str) or not isinstance(meta_dict, dict):
|
||||
continue
|
||||
kind_str = str(meta_dict.get("kind", RemoteFileKind.REGULAR_FILE.value))
|
||||
try:
|
||||
kind = RemoteFileKind(kind_str)
|
||||
except ValueError:
|
||||
kind = RemoteFileKind.OTHER
|
||||
unix_mode_raw = meta_dict.get("unix_mode")
|
||||
meta = RemoteFileMetadata(
|
||||
mtime_ns=int(meta_dict.get("mtime_ns", 0)),
|
||||
size_bytes=int(meta_dict.get("size_bytes", 0)),
|
||||
kind=kind,
|
||||
unix_mode=int(unix_mode_raw) if unix_mode_raw is not None else None,
|
||||
)
|
||||
_write_remote_metadata_sidecar(Path(local_str), meta)
|
||||
|
||||
_trace_event(
|
||||
"mirror.eager_hydrate_done",
|
||||
cache_key=context.cache_key,
|
||||
hydrated=summary.hydrated,
|
||||
skipped_existing=summary.skipped_existing,
|
||||
failed=summary.failed,
|
||||
hydrated=len(hydrated_entries),
|
||||
skipped_existing=int(summary.get("skipped_existing", 0)),
|
||||
failed=int(summary.get("failed", 0)),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -7,19 +7,17 @@ disk directly — it never flows through Sublime's ``open_file`` hook, so
|
||||
zero-byte placeholder (created by the sidebar mirror pass), the CLI tool
|
||||
reports a malformed manifest and gives up.
|
||||
|
||||
This module walks an already-mirrored local cache once a workspace activates
|
||||
and schedules a bounded bulk fetch for placeholders whose basename matches a
|
||||
small allow-list of "essential" files (``Cargo.toml``, ``pyproject.toml``,
|
||||
``package.json``, …). The actual fetch primitive is injected so the driver
|
||||
stays importable without the Sublime/SSH runtime.
|
||||
This module exposes the candidate discovery + settings normaliser that
|
||||
back the eager-hydrate apply pass. The driver itself (batch loop,
|
||||
re-check, fetch transaction) lives in
|
||||
``sessions_native::eager_hydrate::run_apply_pass`` (Wave 2 PR-B / PR 17)
|
||||
— see :func:`sessions._rust_ffi.eager_hydrate_apply`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable, Iterable, Iterator, List, Optional, Tuple
|
||||
from typing import Iterable, Iterator, List, Tuple
|
||||
|
||||
from . import _rust_ffi
|
||||
|
||||
@@ -51,38 +49,6 @@ DEFAULT_BATCH_SIZE: int = 20
|
||||
DEFAULT_BATCH_SLEEP_S: float = 0.05
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class EagerHydrateSummary:
|
||||
"""Outcome of one eager-hydrate pass.
|
||||
|
||||
Attributes:
|
||||
hydrated: Count of placeholders that were fetched successfully.
|
||||
skipped_existing: Placeholders that turned out to have non-zero size
|
||||
by the time the driver reached them (another worker won the race).
|
||||
failed: Placeholders whose ``fetch_fn`` returned ``False``.
|
||||
"""
|
||||
|
||||
hydrated: int = 0
|
||||
skipped_existing: int = 0
|
||||
failed: int = 0
|
||||
|
||||
|
||||
def _is_placeholder(path: Path) -> bool:
|
||||
"""Return ``True`` if ``path`` is a regular zero-byte file."""
|
||||
try:
|
||||
stat = path.stat()
|
||||
except OSError:
|
||||
return False
|
||||
if stat.st_size != 0:
|
||||
return False
|
||||
# ``Path.is_file`` resolves symlinks; the Sessions cache never uses
|
||||
# symlinks but the guard is cheap.
|
||||
try:
|
||||
return path.is_file()
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
|
||||
def find_placeholder_candidates(
|
||||
cache_root: Path,
|
||||
allowed_basenames: Iterable[str],
|
||||
@@ -90,9 +56,8 @@ def find_placeholder_candidates(
|
||||
"""Yield zero-byte files under ``cache_root`` whose basename is allowed.
|
||||
|
||||
Wave 2 PR 14: BFS + size filter run in
|
||||
``sessions_native::eager_hydrate``. Pacing/batching stay in Python so
|
||||
the FFI is one call per pass. Directories that fail to enumerate are
|
||||
silently skipped (Rust matches Python's ``OSError`` swallow).
|
||||
``sessions_native::eager_hydrate``. Directories that fail to enumerate
|
||||
are silently skipped (Rust matches Python's ``OSError`` swallow).
|
||||
"""
|
||||
allowed_list = [name for name in allowed_basenames if name]
|
||||
if not allowed_list:
|
||||
@@ -107,88 +72,6 @@ def find_placeholder_candidates(
|
||||
yield Path(path_str)
|
||||
|
||||
|
||||
def batched(items: Iterable[Path], batch_size: int) -> Iterator[List[Path]]:
|
||||
"""Yield ``items`` in lists of at most ``batch_size``.
|
||||
|
||||
Args:
|
||||
items: Source iterable.
|
||||
batch_size: Maximum list length; values ``<= 0`` collapse to ``1``.
|
||||
"""
|
||||
size = max(1, batch_size)
|
||||
bucket: List[Path] = []
|
||||
for item in items:
|
||||
bucket.append(item)
|
||||
if len(bucket) >= size:
|
||||
yield bucket
|
||||
bucket = []
|
||||
if bucket:
|
||||
yield bucket
|
||||
|
||||
|
||||
FetchFn = Callable[[Path], bool]
|
||||
"""Hydrate one placeholder. Returns ``True`` on success, ``False`` otherwise."""
|
||||
|
||||
|
||||
def run_eager_hydrate(
|
||||
cache_root: Path,
|
||||
*,
|
||||
fetch_fn: FetchFn,
|
||||
allowed_basenames: Iterable[str] = DEFAULT_EAGER_HYDRATE_BASENAMES,
|
||||
batch_size: int = DEFAULT_BATCH_SIZE,
|
||||
batch_sleep_s: float = DEFAULT_BATCH_SLEEP_S,
|
||||
sleep_fn: Optional[Callable[[float], None]] = None,
|
||||
) -> EagerHydrateSummary:
|
||||
"""Drive one hydrate pass over placeholders under ``cache_root``.
|
||||
|
||||
The driver is deliberately dumb: no retries, no per-file concurrency,
|
||||
no global state. Failures are counted but do not abort the pass — the
|
||||
next placeholder still gets its chance.
|
||||
|
||||
Args:
|
||||
cache_root: Local cache root to walk.
|
||||
fetch_fn: Callable invoked for each placeholder. Return ``True`` on
|
||||
successful hydration. Must not raise; failures should be encoded
|
||||
as ``False`` so the pass can continue.
|
||||
allowed_basenames: Override for the default allow-list.
|
||||
batch_size: Placeholders per batch before pausing.
|
||||
batch_sleep_s: Pause between batches, in seconds.
|
||||
sleep_fn: Injection point for tests; defaults to :func:`time.sleep`.
|
||||
|
||||
Returns:
|
||||
An :class:`EagerHydrateSummary` with per-outcome counts.
|
||||
"""
|
||||
sleeper = sleep_fn if sleep_fn is not None else time.sleep
|
||||
hydrated = 0
|
||||
skipped_existing = 0
|
||||
failed = 0
|
||||
|
||||
placeholders = find_placeholder_candidates(cache_root, allowed_basenames)
|
||||
for batch_index, batch in enumerate(batched(placeholders, batch_size)):
|
||||
if batch_index > 0 and batch_sleep_s > 0:
|
||||
sleeper(batch_sleep_s)
|
||||
for path in batch:
|
||||
# Re-check size right before fetching: a different code path
|
||||
# (``SessionsOnDemandFetchListener`` / sidebar hydrate) may have
|
||||
# filled the placeholder while we were iterating.
|
||||
if not _is_placeholder(path):
|
||||
skipped_existing += 1
|
||||
continue
|
||||
try:
|
||||
ok = bool(fetch_fn(path))
|
||||
except Exception:
|
||||
ok = False
|
||||
if ok:
|
||||
hydrated += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
return EagerHydrateSummary(
|
||||
hydrated=hydrated,
|
||||
skipped_existing=skipped_existing,
|
||||
failed=failed,
|
||||
)
|
||||
|
||||
|
||||
def normalize_eager_hydrate_basenames(
|
||||
raw: object,
|
||||
default: Tuple[str, ...] = DEFAULT_EAGER_HYDRATE_BASENAMES,
|
||||
|
||||
@@ -1,19 +1,24 @@
|
||||
"""Unit tests for :mod:`sessions.eager_hydrate`."""
|
||||
"""Unit tests for :mod:`sessions.eager_hydrate`.
|
||||
|
||||
Driver tests (``run_eager_hydrate``, ``batched``, ``EagerHydrateSummary``)
|
||||
were dropped at PR-B / PR 17 — the apply pass body now runs entirely in
|
||||
``sessions_native::eager_hydrate::run_apply_pass`` and is exercised by
|
||||
the Rust unit tests + integration smoke against
|
||||
``sessions_eager_hydrate_apply``. The Python side keeps the candidate
|
||||
discovery wrapper + settings normaliser, which are still tested below.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
from typing import Tuple
|
||||
|
||||
import pytest
|
||||
from sessions.eager_hydrate import (
|
||||
DEFAULT_BATCH_SIZE,
|
||||
DEFAULT_EAGER_HYDRATE_BASENAMES,
|
||||
EagerHydrateSummary,
|
||||
batched,
|
||||
find_placeholder_candidates,
|
||||
normalize_eager_hydrate_basenames,
|
||||
run_eager_hydrate,
|
||||
)
|
||||
|
||||
|
||||
@@ -67,169 +72,6 @@ def test_find_placeholder_candidates_returns_empty_when_allow_list_empty(
|
||||
assert out == []
|
||||
|
||||
|
||||
def test_batched_yields_in_order_and_respects_size() -> None:
|
||||
items = [Path("a"), Path("b"), Path("c"), Path("d"), Path("e")]
|
||||
batches = list(batched(items, 2))
|
||||
assert batches == [
|
||||
[Path("a"), Path("b")],
|
||||
[Path("c"), Path("d")],
|
||||
[Path("e")],
|
||||
]
|
||||
|
||||
|
||||
def test_batched_collapses_nonpositive_size_to_one() -> None:
|
||||
items = [Path("a"), Path("b")]
|
||||
assert list(batched(items, 0)) == [[Path("a")], [Path("b")]]
|
||||
assert list(batched(items, -5)) == [[Path("a")], [Path("b")]]
|
||||
|
||||
|
||||
def test_run_eager_hydrate_fetches_all_placeholders(tmp_path: Path) -> None:
|
||||
_make_placeholder(tmp_path / "Cargo.toml")
|
||||
_make_placeholder(tmp_path / "sub" / "Cargo.lock")
|
||||
calls: List[Path] = []
|
||||
|
||||
def fetch_fn(path: Path) -> bool:
|
||||
calls.append(path)
|
||||
path.write_bytes(b"content")
|
||||
return True
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml", "Cargo.lock"),
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
|
||||
assert summary == EagerHydrateSummary(hydrated=2, skipped_existing=0, failed=0)
|
||||
assert sorted(calls) == sorted(
|
||||
[tmp_path / "Cargo.toml", tmp_path / "sub" / "Cargo.lock"]
|
||||
)
|
||||
|
||||
|
||||
def test_run_eager_hydrate_counts_failures_without_aborting(tmp_path: Path) -> None:
|
||||
good = tmp_path / "Cargo.toml"
|
||||
bad = tmp_path / "pyproject.toml"
|
||||
_make_placeholder(good)
|
||||
_make_placeholder(bad)
|
||||
|
||||
def fetch_fn(path: Path) -> bool:
|
||||
if path == bad:
|
||||
return False
|
||||
path.write_bytes(b"ok")
|
||||
return True
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml", "pyproject.toml"),
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
|
||||
assert summary.hydrated == 1
|
||||
assert summary.failed == 1
|
||||
assert summary.skipped_existing == 0
|
||||
|
||||
|
||||
def test_run_eager_hydrate_counts_raising_fetch_as_failure(tmp_path: Path) -> None:
|
||||
_make_placeholder(tmp_path / "Cargo.toml")
|
||||
|
||||
def fetch_fn(_path: Path) -> bool:
|
||||
raise RuntimeError("boom")
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml",),
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
|
||||
assert summary == EagerHydrateSummary(hydrated=0, skipped_existing=0, failed=1)
|
||||
|
||||
|
||||
def test_run_eager_hydrate_skips_when_placeholder_already_filled(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
# Two placeholders at enumeration time; while hydrating one, a concurrent
|
||||
# code path fills the other. Recheck inside ``run_eager_hydrate`` must
|
||||
# treat the now-non-empty peer as ``skipped_existing`` rather than
|
||||
# failing or re-fetching.
|
||||
first = tmp_path / "a" / "Cargo.toml"
|
||||
second = tmp_path / "b" / "Cargo.toml"
|
||||
_make_placeholder(first)
|
||||
_make_placeholder(second)
|
||||
|
||||
def fetch_fn(path: Path) -> bool:
|
||||
# Whichever placeholder runs first, clobber its sibling so the
|
||||
# sibling's recheck trips the ``skipped_existing`` branch regardless
|
||||
# of filesystem ordering.
|
||||
peer = second if path == first else first
|
||||
path.write_bytes(b"fetched body")
|
||||
peer.write_bytes(b"concurrent body")
|
||||
return True
|
||||
|
||||
# Batch size 8 forces both placeholders into one batch, so enumeration
|
||||
# completes before any fetch runs.
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml",),
|
||||
batch_size=8,
|
||||
batch_sleep_s=0,
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
|
||||
assert summary.hydrated == 1
|
||||
assert summary.skipped_existing == 1
|
||||
assert summary.failed == 0
|
||||
|
||||
|
||||
def test_run_eager_hydrate_sleeps_between_batches_but_not_before_first(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
for i in range(5):
|
||||
_make_placeholder(tmp_path / "pkg{}".format(i) / "Cargo.toml")
|
||||
|
||||
sleeps: List[float] = []
|
||||
|
||||
def fetch_fn(path: Path) -> bool:
|
||||
path.write_bytes(b"x")
|
||||
return True
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml",),
|
||||
batch_size=2,
|
||||
batch_sleep_s=0.123,
|
||||
sleep_fn=lambda s: sleeps.append(s),
|
||||
)
|
||||
|
||||
assert summary.hydrated == 5
|
||||
# 5 items in batches of 2 => batches [2, 2, 1]; sleep fires before
|
||||
# batches 2 and 3, i.e. twice.
|
||||
assert sleeps == [0.123, 0.123]
|
||||
|
||||
|
||||
def test_run_eager_hydrate_skips_sleep_when_interval_zero(tmp_path: Path) -> None:
|
||||
for i in range(3):
|
||||
_make_placeholder(tmp_path / "pkg{}".format(i) / "Cargo.toml")
|
||||
sleeps: List[float] = []
|
||||
|
||||
def fetch_fn(path: Path) -> bool:
|
||||
path.write_bytes(b"x")
|
||||
return True
|
||||
|
||||
run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=fetch_fn,
|
||||
allowed_basenames=("Cargo.toml",),
|
||||
batch_size=1,
|
||||
batch_sleep_s=0.0,
|
||||
sleep_fn=lambda s: sleeps.append(s),
|
||||
)
|
||||
assert sleeps == []
|
||||
|
||||
|
||||
def test_default_batch_size_is_capped_low_enough_for_edr() -> None:
|
||||
# Documented batch size is 20 per spec; guard against silent bumps.
|
||||
assert DEFAULT_BATCH_SIZE == 20
|
||||
|
||||
@@ -1,58 +1,28 @@
|
||||
"""Parity baseline for ``eager_hydrate`` BFS + batching + sleep pacing.
|
||||
"""Parity baseline for ``eager_hydrate`` BFS + apply pass.
|
||||
|
||||
Wave 1.5 amend §D paired parity test PR — PR 14 (envelope land 후 BFS Rust
|
||||
이관, ``local_bridge::remote_cache_mirror`` 통합) 의 baseline. 기존
|
||||
``test_eager_hydrate.py`` 14 시나리오를 보존하면서 +12 추가:
|
||||
- batched edge cases (empty / exact / single).
|
||||
- find_placeholder_candidates 추가 boundary (size>0 ignored, basename
|
||||
Wave 1.5 amend §D paired parity test — PR 14 (BFS Rust 이관) +
|
||||
PR-B / PR 17 (apply pass body Rust 이관) baseline. After PR-B the
|
||||
batched/run_eager_hydrate driver lives entirely in
|
||||
``sessions_native::eager_hydrate::run_apply_pass`` (Rust unit-tested
|
||||
side); the Python parity baseline now pins:
|
||||
- ``find_placeholder_candidates`` boundary (size>0 ignored, basename
|
||||
case-sensitivity, nested traversal, cache_root is file).
|
||||
- run_eager_hydrate 호출 순서 / fetch_fn 인자 검증 / batch boundary.
|
||||
- normalize_eager_hydrate_basenames edge cases.
|
||||
- ``normalize_eager_hydrate_basenames`` edge cases.
|
||||
- Default constants invariants used by Python wrappers.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from sessions.eager_hydrate import (
|
||||
DEFAULT_BATCH_SIZE,
|
||||
DEFAULT_BATCH_SLEEP_S,
|
||||
DEFAULT_EAGER_HYDRATE_BASENAMES,
|
||||
EagerHydrateSummary,
|
||||
batched,
|
||||
find_placeholder_candidates,
|
||||
normalize_eager_hydrate_basenames,
|
||||
run_eager_hydrate,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# batched edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_batched_empty_iterable_yields_nothing() -> None:
|
||||
assert list(batched(iter([]), 5)) == []
|
||||
|
||||
|
||||
def test_batched_single_item_yields_single_batch() -> None:
|
||||
items = [Path("/x")]
|
||||
assert list(batched(iter(items), 5)) == [[Path("/x")]]
|
||||
|
||||
|
||||
def test_batched_exact_multiple_no_trailing_partial() -> None:
|
||||
items = [Path(str(i)) for i in range(6)]
|
||||
out = list(batched(iter(items), 3))
|
||||
assert len(out) == 2
|
||||
assert all(len(b) == 3 for b in out)
|
||||
|
||||
|
||||
def test_batched_partial_trailing_batch() -> None:
|
||||
items = [Path(str(i)) for i in range(7)]
|
||||
out = list(batched(iter(items), 3))
|
||||
assert [len(b) for b in out] == [3, 3, 1]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# find_placeholder_candidates boundaries
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -94,55 +64,6 @@ def test_find_placeholder_root_is_file_not_dir(tmp_path: Path) -> None:
|
||||
assert out == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_eager_hydrate behaviour pinning
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_eager_hydrate_passes_path_to_fetch_fn(tmp_path: Path) -> None:
|
||||
target = tmp_path / "Cargo.toml"
|
||||
_touch(target, size=0)
|
||||
seen: List[Path] = []
|
||||
|
||||
def fetch(path: Path) -> bool:
|
||||
seen.append(path)
|
||||
# Simulate hydration: write content so the post-fetch check sees it.
|
||||
path.write_text("[package]\n")
|
||||
return True
|
||||
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path, fetch_fn=fetch, batch_sleep_s=0.0, sleep_fn=lambda _s: None
|
||||
)
|
||||
assert seen == [target]
|
||||
assert summary.hydrated == 1
|
||||
|
||||
|
||||
def test_run_eager_hydrate_returns_zero_summary_when_no_candidates(
|
||||
tmp_path: Path,
|
||||
) -> None:
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=lambda _p: True,
|
||||
batch_sleep_s=0.0,
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
assert summary == EagerHydrateSummary(hydrated=0, skipped_existing=0, failed=0)
|
||||
|
||||
|
||||
def test_run_eager_hydrate_disabled_when_basenames_empty(tmp_path: Path) -> None:
|
||||
_touch(tmp_path / "Cargo.toml", size=0)
|
||||
seen: List[Path] = []
|
||||
summary = run_eager_hydrate(
|
||||
tmp_path,
|
||||
fetch_fn=lambda p: seen.append(p) or True,
|
||||
allowed_basenames=(),
|
||||
batch_sleep_s=0.0,
|
||||
sleep_fn=lambda _s: None,
|
||||
)
|
||||
assert seen == []
|
||||
assert summary.hydrated == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# normalize_eager_hydrate_basenames edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user