refactor(local_bridge): split main.rs into cli/persistent/lsp_stdio/mirror modules

The 1429-line ``main.rs`` had grown to host four distinct concerns under
the same roof: argv parsing, persistent-mode orchestration + broker, LSP
relay loop + ``lsp-stdio`` subcommand, and the bridge-handled
``mirror-sync`` request. Cut each into its own sibling module so the
binary entry is a slim 164-line dispatcher and each subsystem has a
single self-contained file.

Modules:
- ``cli.rs`` (215 lines) — ``BridgeCliArgs`` and ``LspStdioCliArgs``
  with their parsers. All items ``pub(crate)`` since both are consumed
  across module boundaries.
- ``persistent.rs`` (539 lines) — ``run_persistent``, ``HelperDispatcher``,
  ``PersistentBroker`` (+ ``Drop``), ``persistent_broker_endpoint_path``,
  ``handle_broker_client``, ``BrokerAttachRequest``, ``BrokerAttachResponse``,
  ``lsp_response_body_to_framed_string``. ``HelperDispatcher`` and its
  ``request_blocking`` / ``forward_to_helper`` / ``deliver`` methods become
  ``pub(crate)`` because ``mirror.rs`` and ``lsp_stdio.rs`` clone the
  dispatcher; the rest stays private.
- ``lsp_stdio.rs`` (427 lines) — ``BrokerLspRelayCfg``, ``LspMessageFlow``,
  ``LspSpawnInjection``, the pure ``lsp_transform_message`` (with its
  two unit tests preserved verbatim), ``broker_lsp_relay_loop``,
  ``run_lsp_stdio`` (Unix variant + Windows fallback), and the
  ``json_insert_optional`` helper (only call site is here).
- ``mirror.rs`` (211 lines) — ``MirrorSyncParams``,
  ``From<MirrorSyncParams>`` for ``RemoteCacheMirrorOptions`` (with its
  ``partial_overrides_preserve_defaults`` unit test), ``handle_mirror_sync``,
  ``tree_list_entry_to_mirror``, ``MIRROR_SYNC_METHOD`` const.
- ``main.rs`` (164 lines) — ``main``, ``run``, ``run_parse_agent_editor_envelope``,
  ``read_stdin``, ``write_bridge_output``, version banner, ``mod`` declarations.

The cuts have cross-module references (``persistent`` ↔ ``lsp_stdio`` for
``BrokerLspRelayCfg`` / ``HelperDispatcher`` / ``BrokerAttachResponse``;
``persistent`` → ``mirror`` for ``MIRROR_SYNC_METHOD`` and
``handle_mirror_sync``) so a single commit lands them together rather
than split per-module — each per-module commit would have been
non-buildable on its own.

No behavior changes; this is purely code organization. The test count
is unchanged (339 workspace tests pass) and the three unit tests
previously living in ``main.rs::tests`` move next to the functions
they exercise.

Verified:
- cargo build --manifest-path rust/Cargo.toml -p local_bridge — green
- cargo clippy --manifest-path rust/Cargo.toml --all-targets -- -D warnings — green
- cargo test --manifest-path rust/Cargo.toml --workspace — 339 passed (same as baseline)
- cargo fmt --manifest-path rust/Cargo.toml --all --check — green
- cargo llvm-cov ... --fail-under-lines 80 — 80.52% (workspace), passes
  (the existing ``--ignore-filename-regex`` already excludes the four
  new modules per c3848e2; no config change needed in this commit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-26 00:57:14 +09:00
parent 7e97306288
commit fd623034c0
5 changed files with 1417 additions and 1290 deletions

View File

@@ -0,0 +1,215 @@
//! Command-line argument parsers for the ``local_bridge`` binary.
//!
//! Two argv shapes are supported:
//!
//! - ``BridgeCliArgs`` — top-level forwarder mode (and its persistent variant);
//! - ``LspStdioCliArgs`` — the ``lsp-stdio`` subcommand that connects to a
//! running broker over a local socket.
//!
//! These were lifted out of ``main.rs`` verbatim during a code-organization
//! split; behavior is unchanged. See module-level docs in ``main.rs`` for the
//! overall dispatch order.
use local_bridge::BridgeRunError;
pub(crate) struct BridgeCliArgs {
pub(crate) host_alias: String,
pub(crate) revision: String,
pub(crate) remote_helper_path: Option<String>,
}
impl BridgeCliArgs {
pub(crate) fn parse(args: &[String]) -> Result<Self, BridgeRunError> {
let mut host_alias: Option<String> = None;
let mut revision: Option<String> = None;
let mut remote_helper_path: Option<String> = None;
let mut idx = 0usize;
while idx < args.len() {
match args[idx].as_str() {
"--host" => {
if let Some(value) = args.get(idx + 1) {
host_alias = Some(value.clone());
idx += 2;
} else {
return Err(BridgeRunError::HelperLaunchFailed(
"--host requires a value".to_string(),
));
}
}
"--helper-revision" => {
if let Some(value) = args.get(idx + 1) {
revision = Some(value.clone());
idx += 2;
} else {
return Err(BridgeRunError::HelperLaunchFailed(
"--helper-revision requires a value".to_string(),
));
}
}
"--remote-helper-path" => {
if let Some(value) = args.get(idx + 1) {
remote_helper_path = Some(value.clone());
idx += 2;
} else {
return Err(BridgeRunError::HelperLaunchFailed(
"--remote-helper-path requires a value".to_string(),
));
}
}
"--persistent" => {
idx += 1;
}
_ => {
idx += 1;
}
}
}
let host_alias = match host_alias {
Some(value) if !value.trim().is_empty() => value,
_ => {
return Err(BridgeRunError::HelperLaunchFailed(
"--host is required".to_string(),
));
}
};
let revision = match revision {
Some(value) if !value.trim().is_empty() => value,
_ => {
return Err(BridgeRunError::HelperLaunchFailed(
"--helper-revision is required".to_string(),
));
}
};
Ok(Self {
host_alias,
revision,
remote_helper_path,
})
}
}
// The parser runs on every platform so Python can hand the same argv to a
// Windows build without tripping "unknown arg" before the stub emits the
// "Unix domain sockets only" error. Only the Unix ``run_lsp_stdio`` consumes
// the spawn/URI-rewrite fields; on Windows they're parsed for symmetry and
// then discarded, which looks like dead code to the compiler.
#[cfg_attr(not(unix), allow(dead_code))]
pub(crate) struct LspStdioCliArgs {
pub(crate) bridge_socket: String,
pub(crate) server_id: String,
pub(crate) workspace_id: String,
pub(crate) spawn_argv: Vec<String>,
pub(crate) spawn_cwd: Option<String>,
pub(crate) lsp_local_uri_prefix: Option<String>,
pub(crate) lsp_remote_uri_prefix: Option<String>,
}
impl LspStdioCliArgs {
pub(crate) fn parse(args: &[String]) -> Result<Self, BridgeRunError> {
let mut bridge_socket: Option<String> = None;
let mut server_id: Option<String> = None;
let mut workspace_id: Option<String> = None;
let mut spawn_argv: Vec<String> = Vec::new();
let mut spawn_cwd: Option<String> = None;
let mut lsp_local_uri_prefix: Option<String> = None;
let mut lsp_remote_uri_prefix: Option<String> = None;
let mut idx = 0usize;
while idx < args.len() {
match args[idx].as_str() {
"--bridge-socket" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--bridge-socket requires a value".to_string(),
)
})?;
bridge_socket = Some(value.clone());
idx += 2;
}
"--server-id" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--server-id requires a value".to_string(),
)
})?;
server_id = Some(value.clone());
idx += 2;
}
"--workspace-id" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--workspace-id requires a value".to_string(),
)
})?;
workspace_id = Some(value.clone());
idx += 2;
}
"--spawn-arg" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--spawn-arg requires a value".to_string(),
)
})?;
spawn_argv.push(value.clone());
idx += 2;
}
"--spawn-cwd" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--spawn-cwd requires a value".to_string(),
)
})?;
spawn_cwd = Some(value.clone());
idx += 2;
}
"--lsp-local-uri-prefix" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--lsp-local-uri-prefix requires a value".to_string(),
)
})?;
lsp_local_uri_prefix = Some(value.clone());
idx += 2;
}
"--lsp-remote-uri-prefix" => {
let value = args.get(idx + 1).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed(
"--lsp-remote-uri-prefix requires a value".to_string(),
)
})?;
lsp_remote_uri_prefix = Some(value.clone());
idx += 2;
}
other => {
return Err(BridgeRunError::HelperLaunchFailed(format!(
"unknown lsp-stdio argument: {other}"
)));
}
}
}
let bridge_socket = bridge_socket
.filter(|v| !v.trim().is_empty())
.ok_or_else(|| {
BridgeRunError::HelperLaunchFailed("--bridge-socket is required".to_string())
})?;
let server_id = server_id.filter(|v| !v.trim().is_empty()).ok_or_else(|| {
BridgeRunError::HelperLaunchFailed("--server-id is required".to_string())
})?;
let workspace_id = workspace_id
.filter(|v| !v.trim().is_empty())
.ok_or_else(|| {
BridgeRunError::HelperLaunchFailed("--workspace-id is required".to_string())
})?;
Ok(Self {
bridge_socket,
server_id,
workspace_id,
spawn_argv,
spawn_cwd,
lsp_local_uri_prefix,
lsp_remote_uri_prefix,
})
}
}

View File

@@ -0,0 +1,427 @@
//! LSP-over-stdio relay loop and the ``lsp-stdio`` subcommand entrypoint.
//!
//! Three concerns live here:
//!
//! 1. ``lsp_transform_message`` — the pure per-frame transform: URI rewrite
//! (direction picked via [`LspMessageFlow`]) and optional spawn-hint
//! injection on the very first frame. Tested in this module.
//!
//! 2. ``broker_lsp_relay_loop`` — the inner per-client loop the persistent
//! broker (in ``persistent.rs``) hands an attached ``IpcStream`` to. It
//! pumps framed LSP messages through the helper via ``HelperDispatcher``,
//! delegating each frame's transform to ``lsp_transform_message``.
//!
//! 3. ``run_lsp_stdio`` — the ``lsp-stdio`` subcommand: a thin client that
//! connects to a running broker socket, sends an attach handshake, and
//! then proxies ``stdin``↔socket↔``stdout``. Unix-only in practice; the
//! Windows variant returns a "not supported" error after parsing argv
//! for symmetry.
//!
//! Cut out of ``main.rs`` during a code-organization split; behavior is
//! unchanged.
use crate::cli::LspStdioCliArgs;
#[cfg(unix)]
use crate::persistent::{HelperDispatcher, lsp_response_body_to_framed_string};
#[cfg(unix)]
use interprocess::local_socket::Stream as IpcStream;
#[cfg(unix)]
use local_bridge::lsp_uri_rewrite::rewrite_uri_strings;
use local_bridge::{BridgeRunError, bridge_diag_event};
use serde::Serialize;
use serde_json::json;
#[cfg(unix)]
use session_protocol::RequestEnvelope;
#[cfg(unix)]
use std::io::{BufRead, Write};
#[cfg(unix)]
use std::os::unix::net::UnixStream;
#[cfg(unix)]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(unix)]
pub(crate) struct BrokerLspRelayCfg {
pub(crate) dispatcher: HelperDispatcher,
pub(crate) server_id: String,
pub(crate) workspace_id: String,
pub(crate) spawn_argv: Option<Vec<String>>,
pub(crate) spawn_cwd: Option<String>,
pub(crate) uri_rewrite: Option<(String, String)>,
}
/// Direction of a single LSP frame across the broker boundary.
///
/// Made explicit at the type level so the URI-rewrite step picks the right
/// (from, to) pair without the call site having to remember the convention.
#[cfg(unix)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum LspMessageFlow {
/// Editor-side process wrote a frame heading to the helper-hosted server.
LocalToBroker,
/// Helper-hosted server returned a frame heading back to the editor.
BrokerToLocal,
}
/// Spawn-argv hint injected into the very first ``LocalToBroker`` frame so
/// the helper knows which binary + cwd to launch for this LSP session.
#[cfg(unix)]
pub(crate) struct LspSpawnInjection<'a> {
pub(crate) argv: &'a [String],
pub(crate) cwd: Option<&'a str>,
}
/// Pure-function transform for one LSP frame.
///
/// Mutates ``body`` in place: applies URI rewriting when ``uri_rewrite`` is
/// ``Some`` (picking the direction implied by ``flow``) and, if
/// ``spawn_injection`` is supplied, attaches a ``_sessions_lsp_spawn`` object
/// to the JSON object root. The function performs no I/O and never inspects
/// the ``first``-message flag — the caller decides whether to pass
/// ``Some(LspSpawnInjection)`` at most once, which preserves the existing
/// idempotency guarantee.
///
/// Returns the same ``method`` hint that the prior inline implementation
/// emitted on the ``bridge.rust.lsp_stdio_broker_out`` diagnostic, so the
/// transport relay does not need to re-inspect ``body``.
#[cfg(unix)]
pub(crate) fn lsp_transform_message(
flow: LspMessageFlow,
body: &mut serde_json::Value,
uri_rewrite: Option<(&str, &str)>,
spawn_injection: Option<LspSpawnInjection<'_>>,
) -> String {
if let Some((local_p, remote_p)) = uri_rewrite {
let (from, to) = match flow {
LspMessageFlow::LocalToBroker => (local_p, remote_p),
LspMessageFlow::BrokerToLocal => (remote_p, local_p),
};
rewrite_uri_strings(body, from, to);
}
if let Some(spawn) = spawn_injection
&& !spawn.argv.is_empty()
&& let Some(obj) = body.as_object_mut()
{
obj.insert(
"_sessions_lsp_spawn".to_string(),
json!({
"argv": spawn.argv,
"cwd": spawn.cwd,
}),
);
}
body.get("method")
.and_then(|m| m.as_str())
.unwrap_or("(response-or-notification)")
.to_string()
}
#[cfg(unix)]
pub(crate) fn broker_lsp_relay_loop(
mut reader: std::io::BufReader<IpcStream>,
writer: &mut IpcStream,
cfg: BrokerLspRelayCfg,
) -> Result<(), BridgeRunError> {
use std::io::ErrorKind;
let BrokerLspRelayCfg {
dispatcher,
server_id,
workspace_id,
spawn_argv,
spawn_cwd,
uri_rewrite,
} = cfg;
let channel = format!("lsp:{server_id}");
let seq = AtomicU64::new(0);
let mut first = true;
bridge_diag_event(
"bridge.rust.lsp_stdio_broker_session",
json!({
"server_id": server_id,
"workspace_id": workspace_id,
"uri_rewrite": uri_rewrite.is_some(),
}),
);
let uri_rewrite_pair = uri_rewrite
.as_ref()
.map(|(loc, rem)| (loc.as_str(), rem.as_str()));
loop {
let payload = match session_protocol::read_lsp_message(&mut reader) {
Ok(text) => text,
Err(error) if error.kind() == ErrorKind::UnexpectedEof => break,
Err(error) => return Err(error.into()),
};
let mut body: serde_json::Value =
serde_json::from_str(&payload).map_err(BridgeRunError::Json)?;
let spawn_injection = if first {
spawn_argv
.as_deref()
.filter(|argv| !argv.is_empty())
.map(|argv| LspSpawnInjection {
argv,
cwd: spawn_cwd.as_deref(),
})
} else {
None
};
let method_hint = lsp_transform_message(
LspMessageFlow::LocalToBroker,
&mut body,
uri_rewrite_pair,
spawn_injection,
);
first = false;
bridge_diag_event(
"bridge.rust.lsp_stdio_broker_out",
json!({
"server_id": server_id,
"method": method_hint,
"payload_chars": payload.len(),
}),
);
let envelope_id = format!(
"lsp-broker-{}-{}-{}",
workspace_id,
server_id,
seq.fetch_add(1, Ordering::Relaxed)
);
let envelope = RequestEnvelope {
id: envelope_id,
method: session_protocol::METHOD_CHANNEL_DISPATCH.to_string(),
params: json!({
"v": session_protocol::CHANNEL_ENVELOPE_V1,
"channel": channel,
"kind": session_protocol::CHANNEL_KIND_REQUEST,
"body": body,
}),
timeout_ms: 120_000,
trace: session_protocol::TraceLevel::Info,
};
let mut result = dispatcher.request_blocking(&envelope)?;
let _ = lsp_transform_message(
LspMessageFlow::BrokerToLocal,
&mut result,
uri_rewrite_pair,
None,
);
let out = lsp_response_body_to_framed_string(&result)?;
bridge_diag_event(
"bridge.rust.lsp_stdio_broker_in",
json!({
"server_id": server_id,
"response_chars": out.len(),
}),
);
session_protocol::write_lsp_message(writer, &out).map_err(BridgeRunError::Io)?;
}
Ok(())
}
#[cfg(unix)]
pub(crate) fn run_lsp_stdio(args: &[String]) -> Result<(), BridgeRunError> {
let cli = LspStdioCliArgs::parse(args)?;
bridge_diag_event(
"bridge.rust.lsp_stdio_start",
json!({
"server_id": cli.server_id,
"workspace_id": cli.workspace_id,
"spawn_argc": cli.spawn_argv.len(),
"spawn_cwd_set": cli.spawn_cwd.as_ref().is_some_and(|s| !s.trim().is_empty()),
"uri_rewrite_set": cli.lsp_local_uri_prefix.is_some()
&& cli.lsp_remote_uri_prefix.is_some(),
}),
);
let mut stream = UnixStream::connect(&cli.bridge_socket)?;
let mut attach = json!({
"kind": "attach",
"server_id": cli.server_id,
"workspace_id": cli.workspace_id,
});
if let Some(obj) = attach.as_object_mut() {
let argv_opt = (!cli.spawn_argv.is_empty()).then(|| cli.spawn_argv.clone());
json_insert_optional(obj, "argv", argv_opt).map_err(BridgeRunError::Json)?;
let cwd_opt = cli
.spawn_cwd
.as_ref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
json_insert_optional(obj, "cwd", cwd_opt).map_err(BridgeRunError::Json)?;
let local_prefix_opt = cli
.lsp_local_uri_prefix
.as_ref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
json_insert_optional(obj, "lsp_local_uri_prefix", local_prefix_opt)
.map_err(BridgeRunError::Json)?;
let remote_prefix_opt = cli
.lsp_remote_uri_prefix
.as_ref()
.map(|s| s.trim())
.filter(|s| !s.is_empty());
json_insert_optional(obj, "lsp_remote_uri_prefix", remote_prefix_opt)
.map_err(BridgeRunError::Json)?;
}
writeln!(
stream,
"{}",
serde_json::to_string(&attach).map_err(BridgeRunError::Json)?
)?;
stream.flush()?;
let stream_for_read = stream.try_clone()?;
let mut ack_reader = std::io::BufReader::new(stream_for_read);
let mut ack_line = String::new();
ack_reader.read_line(&mut ack_line)?;
let ack: crate::persistent::BrokerAttachResponse =
serde_json::from_str(ack_line.trim()).map_err(BridgeRunError::Json)?;
if !ack.ok {
return Err(BridgeRunError::HelperLaunchFailed(
ack.error
.unwrap_or_else(|| "broker attach failed".to_string()),
));
}
bridge_diag_event(
"bridge.rust.lsp_stdio_attach_ok",
json!({
"server_id": cli.server_id,
"workspace_id": cli.workspace_id,
}),
);
let mut stream_writer = stream.try_clone()?;
let writer_handle = std::thread::spawn(move || {
let mut stdin = std::io::stdin().lock();
let _ = std::io::copy(&mut stdin, &mut stream_writer);
});
let mut stdout = std::io::stdout().lock();
let mut stream_reader = stream;
std::io::copy(&mut stream_reader, &mut stdout)?;
let _ = writer_handle.join();
Ok(())
}
#[cfg(not(unix))]
pub(crate) fn run_lsp_stdio(args: &[String]) -> Result<(), BridgeRunError> {
let cli = LspStdioCliArgs::parse(args)?;
Err(BridgeRunError::HelperLaunchFailed(format!(
"lsp-stdio mode currently requires Unix domain sockets \
(bridge_socket={}, server_id={}, workspace_id={})",
cli.bridge_socket, cli.server_id, cli.workspace_id,
)))
}
/// Insert ``value`` into ``obj`` under ``key`` only when ``value`` is ``Some``.
///
/// The helper does not interpret the inner ``T``; callers that want to skip
/// empty / whitespace-only strings should pre-trim and filter to ``None`` at
/// the call site (the existing call sites differ in whether they perform that
/// extra check, so keeping the helper minimal preserves their individual
/// semantics).
#[cfg_attr(not(unix), allow(dead_code))]
fn json_insert_optional<T: Serialize>(
obj: &mut serde_json::Map<String, serde_json::Value>,
key: &str,
value: Option<T>,
) -> Result<(), serde_json::Error> {
if let Some(inner) = value {
let encoded = serde_json::to_value(inner)?;
obj.insert(key.to_string(), encoded);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(unix)]
#[test]
fn lsp_transform_rewrites_text_document_uri_local_to_broker() {
let mut body = json!({
"jsonrpc": "2.0",
"method": "textDocument/didOpen",
"params": {
"textDocument": {
"uri": "file:///local/cache/ws/src/main.rs",
"languageId": "rust"
}
}
});
let local = "file:///local/cache/ws";
let remote = "file:///remote/proj";
let method = lsp_transform_message(
LspMessageFlow::LocalToBroker,
&mut body,
Some((local, remote)),
None,
);
assert_eq!(method, "textDocument/didOpen");
assert_eq!(
body.pointer("/params/textDocument/uri")
.and_then(|v| v.as_str()),
Some("file:///remote/proj/src/main.rs"),
);
// Reverse direction undoes the rewrite.
let _ = lsp_transform_message(
LspMessageFlow::BrokerToLocal,
&mut body,
Some((local, remote)),
None,
);
assert_eq!(
body.pointer("/params/textDocument/uri")
.and_then(|v| v.as_str()),
Some("file:///local/cache/ws/src/main.rs"),
);
}
#[cfg(unix)]
#[test]
fn lsp_transform_spawn_injection_idempotent_via_caller_gating() {
let argv = vec!["rust-analyzer".to_string(), "--stdio".to_string()];
let cwd = "/remote/proj".to_string();
let mut body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {}
});
// Caller passes Some(...) only on the first frame.
let _ = lsp_transform_message(
LspMessageFlow::LocalToBroker,
&mut body,
None,
Some(LspSpawnInjection {
argv: &argv,
cwd: Some(cwd.as_str()),
}),
);
let spawn = body.get("_sessions_lsp_spawn");
assert!(spawn.is_some(), "first transform must inject spawn hint");
assert_eq!(
spawn
.and_then(|s| s.get("argv"))
.and_then(|v| v.as_array())
.map(|a| a.len()),
Some(2)
);
assert_eq!(
spawn.and_then(|s| s.get("cwd")).and_then(|v| v.as_str()),
Some("/remote/proj")
);
// Second frame: caller passes None, so no further injection.
let mut body2 = json!({
"jsonrpc": "2.0",
"id": 2,
"method": "textDocument/didChange",
"params": {}
});
let _ = lsp_transform_message(LspMessageFlow::LocalToBroker, &mut body2, None, None);
assert!(
body2.get("_sessions_lsp_spawn").is_none(),
"subsequent transforms must not re-inject spawn hint"
);
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,211 @@
//! ``mirror-sync`` request handler.
//!
//! ``mirror-sync`` is one of the few request methods the bridge handles
//! itself instead of forwarding to the helper: it walks remote directories
//! via ``tree-list`` (which IS forwarded) and materializes a local cache
//! shadow. The dispatch lives in ``persistent::run_persistent``; everything
//! reachable from there hangs off this module.
//!
//! Cut out of ``main.rs`` during a code-organization split; behavior is
//! unchanged.
use crate::persistent::HelperDispatcher;
use local_bridge::{BridgeCliOutput, BridgeRunError};
use serde_json::json;
use session_protocol::{ErrorEnvelope, RequestEnvelope};
use std::sync::Arc;
pub(crate) const MIRROR_SYNC_METHOD: &str = "mirror-sync";
/// Parameters for `mirror-sync` (bridge-handled, not forwarded to helper).
#[derive(serde::Deserialize)]
pub(crate) struct MirrorSyncParams {
pub(crate) remote_root: String,
pub(crate) local_files_root: String,
#[serde(default)]
pub(crate) max_traversal_depth: Option<usize>,
#[serde(default)]
pub(crate) max_entries: Option<usize>,
#[serde(default)]
pub(crate) include_files: Option<bool>,
#[serde(default)]
pub(crate) ignore_patterns: Option<Vec<String>>,
#[serde(default)]
pub(crate) prune_missing: Option<bool>,
#[serde(default)]
pub(crate) max_dir_fanout: Option<usize>,
#[serde(default)]
pub(crate) writes_per_second_cap: Option<u32>,
#[serde(default)]
pub(crate) consecutive_failure_budget: Option<u32>,
}
impl From<MirrorSyncParams> for local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions {
fn from(params: MirrorSyncParams) -> Self {
let mut opts = Self::default();
if let Some(v) = params.max_traversal_depth {
opts.max_traversal_depth = v;
}
if let Some(v) = params.max_entries {
opts.max_entries = v;
}
if let Some(v) = params.include_files {
opts.include_files = v;
}
if let Some(v) = params.ignore_patterns {
opts.ignore_patterns = v;
}
if let Some(v) = params.prune_missing {
opts.prune_missing = v;
}
if let Some(v) = params.max_dir_fanout {
opts.max_dir_fanout = v;
}
if let Some(v) = params.writes_per_second_cap {
opts.writes_per_second_cap = v;
}
if let Some(v) = params.consecutive_failure_budget {
opts.consecutive_failure_budget = v;
}
opts
}
}
pub(crate) fn handle_mirror_sync(
dispatcher: &HelperDispatcher,
envelope: &RequestEnvelope,
) -> BridgeCliOutput {
let params: MirrorSyncParams = match serde_json::from_value(envelope.params.clone()) {
Ok(p) => p,
Err(e) => {
return BridgeCliOutput {
ok: false,
id: Some(envelope.id.clone()),
result: None,
error: Some(ErrorEnvelope {
id: Some(envelope.id.clone()),
code: "invalid_params".to_string(),
message: format!("mirror-sync params: {e}"),
retryable: false,
}),
};
}
};
let local_root = std::path::PathBuf::from(&params.local_files_root);
let remote_root = params.remote_root.clone();
let opts: local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions = params.into();
let req_id_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
let disp = dispatcher.clone();
let result = local_bridge::remote_cache_mirror::mirror_remote_tree_to_local_cache(
|_host, remote_dir| {
let seq = req_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let tree_req = RequestEnvelope {
id: format!("{}_tree_{seq}", envelope.id),
method: session_protocol::METHOD_TREE_LIST.to_string(),
params: json!({ "remote_directory": remote_dir }),
timeout_ms: 30_000,
trace: session_protocol::TraceLevel::Info,
};
let value = disp
.request_blocking(&tree_req)
.map_err(|e: BridgeRunError| e.to_string())?;
let tree_result: session_protocol::TreeListResult =
serde_json::from_value(value).map_err(|e| e.to_string())?;
Ok(tree_result
.entries
.into_iter()
.map(tree_list_entry_to_mirror)
.collect())
},
"",
&remote_root,
&local_root,
&opts,
);
BridgeCliOutput {
ok: result.ok(),
id: Some(envelope.id.clone()),
result: Some(json!({
"directories_created": result.directories_created,
"file_placeholders_created": result.file_placeholders_created,
"entries_scanned": result.entries_scanned,
"truncated_by_entry_limit": result.truncated_by_entry_limit,
"entries_pruned": result.entries_pruned,
"error_detail": result.error_detail,
"deferred_directories": result.deferred_directories,
"aborted_by_failure_budget": result.aborted_by_failure_budget,
})),
error: if result.ok() {
None
} else {
Some(ErrorEnvelope {
id: Some(envelope.id.clone()),
code: "mirror_sync_failed".to_string(),
message: result
.error_detail
.unwrap_or_else(|| "unknown mirror error".to_string()),
retryable: true,
})
},
}
}
fn tree_list_entry_to_mirror(
e: session_protocol::TreeListEntry,
) -> local_bridge::remote_cache_mirror::RemoteDirectoryEntry {
use local_bridge::remote_cache_mirror::{RemoteDirectoryEntry, RemoteFileKind};
let kind = match e.kind {
session_protocol::RemoteFileKind::RegularFile => RemoteFileKind::RegularFile,
session_protocol::RemoteFileKind::Directory => RemoteFileKind::Directory,
session_protocol::RemoteFileKind::Symlink => RemoteFileKind::Symlink,
session_protocol::RemoteFileKind::Other => RemoteFileKind::Other,
};
RemoteDirectoryEntry {
name: e.name,
remote_absolute_path: e.remote_absolute_path,
kind,
is_symlink_loop: e.is_symlink_loop,
}
}
#[cfg(test)]
mod tests {
use super::*;
use local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions;
#[test]
fn mirror_sync_params_partial_overrides_preserve_defaults() {
let defaults = RemoteCacheMirrorOptions::default();
let params = MirrorSyncParams {
remote_root: "/srv/proj".to_string(),
local_files_root: "/cache/proj".to_string(),
max_traversal_depth: Some(3),
max_entries: None,
include_files: Some(false),
ignore_patterns: Some(vec!["target".to_string(), "node_modules".to_string()]),
prune_missing: None,
max_dir_fanout: None,
writes_per_second_cap: Some(7),
consecutive_failure_budget: None,
};
let opts: RemoteCacheMirrorOptions = params.into();
// Overridden fields take the explicit values.
assert_eq!(opts.max_traversal_depth, 3);
assert!(!opts.include_files);
assert_eq!(opts.ignore_patterns, vec!["target", "node_modules"]);
assert_eq!(opts.writes_per_second_cap, 7);
// Unset fields keep the struct defaults.
assert_eq!(opts.max_entries, defaults.max_entries);
assert_eq!(opts.prune_missing, defaults.prune_missing);
assert_eq!(opts.max_dir_fanout, defaults.max_dir_fanout);
assert_eq!(
opts.consecutive_failure_budget,
defaults.consecutive_failure_budget
);
}
}

View File

@@ -0,0 +1,539 @@
//! Persistent (long-lived) bridge mode: a single helper process serving many
//! requests, plus a local-socket broker that lets ``lsp-stdio`` clients attach
//! to running language-server channels.
//!
//! The orchestration is:
//!
//! - ``run_persistent`` brings up the SSH helper, prints the handshake banner
//! on Python-stdout, and runs two halves: a *collector* thread that demuxes
//! helper responses by id, and the main *forwarder* loop that ingests JSON
//! request envelopes from Python-stdin.
//! - ``HelperDispatcher`` is the synchronization point both halves share: it
//! registers in-flight ids, writes framed messages to the helper's stdin,
//! and routes responses back via ``mpsc`` channels. ``mirror.rs`` and
//! ``lsp_stdio.rs`` each take a clone for their own dispatch flows.
//! - ``PersistentBroker`` owns a local-socket listener (``AF_UNIX`` on Unix,
//! Named Pipe on Windows via ``interprocess``) so external ``lsp-stdio``
//! children can ``attach`` to a running helper session.
//!
//! Cut out of ``main.rs`` during a code-organization split; behavior is
//! unchanged.
use crate::cli::BridgeCliArgs;
use crate::lsp_stdio::{BrokerLspRelayCfg, broker_lsp_relay_loop};
use crate::mirror::{MIRROR_SYNC_METHOD, handle_mirror_sync};
use crate::write_bridge_output;
use interprocess::TryClone;
use interprocess::local_socket::{
GenericFilePath, Listener as IpcListener, ListenerNonblockingMode, ListenerOptions,
Stream as IpcStream, ToFsName, traits::Listener as IpcListenerTrait,
};
use local_bridge::{
BridgeCliOutput, BridgeRunError, bridge_diag_event, default_remote_helper_path,
protocol_message_kind, with_helper_session_handshake,
};
use serde_json::json;
use session_protocol::{ErrorEnvelope, ProtocolMessage, RequestEnvelope};
use std::collections::HashMap;
use std::fs;
#[cfg(unix)]
use std::io::BufRead;
use std::io::Write;
use std::path::PathBuf;
use std::process::ChildStdin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
/// Shared handle for sending requests to the helper and receiving responses.
///
/// Used by both the main forwarder (Python → helper) and the mirror thread
/// (bridge-internal tree/list → helper).
#[derive(Clone)]
pub(crate) struct HelperDispatcher {
pub(crate) helper_stdin: Arc<Mutex<ChildStdin>>,
pub(crate) pending: Arc<Mutex<HashMap<String, mpsc::Sender<BridgeCliOutput>>>>,
}
impl HelperDispatcher {
pub(crate) fn request_blocking(
&self,
envelope: &RequestEnvelope,
) -> Result<serde_json::Value, BridgeRunError> {
let (tx, rx) = mpsc::channel();
self.pending
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(envelope.id.clone(), tx);
self.forward_to_helper(envelope)?;
let timeout = std::time::Duration::from_millis(envelope.timeout_ms.clamp(1000, 120_000));
match rx.recv_timeout(timeout) {
Ok(out) if out.ok => Ok(out.result.unwrap_or(serde_json::Value::Null)),
Ok(out) => {
let err = out.error.unwrap_or_else(|| ErrorEnvelope {
id: Some(envelope.id.clone()),
code: "unknown".to_string(),
message: "helper returned failure without error detail".to_string(),
retryable: true,
});
Err(BridgeRunError::HelperError(err))
}
Err(_) => {
self.pending
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(&envelope.id);
Err(BridgeRunError::HelperLaunchFailed(format!(
"helper response timed out after {:.1}s",
timeout.as_secs_f32()
)))
}
}
}
pub(crate) fn forward_to_helper(
&self,
envelope: &RequestEnvelope,
) -> Result<(), BridgeRunError> {
let encoded =
session_protocol::encode_message(&ProtocolMessage::Request(envelope.clone()))?;
let mut guard = self.helper_stdin.lock().unwrap_or_else(|e| e.into_inner());
guard.write_all(encoded.as_bytes())?;
guard.flush()?;
Ok(())
}
pub(crate) fn deliver(&self, id: &str, out: BridgeCliOutput) -> Option<BridgeCliOutput> {
let sender = self
.pending
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(id);
match sender {
Some(tx) => {
let _ = tx.send(out);
None
}
None => Some(out),
}
}
}
pub(crate) fn run_persistent(args: &[String]) -> Result<(), BridgeRunError> {
let cli = BridgeCliArgs::parse(args)?;
let default_remote_helper_path = default_remote_helper_path();
let remote_helper_path = cli
.remote_helper_path
.as_deref()
.unwrap_or(default_remote_helper_path.as_str());
with_helper_session_handshake(
&cli.host_alias,
remote_helper_path,
&cli.revision,
session_protocol::TraceLevel::Info,
|session, handshake| {
let py_stdout = Arc::new(Mutex::new(std::io::stdout()));
let dispatcher = HelperDispatcher {
helper_stdin: Arc::new(Mutex::new(session.take_stdin()?)),
pending: Arc::new(Mutex::new(HashMap::new())),
};
#[cfg(unix)]
let (_broker_keepalive, broker_socket_str) = {
let broker = PersistentBroker::start(&cli.host_alias, dispatcher.clone())?;
let path = broker.socket_path_str();
(broker, path)
};
#[cfg(not(unix))]
let broker_socket_str = String::new();
let handshake_info = BridgeCliOutput {
ok: true,
id: None,
result: Some(json!({
"handshake": {
"remote_home": handshake.remote_home,
"arch": handshake.arch,
"helper_version": handshake.helper_version,
"remote_platform": format!("{:?}", handshake.remote_platform),
"capabilities": handshake.capabilities.iter()
.map(|c| format!("{:?}", c))
.collect::<Vec<_>>(),
"broker_socket": broker_socket_str,
}
})),
error: None,
};
write_bridge_output(&py_stdout, &handshake_info)?;
// Collector thread: helper stdout → dispatch by id or pass to Python.
let messages = session.take_messages()?;
let dispatcher_for_collector = dispatcher.clone();
let py_stdout_collector = Arc::clone(&py_stdout);
let collector_handle = std::thread::spawn(move || {
loop {
let msg = match messages.recv() {
Ok(Ok(msg)) => msg,
Ok(Err(e)) => {
bridge_diag_event(
"bridge.rust.collector_error",
json!({ "detail": e.to_string() }),
);
break;
}
Err(_) => break,
};
let (id, out) = match msg {
ProtocolMessage::Response(resp) => {
let id = resp.id.clone();
(
id.clone(),
BridgeCliOutput {
ok: true,
id: Some(id),
result: Some(resp.result),
error: None,
},
)
}
ProtocolMessage::Error(err) => {
let id = err.id.clone().unwrap_or_default();
(
id.clone(),
BridgeCliOutput {
ok: false,
id: Some(id),
result: None,
error: Some(err),
},
)
}
ProtocolMessage::Shutdown(_) => break,
other => {
bridge_diag_event(
"bridge.rust.collector_unexpected",
json!({ "kind": protocol_message_kind(&other) }),
);
continue;
}
};
if let Some(passthrough) = dispatcher_for_collector.deliver(&id, out)
&& write_bridge_output(&py_stdout_collector, &passthrough).is_err()
{
break;
}
}
});
// Main thread: Python stdin → helper or bridge-handled commands.
let py_stdin = std::io::stdin();
let input = std::io::BufRead::lines(std::io::BufReader::new(py_stdin.lock()));
for line in input {
let raw = match line {
Ok(v) => v,
Err(e) => {
let out = BridgeCliOutput {
ok: false,
id: None,
result: None,
error: Some(ErrorEnvelope {
id: None,
code: "bridge_stdin_error".to_string(),
message: format!("stdin read failed: {e}"),
retryable: true,
}),
};
let _ = write_bridge_output(&py_stdout, &out);
continue;
}
};
let trimmed = raw.trim();
if trimmed.is_empty() {
continue;
}
let envelope: RequestEnvelope = match serde_json::from_str(trimmed) {
Ok(e) => e,
Err(e) => {
bridge_diag_event(
"bridge.rust.persistent_invalid_envelope",
json!({ "detail": e.to_string(), "line_bytes": trimmed.len() }),
);
let out = BridgeCliOutput {
ok: false,
id: None,
result: None,
error: Some(ErrorEnvelope {
id: None,
code: "invalid_bridge_request".to_string(),
message: format!("invalid request envelope: {e}"),
retryable: true,
}),
};
let _ = write_bridge_output(&py_stdout, &out);
continue;
}
};
bridge_diag_event(
"bridge.rust.request_start",
json!({
"request_id": envelope.id,
"method": envelope.method,
"timeout_ms": envelope.timeout_ms,
}),
);
if envelope.method == MIRROR_SYNC_METHOD {
let disp = dispatcher.clone();
let stdout = Arc::clone(&py_stdout);
std::thread::spawn(move || {
let out = handle_mirror_sync(&disp, &envelope);
let _ = write_bridge_output(&stdout, &out);
});
continue;
}
// Normal request: forward to helper (fire-and-forget).
if let Err(e) = dispatcher.forward_to_helper(&envelope) {
let out = BridgeCliOutput {
ok: false,
id: Some(envelope.id.clone()),
result: None,
error: Some(ErrorEnvelope {
id: Some(envelope.id),
code: "helper_write_failed".to_string(),
message: e.to_string(),
retryable: true,
}),
};
let _ = write_bridge_output(&py_stdout, &out);
continue;
}
bridge_diag_event(
"bridge.rust.request_flushed",
json!({ "request_id": envelope.id, "method": envelope.method }),
);
}
// Python stdin closed — collector thread will end when helper exits.
let _ = collector_handle.join();
Ok(())
},
)?;
Ok(())
}
#[cfg(unix)]
#[derive(serde::Deserialize)]
struct BrokerAttachRequest {
kind: String,
server_id: String,
workspace_id: String,
#[serde(default)]
argv: Option<Vec<String>>,
#[serde(default)]
cwd: Option<String>,
/// Local cache ``file://`` prefix (editor); rewritten to ``lsp_remote_uri_prefix`` outbound.
#[serde(default)]
lsp_local_uri_prefix: Option<String>,
/// Remote workspace ``file://`` prefix (helper); rewritten back on inbound responses.
#[serde(default)]
lsp_remote_uri_prefix: Option<String>,
}
#[cfg(unix)]
#[derive(serde::Serialize, serde::Deserialize)]
pub(crate) struct BrokerAttachResponse {
pub(crate) ok: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) error: Option<String>,
}
pub(crate) struct PersistentBroker {
socket_path: PathBuf,
running: Arc<AtomicBool>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl PersistentBroker {
pub(crate) fn start(
host_alias: &str,
dispatcher: HelperDispatcher,
) -> Result<Self, BridgeRunError> {
let sanitized_host = host_alias
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect::<String>();
let socket_path = persistent_broker_endpoint_path(&sanitized_host);
// ``LocalSocketListener::bind`` (via interprocess 2.x) is the
// cross-platform front end: on Unix it opens an `AF_UNIX` socket at
// the given path; on Windows it creates a Named Pipe at
// ``\\.\pipe\<basename>`` (the prefix is implied by the
// ``GenericFilePath`` resolver). The bytes on the wire are
// unchanged on Unix versus the previous ``UnixListener::bind``
// path so existing tests + the ``run_lsp_stdio`` client (still
// ``UnixStream::connect``) keep working.
let _ = fs::remove_file(&socket_path);
let endpoint = socket_path
.as_path()
.to_fs_name::<GenericFilePath>()
.map_err(|error| {
BridgeRunError::HelperLaunchFailed(format!("broker endpoint name failed: {error}"))
})?;
let listener: IpcListener = ListenerOptions::new()
.name(endpoint)
.nonblocking(ListenerNonblockingMode::Accept)
.create_sync()?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
fs::set_permissions(&socket_path, fs::Permissions::from_mode(0o600))?;
}
let running = Arc::new(AtomicBool::new(true));
let running_thread = Arc::clone(&running);
let dispatcher_for_listener = dispatcher.clone();
let handle = std::thread::spawn(move || {
while running_thread.load(Ordering::Relaxed) {
match listener.accept() {
Ok(stream) => {
let dispatch = dispatcher_for_listener.clone();
std::thread::spawn(move || {
if let Err(error) = handle_broker_client(stream, dispatch) {
bridge_diag_event(
"bridge.rust.broker_client_error",
json!({ "detail": error.to_string() }),
);
}
});
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(std::time::Duration::from_millis(20));
}
Err(_) => break,
}
}
});
Ok(Self {
socket_path,
running,
handle: Some(handle),
})
}
pub(crate) fn socket_path_str(&self) -> String {
self.socket_path.to_string_lossy().to_string()
}
}
/// Construct the broker endpoint path for ``host_alias``.
///
/// On Unix the result is an absolute filesystem path (an `AF_UNIX`
/// socket file); on Windows it is a Named Pipe path of the form
/// ``\\.\pipe\sessions-local-bridge-<host>-<pid>``. ``GenericFilePath``
/// resolves both forms uniformly when the path is fed back into
/// :type:`ListenerOptions`.
fn persistent_broker_endpoint_path(sanitized_host: &str) -> PathBuf {
let pid = std::process::id();
#[cfg(unix)]
{
std::env::temp_dir().join(format!("sessions-local-bridge-{sanitized_host}-{pid}.sock"))
}
#[cfg(windows)]
{
PathBuf::from(format!(
r"\\.\pipe\sessions-local-bridge-{sanitized_host}-{pid}"
))
}
}
impl Drop for PersistentBroker {
fn drop(&mut self) {
self.running.store(false, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
// Unix socket files want explicit removal (the listener thread
// already exited above); Windows Named Pipes are reaped when the
// last handle closes, so the equivalent step is the listener's
// own Drop and there's no path to unlink.
#[cfg(unix)]
{
let _ = fs::remove_file(&self.socket_path);
}
}
}
#[cfg(unix)]
fn handle_broker_client(
stream: IpcStream,
dispatcher: HelperDispatcher,
) -> Result<(), BridgeRunError> {
let read_half = stream.try_clone()?;
let mut buf_reader = std::io::BufReader::new(read_half);
let mut first_line = String::new();
buf_reader.read_line(&mut first_line)?;
let req: BrokerAttachRequest =
serde_json::from_str(first_line.trim()).map_err(BridgeRunError::Json)?;
let mut write_half = stream;
if req.kind != "attach" || req.server_id.trim().is_empty() || req.workspace_id.trim().is_empty()
{
let response = BrokerAttachResponse {
ok: false,
error: Some("invalid attach request".to_string()),
};
let encoded = serde_json::to_string(&response)?;
writeln!(write_half, "{encoded}")?;
write_half.flush()?;
return Ok(());
}
let response = BrokerAttachResponse {
ok: true,
error: None,
};
let encoded = serde_json::to_string(&response)?;
writeln!(write_half, "{encoded}")?;
write_half.flush()?;
let uri_rewrite = match (req.lsp_local_uri_prefix, req.lsp_remote_uri_prefix) {
(Some(loc), Some(rem))
if !loc.trim().is_empty() && !rem.trim().is_empty() && loc.trim() != rem.trim() =>
{
Some((loc.trim().to_string(), rem.trim().to_string()))
}
_ => None,
};
broker_lsp_relay_loop(
buf_reader,
&mut write_half,
BrokerLspRelayCfg {
dispatcher,
server_id: req.server_id,
workspace_id: req.workspace_id,
spawn_argv: req.argv,
spawn_cwd: req.cwd,
uri_rewrite,
},
)
}
#[cfg(unix)]
pub(crate) fn lsp_response_body_to_framed_string(
result: &serde_json::Value,
) -> Result<String, BridgeRunError> {
if let (Some(kind), Some(body)) = (
result.get("kind").and_then(|v| v.as_str()),
result.get("body"),
) && kind == session_protocol::CHANNEL_KIND_LSP_STDIO_MESSAGE
{
return serde_json::to_string(body).map_err(BridgeRunError::Json);
}
serde_json::to_string(result).map_err(BridgeRunError::Json)
}