Compare commits

..

2 Commits

Author SHA1 Message Date
1fbfa8010b chore(release): v0.7.28 — eager_hydrate hot-fix (separate lane + Rust parallelism)
All checks were successful
boundary-lint / PR boundary-claim (Lint (push) Has been skipped
boundary-lint / duplication-deadline (Layer 1/2) (push) Successful in 19s
boundary-lint / ban-list lint (Lint (push) Successful in 20s
ci / test-health gate (push) Successful in 17s
ci / mutation test (broker) (push) Has been skipped
ci / rust release (push) Successful in 2m47s
ci / rust debug (push) Successful in 2m58s
ci / python (push) Successful in 1m30s
Release Publish (Gitea session_helper) / verify-release-tag (push) Successful in 16s
Release Publish (Gitea session_helper) / publish-linux-x86_64 (push) Successful in 3m46s
Hot-fix on top of v0.7.27 (PR-B): the eager-hydrate apply pass head-of-
line-blocked ``hydrate_open_file`` after reconnect, so user file opens
did not trigger sync until the (long) hydrate pass completed. Triple
fix:

* eager_hydrate now runs on a dedicated daemon thread per ``cache_key``;
  the shared background worker is freed for user-facing tasks.
* Rust apply pass spawns 8 ``file_open`` transactions concurrently per
  batch (broker multiplexes by envelope id, safe). Per-placeholder
  latency falls roughly linearly with parallelism.
* Per-placeholder ``timeout_ms`` 30 s → 10 s caps the cost of a stuck
  helper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 01:32:26 +09:00
927b685059 fix(eager_hydrate): dedicated thread + Rust parallelism — unblock hydrate_open_file after reconnect
Symptom (debug-trace.log post-reconnect)
----------------------------------------

Background queue grew to 8 tasks across 46s with zero `queue.dequeue`
events. ``hydrate_open_file`` (prioritize=true, fires when user opens
a file) was head-of-line-blocked behind the running ``eager_hydrate``,
so opening a remote file after reconnect did not trigger sync.

Root cause
----------

PR-B made eager_hydrate a single synchronous Rust call that loops
sequentially through every placeholder (each ``file_open`` round-trip
is ~50–500ms; with N≈100 placeholders the worker thread is occupied
for tens of seconds — minutes if the helper is loaded). The shared
``_BACKGROUND_TASK_QUEUE`` worker has no preemption, so user-facing
``hydrate_open_file`` cannot run until eager_hydrate finishes.

Fix 1 — dedicated thread per cache_key (Python)
-----------------------------------------------

* ``_schedule_eager_hydrate_if_needed`` now runs the pass on its own
  daemon thread, not via ``_run_in_background``. The shared background
  worker is freed for ``hydrate_open_file`` / ``open_file_refresh_*`` /
  ``sessions.refresh_git_state``.
* Per-key in-flight set ``_EAGER_HYDRATE_INFLIGHT`` preserves the
  dedupe-by-cache_key semantics the old ``task_key`` provided. Same
  cache_key triggered twice while the first pass is running emits a
  ``mirror.eager_hydrate_skip_inflight`` trace and returns.
* Lint #2 stays satisfied — no new ``_*_TASK_QUEUE = deque()`` is
  introduced; the new lane is a per-key set + dedicated thread.

Fix 2 — N-way parallelism inside Rust apply pass
------------------------------------------------

* ``run_apply_pass`` accepts a ``parallelism`` parameter. Per batch,
  spawns up to ``parallelism`` workers via ``thread::scope`` that pull
  placeholders from a shared work queue and call
  ``file_open::run_file_open_transaction`` concurrently. The broker
  multiplexes by envelope id, so concurrent file/read is safe.
* Per-placeholder logic factored into ``process_placeholder`` (atomic
  counters for skipped/failed, mutex-guarded ``Vec<Value>`` for
  hydrated entries — no dirty-read hazard).
* ``parallelism = 1`` retains the strictly sequential PR-B behaviour
  for tests / single-thread debugging; tiny batches take a fast path
  that avoids scope/Mutex overhead.
* Default from ``commands.py``: ``parallelism=8``. Cuts the wall-clock
  of a 50-placeholder pass roughly linearly until per-placeholder
  latency becomes helper-bound rather than round-trip-bound.

Fix 3 — tighten per-placeholder timeout
---------------------------------------

* ``timeout_ms`` for eager_hydrate file_opens drops from 30 s to 10 s.
  Eager hydrate is best-effort; placeholders that miss a pass simply
  re-run on the next sync. The smaller cap stops a stuck helper from
  blocking the dedicated thread for minutes.

Tests
-----

1298 Python tests pass, 89 Rust unit tests pass, ``cargo clippy
--workspace -- -D warnings`` green, boundary lint clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 01:31:07 +09:00
8 changed files with 200 additions and 81 deletions

View File

@@ -1,6 +1,6 @@
[project]
name = "sessions-sublime"
version = "0.7.27"
version = "0.7.28"
description = "Sublime-facing Python code for Sessions."
requires-python = ">=3.8"
license = {text = "MIT"}

12
rust/Cargo.lock generated
View File

@@ -221,7 +221,7 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "local_bridge"
version = "0.7.27"
version = "0.7.28"
dependencies = [
"base64",
"glob",
@@ -432,7 +432,7 @@ dependencies = [
[[package]]
name = "session_helper"
version = "0.7.27"
version = "0.7.28"
dependencies = [
"base64",
"notify",
@@ -443,7 +443,7 @@ dependencies = [
[[package]]
name = "session_protocol"
version = "0.7.27"
version = "0.7.28"
dependencies = [
"base64",
"serde",
@@ -452,14 +452,14 @@ dependencies = [
[[package]]
name = "sessions_askpass"
version = "0.7.27"
version = "0.7.28"
dependencies = [
"tempfile",
]
[[package]]
name = "sessions_native"
version = "0.7.27"
version = "0.7.28"
dependencies = [
"base64",
"serde_json",
@@ -772,7 +772,7 @@ dependencies = [
[[package]]
name = "workspace_identity"
version = "0.7.27"
version = "0.7.28"
[[package]]
name = "zmij"

View File

@@ -12,7 +12,7 @@ resolver = "2"
[workspace.package]
edition = "2024"
license = "MIT"
version = "0.7.27"
version = "0.7.28"
authors = ["Myeongseon Choi <key262yek@gmail.com>"]
repository = "https://git.teahaven.kr/sublime-rs/sessions"
homepage = "https://git.teahaven.kr/sublime-rs/sessions"

View File

@@ -20,6 +20,8 @@
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
@@ -109,15 +111,18 @@ pub fn find_placeholder_candidates(
/// }
/// ```
///
/// The driver mirrors the previous Python ``run_eager_hydrate``:
/// re-checks zero-byte before fetch (so a concurrent path filling the
/// Re-checks zero-byte before fetch (so a concurrent path filling the
/// placeholder lands in ``skipped_existing`` rather than re-fetched),
/// counts failures without aborting, and pauses ``batch_sleep_ms``
/// between batches.
///
/// Per-placeholder, calls :func:`file_open::run_file_open_transaction`
/// (PR 14.5c). The Python wrapper (PR 14.5d) is bypassed so we get one
/// FFI round-trip per pass instead of one per file.
/// Per-batch, runs up to ``parallelism`` ``file_open`` transactions
/// concurrently (the broker session multiplexes by envelope id, so
/// concurrent file/read requests are safe). ``parallelism = 1``
/// preserves the strictly sequential PR-B behaviour. Setting it
/// higher cuts the wall-clock of a 50-placeholder pass roughly
/// linearly until per-placeholder latency becomes helper-bound rather
/// than round-trip-bound.
#[allow(clippy::too_many_arguments)]
pub fn run_apply_pass(
cache_root: &Path,
@@ -130,68 +135,139 @@ pub fn run_apply_pass(
binary_probe_bytes: usize,
allow_empty: bool,
timeout_ms: u64,
parallelism: usize,
) -> Value {
let placeholders = find_placeholder_candidates(cache_root, allowed_basenames);
let mut hydrated: Vec<Value> = Vec::new();
let mut skipped_existing: usize = 0;
let mut failed: usize = 0;
let hydrated: Mutex<Vec<Value>> = Mutex::new(Vec::new());
let skipped_existing = AtomicUsize::new(0);
let failed = AtomicUsize::new(0);
let batch_size_safe = if batch_size == 0 { 1 } else { batch_size };
let parallelism_safe = parallelism.max(1);
for (batch_index, batch) in placeholders.chunks(batch_size_safe).enumerate() {
if batch_index > 0 && batch_sleep_ms > 0 {
thread::sleep(Duration::from_millis(batch_sleep_ms));
}
for path in batch {
// Re-check zero-byte: a concurrent path (sidebar hydrate /
// on-demand fetch) may have filled the placeholder while we
// were iterating. Mirror Python's pre-fetch guard.
let still_placeholder = match path.metadata() {
Ok(m) => m.is_file() && m.len() == 0,
Err(_) => false,
};
if !still_placeholder {
skipped_existing += 1;
continue;
}
let remote = match map_local_to_remote_path(remote_workspace_root, cache_root, path) {
Some(r) => r,
None => {
failed += 1;
continue;
}
};
let outcome = file_open::run_file_open_transaction(
host_alias,
&remote,
path,
max_open_bytes,
binary_probe_bytes,
allow_empty,
timeout_ms,
);
let outcome_str = outcome.get("outcome").and_then(Value::as_str).unwrap_or("");
if outcome_str == "OK" {
let metadata = outcome.get("metadata").cloned().unwrap_or(Value::Null);
hydrated.push(json!({
"local_path": path.to_string_lossy(),
"metadata": metadata,
}));
} else {
failed += 1;
let workers = parallelism_safe.min(batch.len()).max(1);
if workers <= 1 {
// Fast path — avoid scope/Mutex overhead for tiny batches.
for path in batch {
process_placeholder(
path,
host_alias,
remote_workspace_root,
cache_root,
max_open_bytes,
binary_probe_bytes,
allow_empty,
timeout_ms,
&hydrated,
&skipped_existing,
&failed,
);
}
continue;
}
let work_queue: Mutex<Vec<&PathBuf>> = Mutex::new(batch.iter().collect());
thread::scope(|s| {
for _ in 0..workers {
let work_queue_ref = &work_queue;
let hydrated_ref = &hydrated;
let skipped_ref = &skipped_existing;
let failed_ref = &failed;
s.spawn(move || {
loop {
let next = match work_queue_ref.lock() {
Ok(mut q) => q.pop(),
Err(_) => break,
};
let Some(path) = next else { break };
process_placeholder(
path,
host_alias,
remote_workspace_root,
cache_root,
max_open_bytes,
binary_probe_bytes,
allow_empty,
timeout_ms,
hydrated_ref,
skipped_ref,
failed_ref,
);
}
});
}
});
}
let hydrated_vec = hydrated.into_inner().unwrap_or_default();
json!({
"hydrated": hydrated,
"skipped_existing": skipped_existing,
"failed": failed,
"hydrated": hydrated_vec,
"skipped_existing": skipped_existing.into_inner(),
"failed": failed.into_inner(),
})
}
#[allow(clippy::too_many_arguments)]
fn process_placeholder(
path: &Path,
host_alias: &str,
remote_workspace_root: &str,
cache_root: &Path,
max_open_bytes: u64,
binary_probe_bytes: usize,
allow_empty: bool,
timeout_ms: u64,
hydrated: &Mutex<Vec<Value>>,
skipped_existing: &AtomicUsize,
failed: &AtomicUsize,
) {
// Re-check zero-byte: a concurrent path (sidebar hydrate /
// on-demand fetch) may have filled the placeholder while we
// were iterating. Mirror Python's pre-fetch guard.
let still_placeholder = match path.metadata() {
Ok(m) => m.is_file() && m.len() == 0,
Err(_) => false,
};
if !still_placeholder {
skipped_existing.fetch_add(1, Ordering::Relaxed);
return;
}
let remote = match map_local_to_remote_path(remote_workspace_root, cache_root, path) {
Some(r) => r,
None => {
failed.fetch_add(1, Ordering::Relaxed);
return;
}
};
let outcome = file_open::run_file_open_transaction(
host_alias,
&remote,
path,
max_open_bytes,
binary_probe_bytes,
allow_empty,
timeout_ms,
);
let outcome_str = outcome.get("outcome").and_then(Value::as_str).unwrap_or("");
if outcome_str == "OK" {
let metadata = outcome.get("metadata").cloned().unwrap_or(Value::Null);
let entry = json!({
"local_path": path.to_string_lossy(),
"metadata": metadata,
});
if let Ok(mut h) = hydrated.lock() {
h.push(entry);
}
} else {
failed.fetch_add(1, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1571,12 +1571,13 @@ pub unsafe extern "C" fn sessions_eager_hydrate_find_candidates(
write_output(out_buf, out_cap, &joined)
}
/// Run the eager-hydrate apply pass body (Wave 2 PR-B).
/// Run the eager-hydrate apply pass body (Wave 2 PR-B + PR-B.1).
///
/// One Rust round-trip drives the entire pass: find candidates →
/// per-batch sleep → re-check zero-byte → map local→remote → file_open
/// transaction → collect outcomes. Python writes sidecar metadata for
/// the returned ``hydrated`` list.
/// transaction (up to ``parallelism`` concurrent in-flight, broker
/// multiplexes by envelope id) → collect outcomes. Python writes
/// sidecar metadata for the returned ``hydrated`` list.
///
/// # Safety
/// `cache_root`, `host_alias`, `remote_workspace_root`, and
@@ -1598,6 +1599,7 @@ pub unsafe extern "C" fn sessions_eager_hydrate_apply(
binary_probe_bytes: usize,
allow_empty: c_int,
timeout_ms: u64,
parallelism: usize,
out_buf: *mut c_char,
out_cap: usize,
) -> c_int {
@@ -1636,6 +1638,7 @@ pub unsafe extern "C" fn sessions_eager_hydrate_apply(
binary_probe_bytes,
allow_empty != 0,
timeout_ms,
parallelism,
);
let Ok(serialized) = serde_json::to_string(&summary) else {
return AbiError::Serialization.code();

View File

@@ -165,13 +165,17 @@ def eager_hydrate_apply(
binary_probe_bytes: int,
allow_empty: bool,
timeout_ms: int,
parallelism: int = 1,
) -> Dict[str, Any]:
"""Drive one Rust eager-hydrate apply pass (PR-B / PR 17).
"""Drive one Rust eager-hydrate apply pass (PR-B / PR 17 + PR-B.1).
Rust owns: candidate discovery, batch loop, batch_sleep pacing,
re-check zero-byte, local→remote mapping, ``file_open`` transaction,
outcome counting. Python writes sidecar metadata for ``hydrated``
entries and emits the trace event.
outcome counting. ``parallelism`` controls how many ``file_open``
transactions Rust runs concurrently per batch (broker session
multiplexes by envelope id, so concurrent file/read is safe).
Python writes sidecar metadata for ``hydrated`` entries and emits
the trace event.
Returns a dict with keys ``hydrated`` (list of
``{"local_path": ..., "metadata": ...}``), ``skipped_existing``,
@@ -190,6 +194,7 @@ def eager_hydrate_apply(
ctypes.c_size_t(int(binary_probe_bytes)),
ctypes.c_int(1 if allow_empty else 0),
ctypes.c_uint64(int(timeout_ms)),
ctypes.c_size_t(int(max(1, parallelism))),
),
argtypes=[
ctypes.c_char_p,
@@ -202,6 +207,7 @@ def eager_hydrate_apply(
ctypes.c_size_t,
ctypes.c_int,
ctypes.c_uint64,
ctypes.c_size_t,
],
initial_buf=64 * 1024,
)

View File

@@ -295,6 +295,17 @@ _MIRROR_WORKER_STARTED = False
_BACKGROUND_QUEUE_MAX = 128
_MIRROR_QUEUE_MAX = 8
# Eager-hydrate runs on a dedicated thread per cache_key so its long
# pass (sequential file_open transactions over many placeholders) cannot
# block ``hydrate_open_file`` (a ``prioritize=True`` background task that
# fires every time the user opens a file). Lint #2 grandfathers the
# `_BACKGROUND_TASK_QUEUE` / `_MIRROR_TASK_QUEUE` deques in this module
# but explicitly bans new ones in ``commands_*.py`` split modules — we
# stay within the spirit by NOT introducing a third queue, only a per-
# key in-flight set.
_EAGER_HYDRATE_INFLIGHT: Set[str] = set()
_EAGER_HYDRATE_INFLIGHT_LOCK = threading.Lock()
def _mirror_queue_pressure(queue_size: int, dropped: int) -> str:
return _rust_ffi.mirror_queue_pressure(
@@ -3549,28 +3560,45 @@ def _schedule_eager_hydrate_if_needed(
window: object,
context: "_WorkspaceContext",
) -> None:
"""Enqueue one eager-hydrate pass for the activated workspace.
"""Run one eager-hydrate pass on a dedicated thread per ``cache_key``.
Called from both workspace activation and sync.done. The background
queue dedupes by ``task_key``, so parallel triggers collapse to one
run; each run is idempotent (already-hydrated placeholders count as
``skipped_existing``). Calling on sync.done is what catches build
manifests that only land after the deep mirror completes — activation
alone fires before those placeholders exist.
Called from both workspace activation and sync.done. A per-key
in-flight set dedupes parallel triggers (already-running pass
re-entries are dropped). Each run is idempotent already-hydrated
placeholders count as ``skipped_existing``. Running on its own
thread instead of the shared background worker means a long pass
(sequential ``file_open`` transactions over many placeholders) cannot
block ``hydrate_open_file`` tasks queued by user file opens.
"""
merged = load_sessions_settings_from_sublime()
basenames = tuple(merged.mirror_eager_hydrate_basenames)
if not basenames:
return
cache_key = context.cache_key
_run_in_background(
_eager_hydrate_workspace,
window,
context,
basenames,
task_key="eager_hydrate:{}".format(cache_key),
task_label="eager_hydrate",
)
if bool(getattr(sublime, "_sessions_test_sync", False)):
_eager_hydrate_workspace(window, context, basenames)
return
with _EAGER_HYDRATE_INFLIGHT_LOCK:
if cache_key in _EAGER_HYDRATE_INFLIGHT:
_trace_event("mirror.eager_hydrate_skip_inflight", cache_key=cache_key)
return
_EAGER_HYDRATE_INFLIGHT.add(cache_key)
def _run() -> None:
try:
_eager_hydrate_workspace(window, context, basenames)
except Exception:
_trace_event("mirror.eager_hydrate_thread_error", cache_key=cache_key)
print("[Sessions] Eager-hydrate thread failed.", file=sys.stderr)
finally:
with _EAGER_HYDRATE_INFLIGHT_LOCK:
_EAGER_HYDRATE_INFLIGHT.discard(cache_key)
threading.Thread(
target=_run,
daemon=True,
name="sessions-eager-hydrate-{}".format(cache_key),
).start()
def _eager_hydrate_workspace(
@@ -3597,7 +3625,13 @@ def _eager_hydrate_workspace(
max_open_bytes=limits.max_open_bytes,
binary_probe_bytes=limits.binary_probe_bytes,
allow_empty=limits.allow_empty_files,
timeout_ms=30_000,
# Per-placeholder timeout: 10s caps a stuck helper at a fraction
# of the prior 30s budget. Eager hydrate is best-effort —
# placeholders that miss a pass simply re-run on the next sync.
timeout_ms=10_000,
# PR-B.1: 8-way concurrency per batch saturates a healthy
# broker session without overwhelming the remote helper.
parallelism=8,
)
hydrated_entries = summary.get("hydrated", [])

2
uv.lock generated
View File

@@ -854,7 +854,7 @@ wheels = [
[[package]]
name = "sessions-sublime"
version = "0.7.26"
version = "0.7.27"
source = { virtual = "." }
[package.dev-dependencies]