sessions/rust/crates/sessions_native/src/eager_hydrate.rs
Myeongseon Choi 927b685059 fix(eager_hydrate): dedicated thread + Rust parallelism — unblock hydrate_open_file after reconnect
Symptom (debug-trace.log post-reconnect)
----------------------------------------

The background queue grew to 8 tasks over 46 s with zero ``queue.dequeue``
events. ``hydrate_open_file`` (prioritize=true; fires when the user opens
a file) was head-of-line blocked behind the running ``eager_hydrate``,
so opening a remote file after reconnect did not trigger a sync.

Root cause
----------

PR-B made eager_hydrate a single synchronous Rust call that loops
sequentially through every placeholder. Each ``file_open`` round-trip
takes ~50–500 ms, so with N≈100 placeholders the worker thread is
occupied for roughly 5–50 s, and for minutes if the helper is loaded.
The shared ``_BACKGROUND_TASK_QUEUE`` worker has no preemption, so even
a prioritized ``hydrate_open_file`` cannot run until eager_hydrate
finishes.
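The failure mode can be reproduced with a small deterministic simulation of one non-preemptive worker. This is a sketch under stated assumptions, not the project's queue. It assumes that ``prioritize`` moves a task ahead of *waiting* tasks only. `Task` and `simulate_single_worker` are hypothetical names, and all durations are example values.

```rust
use std::collections::VecDeque;

/// A task submitted to the simulated queue (hypothetical type).
struct Task {
    name: &'static str,
    enqueued_at_ms: u64,
    duration_ms: u64,
    prioritize: bool,
}

/// Simulate one non-preemptive worker draining a FIFO queue in which
/// `prioritize` tasks jump ahead of waiting (but not running) tasks.
/// Returns (name, start_ms) pairs in execution order.
fn simulate_single_worker(mut tasks: Vec<Task>) -> Vec<(&'static str, u64)> {
    tasks.sort_by_key(|t| t.enqueued_at_ms);
    let mut pending = tasks.into_iter().peekable();
    let mut ready: VecDeque<Task> = VecDeque::new();
    let mut now = 0u64;
    let mut order = Vec::new();
    loop {
        // Admit every task that has arrived by `now`.
        while let Some(t) = pending.next_if(|task| task.enqueued_at_ms <= now) {
            if t.prioritize {
                ready.push_front(t);
            } else {
                ready.push_back(t);
            }
        }
        match ready.pop_front() {
            Some(task) => {
                order.push((task.name, now));
                // No preemption: the worker is busy until this task ends.
                now += task.duration_ms;
            }
            None => match pending.peek() {
                // Idle until the next arrival, or stop when nothing is left.
                Some(next) => now = next.enqueued_at_ms,
                None => break,
            },
        }
    }
    order
}
```

Suppose a 30 s eager pass starts at t=0 and a prioritized open arrives at t=1 s. The open still starts only at t=30 s, ahead of a non-prioritized task that arrived earlier but behind the running pass. If the eager pass is moved off this worker, the same open starts at t=1 s.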

Fix 1 — dedicated thread per cache_key (Python)
-----------------------------------------------

* ``_schedule_eager_hydrate_if_needed`` now runs the pass on its own
  daemon thread, not via ``_run_in_background``. The shared background
  worker is freed for ``hydrate_open_file`` / ``open_file_refresh_*`` /
  ``sessions.refresh_git_state``.
* The per-key in-flight set ``_EAGER_HYDRATE_INFLIGHT`` preserves the
  dedupe-by-cache_key semantics the old ``task_key`` provided. If the
  same cache_key is triggered again while its first pass is still
  running, the call emits a ``mirror.eager_hydrate_skip_inflight``
  trace and returns.
* Lint #2 stays satisfied — no new ``_*_TASK_QUEUE = deque()`` is
  introduced; the new lane is a per-key set + dedicated thread.
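Fix 1 lives in Python, but the pattern is language-neutral. The following is a minimal Rust sketch of the same idea, assuming a `Mutex<HashSet<String>>` plays the role of ``_EAGER_HYDRATE_INFLIGHT``. `InflightLanes`, `InflightGuard`, and `try_spawn` are hypothetical names. A drop guard clears the key even if the pass panics; in Python, a ``try``/``finally`` around the pass would play the same role.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical analogue of `_EAGER_HYDRATE_INFLIGHT`: at most one
/// dedicated thread per cache key.
#[derive(Clone, Default)]
struct InflightLanes {
    keys: Arc<Mutex<HashSet<String>>>,
}

/// Removes the key when the pass ends, including on panic.
struct InflightGuard {
    keys: Arc<Mutex<HashSet<String>>>,
    key: String,
}

impl Drop for InflightGuard {
    fn drop(&mut self) {
        // Recover from poisoning so a panicked pass never leaves its key stuck.
        let mut keys = self.keys.lock().unwrap_or_else(|e| e.into_inner());
        keys.remove(&self.key);
    }
}

impl InflightLanes {
    /// Spawn `work` on its own thread unless `key` already has a pass in
    /// flight. `None` corresponds to the `eager_hydrate_skip_inflight` case.
    fn try_spawn<F>(&self, key: &str, work: F) -> Option<JoinHandle<()>>
    where
        F: FnOnce() + Send + 'static,
    {
        {
            let mut keys = self.keys.lock().unwrap_or_else(|e| e.into_inner());
            if !keys.insert(key.to_string()) {
                return None; // same cache_key is already running
            }
        }
        let guard = InflightGuard { keys: Arc::clone(&self.keys), key: key.to_string() };
        Some(thread::spawn(move || {
            let _guard = guard; // dropped when `work` returns or unwinds
            work();
        }))
    }
}
```

Rust has no daemon flag. A spawned thread whose handle is dropped is detached and does not keep the process alive once `main` returns, which is the closest analogue.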

Fix 2 — N-way parallelism inside Rust apply pass
------------------------------------------------

* ``run_apply_pass`` accepts a ``parallelism`` parameter. For each batch
  it spawns up to ``parallelism`` workers via ``thread::scope``; they pull
  placeholders from a shared work queue and call
  ``file_open::run_file_open_transaction`` concurrently. The broker
  multiplexes by envelope id, so concurrent file/read requests are safe.
* Per-placeholder logic is factored into ``process_placeholder`` (atomic
  counters for skipped/failed, a mutex-guarded ``Vec<Value>`` for
  hydrated entries), so there is no dirty-read hazard.
* ``parallelism = 1`` retains the strictly sequential PR-B behaviour
  for tests and single-thread debugging. Any batch that would get only
  one worker (``parallelism = 1`` or a single-placeholder batch) takes a
  fast path that skips the scope/Mutex overhead.
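* ``commands.py`` passes ``parallelism=8`` by default. This cuts the
  wall-clock time of a 50-placeholder pass roughly linearly until
  per-placeholder latency becomes helper-bound rather than
  round-trip-bound. Workers are spawned per batch, so the effective
  concurrency is ``min(parallelism, batch_size)``.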
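The worker-pool shape described above reduces to a few lines of std-only Rust. This is a simplified sketch, not the crate's code. `for_each_parallel` is a hypothetical name, and it collapses the per-batch loop into a single call over one slice.

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Run `f` over every item using up to `parallelism` scoped workers that
/// pull from a shared queue. Returns how many items were processed.
/// `f` must be `Sync` because all workers share one reference to it.
fn for_each_parallel<T, F>(items: &[T], parallelism: usize, f: F) -> usize
where
    T: Sync,
    F: Fn(&T) + Sync,
{
    let workers = parallelism.max(1).min(items.len()).max(1);
    if workers <= 1 {
        // Fast path: no threads, no locks, strictly sequential order.
        for item in items {
            f(item);
        }
        return items.len();
    }
    let processed = AtomicUsize::new(0);
    let queue: Mutex<Vec<&T>> = Mutex::new(items.iter().collect());
    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // The guard is a temporary, so the lock is held only for the pop.
                let next = queue.lock().unwrap().pop();
                let Some(item) = next else { break };
                f(item);
                processed.fetch_add(1, Ordering::Relaxed);
            });
        }
    }); // every scoped worker is joined here
    processed.into_inner()
}
```

Because workers `pop()` from the end of a shared `Vec`, processing order is not preserved on the threaded path. That ordering difference is harmless for independent placeholders.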

Fix 3 — tighten per-placeholder timeout
---------------------------------------

* ``timeout_ms`` for eager_hydrate file_opens drops from 30 s to 10 s.
  Eager hydrate is best-effort; placeholders that miss a pass simply
  re-run on the next sync. The smaller cap stops a stuck helper from
  blocking the dedicated thread for minutes.
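The timeout cap and the worker count together bound the worst case. A sketch of that arithmetic follows, assuming every ``file_open`` transaction returns within ``timeout_ms`` and ignoring local stat and lock overhead. `worst_case_pass_ms` is a hypothetical helper; its parameters mirror those of ``run_apply_pass``. With a shared queue, each worker starts a new item at least every ``timeout_ms``, so a batch of ``len`` items on ``w`` workers drains within ``ceil(len / w)`` rounds.

```rust
/// Upper bound (ms) on one apply pass, assuming each `file_open`
/// finishes within `timeout_ms` (hypothetical helper).
fn worst_case_pass_ms(
    placeholders: usize,
    batch_size: usize,
    parallelism: usize,
    batch_sleep_ms: u64,
    timeout_ms: u64,
) -> u64 {
    // Same clamping as run_apply_pass: zero means one.
    let batch_size = batch_size.max(1);
    let parallelism = parallelism.max(1);
    let mut total = 0u64;
    let mut remaining = placeholders;
    let mut batch_index = 0usize;
    while remaining > 0 {
        let batch_len = remaining.min(batch_size);
        if batch_index > 0 {
            total += batch_sleep_ms; // pause between batches
        }
        // Workers are spawned per batch, so a small batch caps concurrency.
        let workers = parallelism.min(batch_len);
        let rounds = ((batch_len + workers - 1) / workers) as u64; // ceil
        total += rounds * timeout_ms;
        remaining -= batch_len;
        batch_index += 1;
    }
    total
}
```

Assume a ``batch_size`` of 50 and no batch sleep; both values are example inputs, not the project's defaults. A fully stuck helper could then hold a 50-placeholder pass for 1,500,000 ms (25 min) with sequential 30 s timeouts. With ``parallelism=8`` and the 10 s cap, the bound drops to 70,000 ms. Pairing ``batch_size = 4`` with ``parallelism = 8`` yields only four workers per batch.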

Tests
-----

1298 Python tests pass, 89 Rust unit tests pass, ``cargo clippy
--workspace -- -D warnings`` green, boundary lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 01:31:07 +09:00


//! Eager-hydrate placeholder discovery (Wave 2 PR 14) + apply pass body
//! (Wave 2 PR 17 / PR-B).
//!
//! Walks a local cache root and yields zero-byte regular files whose basename
//! is in an allow-list. Mirrors the Python ``find_placeholder_candidates``
//! contract pinned by ``test_eager_hydrate_parity``:
//!
//! - Symbolic links are never followed (Sessions cache has no symlinks; the
//!   guard is cheap and matches Python's ``Path.is_file`` after stat).
//! - The ``__extern`` subtree is skipped (external/out-of-workspace cache).
//! - Directories that fail to enumerate are silently skipped (partial
//!   cache → produces what candidates it can).
//! - An empty allow-list returns no candidates.
//!
//! PR-B (apply pass body) extends the Rust ownership: the loop, batch
//! pacing, per-placeholder ``file_open`` transaction, and outcome
//! collection all run in Rust. Python becomes a thin caller: one FFI
//! round-trip per pass, after which it writes sidecar metadata for
//! hydrated entries.

use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

use serde_json::{Value, json};

use crate::file_open;
use crate::map_local_to_remote_path;

/// Return zero-byte regular files under `cache_root` whose basename is in
/// `allowed_basenames`.
///
/// The walk is depth-first (an explicit stack), and entries come back in
/// `read_dir` order, which is platform- and filesystem-dependent and not
/// lexicographic. Sort the result if a stable order is needed.
///
/// The allow-list is taken as plain `String`s to keep the C ABI surface
/// tight (see `lib.rs::sessions_eager_hydrate_find_candidates`). When
/// `allowed_basenames` is empty, or `cache_root` is not a directory, an
/// empty Vec is returned without walking the tree.
pub fn find_placeholder_candidates(
    cache_root: &Path,
    allowed_basenames: &[String],
) -> Vec<PathBuf> {
    let allowed: HashSet<&str> = allowed_basenames.iter().map(String::as_str).collect();
    if allowed.is_empty() {
        return Vec::new();
    }
    if !cache_root.is_dir() {
        return Vec::new();
    }
    let mut out: Vec<PathBuf> = Vec::new();
    let mut stack: Vec<PathBuf> = vec![cache_root.to_path_buf()];
    while let Some(current) = stack.pop() {
        let entries = match fs::read_dir(&current) {
            Ok(it) => it,
            Err(_) => continue,
        };
        for entry in entries.flatten() {
            let path = entry.path();
            let file_type = match entry.file_type() {
                Ok(ft) => ft,
                Err(_) => continue,
            };
            if file_type.is_dir() {
                let name_owned = match path.file_name() {
                    Some(name) => name.to_string_lossy().into_owned(),
                    None => continue,
                };
                if name_owned == "__extern" {
                    continue;
                }
                stack.push(path);
                continue;
            }
            if !file_type.is_file() {
                // Symlinks, sockets, devices: the Sessions cache should never
                // hold these; mirror Python's ``Path.is_file`` skip.
                continue;
            }
            let name = match path.file_name().and_then(|n| n.to_str()) {
                Some(n) => n,
                None => continue,
            };
            if !allowed.contains(name) {
                continue;
            }
            // Zero-byte filter: Python skips entries whose ``stat.st_size != 0``.
            let metadata = match entry.metadata() {
                Ok(m) => m,
                Err(_) => continue,
            };
            if metadata.len() != 0 {
                continue;
            }
            out.push(path);
        }
    }
    out
}

/// Drive one eager-hydrate apply pass over placeholders under
/// ``cache_root``. Returns a JSON object summarising the pass:
///
/// ```json
/// {
///   "hydrated": [{"local_path": "...", "metadata": {...}}, ...],
///   "skipped_existing": N,
///   "failed": M
/// }
/// ```
///
/// Re-checks that each placeholder is still zero bytes before fetching it
/// (so one filled concurrently by another path lands in
/// ``skipped_existing`` instead of being re-fetched), counts failures
/// without aborting, and sleeps ``batch_sleep_ms`` between batches.
///
/// Within each batch, runs up to ``parallelism`` ``file_open``
/// transactions concurrently (the broker session multiplexes by envelope
/// id, so concurrent file/read requests are safe). Workers are spawned per
/// batch, so effective concurrency is also capped by ``batch_size``.
/// ``parallelism = 1`` preserves the strictly sequential PR-B behaviour.
/// Setting it higher cuts the wall-clock time of a 50-placeholder pass
/// roughly linearly until per-placeholder latency becomes helper-bound
/// rather than round-trip-bound.
#[allow(clippy::too_many_arguments)]
pub fn run_apply_pass(
    cache_root: &Path,
    host_alias: &str,
    remote_workspace_root: &str,
    allowed_basenames: &[String],
    batch_size: usize,
    batch_sleep_ms: u64,
    max_open_bytes: u64,
    binary_probe_bytes: usize,
    allow_empty: bool,
    timeout_ms: u64,
    parallelism: usize,
) -> Value {
    let placeholders = find_placeholder_candidates(cache_root, allowed_basenames);
    let hydrated: Mutex<Vec<Value>> = Mutex::new(Vec::new());
    let skipped_existing = AtomicUsize::new(0);
    let failed = AtomicUsize::new(0);
    let batch_size_safe = if batch_size == 0 { 1 } else { batch_size };
    let parallelism_safe = parallelism.max(1);
    for (batch_index, batch) in placeholders.chunks(batch_size_safe).enumerate() {
        if batch_index > 0 && batch_sleep_ms > 0 {
            thread::sleep(Duration::from_millis(batch_sleep_ms));
        }
        let workers = parallelism_safe.min(batch.len()).max(1);
        if workers <= 1 {
            // Fast path: a single worker (parallelism = 1 or a one-item
            // batch) runs inline, avoiding scope/Mutex overhead.
            for path in batch {
                process_placeholder(
                    path,
                    host_alias,
                    remote_workspace_root,
                    cache_root,
                    max_open_bytes,
                    binary_probe_bytes,
                    allow_empty,
                    timeout_ms,
                    &hydrated,
                    &skipped_existing,
                    &failed,
                );
            }
            continue;
        }
        let work_queue: Mutex<Vec<&PathBuf>> = Mutex::new(batch.iter().collect());
        thread::scope(|s| {
            for _ in 0..workers {
                let work_queue_ref = &work_queue;
                let hydrated_ref = &hydrated;
                let skipped_ref = &skipped_existing;
                let failed_ref = &failed;
                s.spawn(move || {
                    loop {
                        let next = match work_queue_ref.lock() {
                            Ok(mut q) => q.pop(),
                            Err(_) => break,
                        };
                        let Some(path) = next else { break };
                        process_placeholder(
                            path,
                            host_alias,
                            remote_workspace_root,
                            cache_root,
                            max_open_bytes,
                            binary_probe_bytes,
                            allow_empty,
                            timeout_ms,
                            hydrated_ref,
                            skipped_ref,
                            failed_ref,
                        );
                    }
                });
            }
        });
    }
    let hydrated_vec = hydrated.into_inner().unwrap_or_default();
    json!({
        "hydrated": hydrated_vec,
        "skipped_existing": skipped_existing.into_inner(),
        "failed": failed.into_inner(),
    })
}

/// Hydrate one placeholder and record the outcome. A file that is no longer
/// an empty regular file counts as `skipped_existing`. A path that cannot
/// be mapped to the remote workspace, or a `file_open` outcome other than
/// `"OK"`, counts as `failed`. A successful fetch pushes
/// `{local_path, metadata}` onto `hydrated`.
#[allow(clippy::too_many_arguments)]
fn process_placeholder(
    path: &Path,
    host_alias: &str,
    remote_workspace_root: &str,
    cache_root: &Path,
    max_open_bytes: u64,
    binary_probe_bytes: usize,
    allow_empty: bool,
    timeout_ms: u64,
    hydrated: &Mutex<Vec<Value>>,
    skipped_existing: &AtomicUsize,
    failed: &AtomicUsize,
) {
    // Re-check zero-byte: a concurrent path (sidebar hydrate /
    // on-demand fetch) may have filled the placeholder while we
    // were iterating. Mirror Python's pre-fetch guard.
    let still_placeholder = match path.metadata() {
        Ok(m) => m.is_file() && m.len() == 0,
        Err(_) => false,
    };
    if !still_placeholder {
        skipped_existing.fetch_add(1, Ordering::Relaxed);
        return;
    }
    let remote = match map_local_to_remote_path(remote_workspace_root, cache_root, path) {
        Some(r) => r,
        None => {
            failed.fetch_add(1, Ordering::Relaxed);
            return;
        }
    };
    let outcome = file_open::run_file_open_transaction(
        host_alias,
        &remote,
        path,
        max_open_bytes,
        binary_probe_bytes,
        allow_empty,
        timeout_ms,
    );
    let outcome_str = outcome.get("outcome").and_then(Value::as_str).unwrap_or("");
    if outcome_str == "OK" {
        let metadata = outcome.get("metadata").cloned().unwrap_or(Value::Null);
        let entry = json!({
            "local_path": path.to_string_lossy(),
            "metadata": metadata,
        });
        if let Ok(mut h) = hydrated.lock() {
            h.push(entry);
        }
    } else {
        failed.fetch_add(1, Ordering::Relaxed);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Write;

    fn touch(path: &Path, size: usize) -> std::io::Result<()> {
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let mut f = File::create(path)?;
        if size > 0 {
            f.write_all(&vec![b'x'; size])?;
        }
        Ok(())
    }

    fn names_only(paths: &[PathBuf]) -> Vec<String> {
        let mut names: Vec<String> = paths
            .iter()
            .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().into_owned()))
            .collect();
        names.sort();
        names
    }

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    #[test]
    fn empty_allowlist_yields_nothing() -> TestResult {
        let temp = tempfile::tempdir()?;
        touch(&temp.path().join("Cargo.toml"), 0)?;
        let result = find_placeholder_candidates(temp.path(), &[]);
        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn root_is_file_not_dir_yields_nothing() -> TestResult {
        let temp = tempfile::tempdir()?;
        let root_file = temp.path().join("root_is_file");
        touch(&root_file, 4)?;
        let result = find_placeholder_candidates(&root_file, &["Cargo.toml".to_string()]);
        assert!(result.is_empty());
        Ok(())
    }

    #[test]
    fn skips_nonzero_size_files() -> TestResult {
        let temp = tempfile::tempdir()?;
        touch(&temp.path().join("Cargo.toml"), 1)?;
        touch(&temp.path().join("pyproject.toml"), 0)?;
        let result = find_placeholder_candidates(
            temp.path(),
            &["Cargo.toml".to_string(), "pyproject.toml".to_string()],
        );
        assert_eq!(names_only(&result), vec!["pyproject.toml".to_string()]);
        Ok(())
    }

    #[test]
    fn basename_match_is_case_sensitive() -> TestResult {
        let temp = tempfile::tempdir()?;
        touch(&temp.path().join("cargo.toml"), 0)?;
        touch(&temp.path().join("Cargo.toml"), 0)?;
        let result = find_placeholder_candidates(temp.path(), &["Cargo.toml".to_string()]);
        assert_eq!(names_only(&result), vec!["Cargo.toml".to_string()]);
        Ok(())
    }

    #[test]
    fn skips_extern_subtree() -> TestResult {
        let temp = tempfile::tempdir()?;
        touch(&temp.path().join("__extern").join("Cargo.toml"), 0)?;
        touch(&temp.path().join("ok").join("Cargo.toml"), 0)?;
        let result = find_placeholder_candidates(temp.path(), &["Cargo.toml".to_string()]);
        assert_eq!(result.len(), 1);
        assert!(result[0].to_string_lossy().contains("/ok/"));
        Ok(())
    }

    #[test]
    fn nested_directories_are_traversed() -> TestResult {
        let temp = tempfile::tempdir()?;
        touch(&temp.path().join("a/b/c/Cargo.toml"), 0)?;
        touch(&temp.path().join("a/b/package.json"), 0)?;
        let result = find_placeholder_candidates(
            temp.path(),
            &["Cargo.toml".to_string(), "package.json".to_string()],
        );
        assert_eq!(
            names_only(&result),
            vec!["Cargo.toml".to_string(), "package.json".to_string()],
        );
        Ok(())
    }
}