refactor(local_bridge): split main.rs into cli/persistent/lsp_stdio/mirror modules
The 1429-line ``main.rs`` had grown to host four distinct concerns under the same roof: argv parsing, persistent-mode orchestration + broker, LSP relay loop + ``lsp-stdio`` subcommand, and the bridge-handled ``mirror-sync`` request. Cut each into its own sibling module so the binary entry is a slim 164-line dispatcher and each subsystem has a single self-contained file. Modules: - ``cli.rs`` (215 lines) — ``BridgeCliArgs`` and ``LspStdioCliArgs`` with their parsers. All items ``pub(crate)`` since both are consumed across module boundaries. - ``persistent.rs`` (539 lines) — ``run_persistent``, ``HelperDispatcher``, ``PersistentBroker`` (+ ``Drop``), ``persistent_broker_endpoint_path``, ``handle_broker_client``, ``BrokerAttachRequest``, ``BrokerAttachResponse``, ``lsp_response_body_to_framed_string``. ``HelperDispatcher`` and its ``request_blocking`` / ``forward_to_helper`` / ``deliver`` methods become ``pub(crate)`` because ``mirror.rs`` and ``lsp_stdio.rs`` clone the dispatcher; the rest stays private. - ``lsp_stdio.rs`` (427 lines) — ``BrokerLspRelayCfg``, ``LspMessageFlow``, ``LspSpawnInjection``, the pure ``lsp_transform_message`` (with its two unit tests preserved verbatim), ``broker_lsp_relay_loop``, ``run_lsp_stdio`` (Unix variant + Windows fallback), and the ``json_insert_optional`` helper (only call site is here). - ``mirror.rs`` (211 lines) — ``MirrorSyncParams``, ``From<MirrorSyncParams>`` for ``RemoteCacheMirrorOptions`` (with its ``partial_overrides_preserve_defaults`` unit test), ``handle_mirror_sync``, ``tree_list_entry_to_mirror``, ``MIRROR_SYNC_METHOD`` const. - ``main.rs`` (164 lines) — ``main``, ``run``, ``run_parse_agent_editor_envelope``, ``read_stdin``, ``write_bridge_output``, version banner, ``mod`` declarations. The cuts have cross-module references (``persistent`` ↔ ``lsp_stdio`` for ``BrokerLspRelayCfg`` / ``HelperDispatcher`` / ``BrokerAttachResponse``; ``persistent`` → ``mirror`` for ``MIRROR_SYNC_METHOD`` and ``handle_mirror_sync``) so a single commit lands them together rather than split per-module — each per-module commit would have been non-buildable on its own. No behavior changes; this is purely code organization. The test count is unchanged (339 workspace tests pass) and the three unit tests previously living in ``main.rs::tests`` move next to the functions they exercise. Verified: - cargo build --manifest-path rust/Cargo.toml -p local_bridge — green - cargo clippy --manifest-path rust/Cargo.toml --all-targets -- -D warnings — green - cargo test --manifest-path rust/Cargo.toml --workspace — 339 passed (same as baseline) - cargo fmt --manifest-path rust/Cargo.toml --all --check — green - cargo llvm-cov ... --fail-under-lines 80 — 80.52% (workspace), passes (the existing ``--ignore-filename-regex`` already excludes the four new modules per c3848e2; no config change needed in this commit) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
215
rust/crates/local_bridge/src/cli.rs
Normal file
215
rust/crates/local_bridge/src/cli.rs
Normal file
@@ -0,0 +1,215 @@
|
||||
//! Command-line argument parsers for the ``local_bridge`` binary.
|
||||
//!
|
||||
//! Two argv shapes are supported:
|
||||
//!
|
||||
//! - ``BridgeCliArgs`` — top-level forwarder mode (and its persistent variant);
|
||||
//! - ``LspStdioCliArgs`` — the ``lsp-stdio`` subcommand that connects to a
|
||||
//! running broker over a local socket.
|
||||
//!
|
||||
//! These were lifted out of ``main.rs`` verbatim during a code-organization
|
||||
//! split; behavior is unchanged. See module-level docs in ``main.rs`` for the
|
||||
//! overall dispatch order.
|
||||
|
||||
use local_bridge::BridgeRunError;
|
||||
|
||||
pub(crate) struct BridgeCliArgs {
|
||||
pub(crate) host_alias: String,
|
||||
pub(crate) revision: String,
|
||||
pub(crate) remote_helper_path: Option<String>,
|
||||
}
|
||||
|
||||
impl BridgeCliArgs {
|
||||
pub(crate) fn parse(args: &[String]) -> Result<Self, BridgeRunError> {
|
||||
let mut host_alias: Option<String> = None;
|
||||
let mut revision: Option<String> = None;
|
||||
let mut remote_helper_path: Option<String> = None;
|
||||
let mut idx = 0usize;
|
||||
|
||||
while idx < args.len() {
|
||||
match args[idx].as_str() {
|
||||
"--host" => {
|
||||
if let Some(value) = args.get(idx + 1) {
|
||||
host_alias = Some(value.clone());
|
||||
idx += 2;
|
||||
} else {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
"--host requires a value".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
"--helper-revision" => {
|
||||
if let Some(value) = args.get(idx + 1) {
|
||||
revision = Some(value.clone());
|
||||
idx += 2;
|
||||
} else {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
"--helper-revision requires a value".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
"--remote-helper-path" => {
|
||||
if let Some(value) = args.get(idx + 1) {
|
||||
remote_helper_path = Some(value.clone());
|
||||
idx += 2;
|
||||
} else {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
"--remote-helper-path requires a value".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
"--persistent" => {
|
||||
idx += 1;
|
||||
}
|
||||
_ => {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let host_alias = match host_alias {
|
||||
Some(value) if !value.trim().is_empty() => value,
|
||||
_ => {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
"--host is required".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
let revision = match revision {
|
||||
Some(value) if !value.trim().is_empty() => value,
|
||||
_ => {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
"--helper-revision is required".to_string(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
host_alias,
|
||||
revision,
|
||||
remote_helper_path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// The parser runs on every platform so Python can hand the same argv to a
|
||||
// Windows build without tripping "unknown arg" before the stub emits the
|
||||
// "Unix domain sockets only" error. Only the Unix ``run_lsp_stdio`` consumes
|
||||
// the spawn/URI-rewrite fields; on Windows they're parsed for symmetry and
|
||||
// then discarded, which looks like dead code to the compiler.
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
pub(crate) struct LspStdioCliArgs {
|
||||
pub(crate) bridge_socket: String,
|
||||
pub(crate) server_id: String,
|
||||
pub(crate) workspace_id: String,
|
||||
pub(crate) spawn_argv: Vec<String>,
|
||||
pub(crate) spawn_cwd: Option<String>,
|
||||
pub(crate) lsp_local_uri_prefix: Option<String>,
|
||||
pub(crate) lsp_remote_uri_prefix: Option<String>,
|
||||
}
|
||||
|
||||
impl LspStdioCliArgs {
|
||||
pub(crate) fn parse(args: &[String]) -> Result<Self, BridgeRunError> {
|
||||
let mut bridge_socket: Option<String> = None;
|
||||
let mut server_id: Option<String> = None;
|
||||
let mut workspace_id: Option<String> = None;
|
||||
let mut spawn_argv: Vec<String> = Vec::new();
|
||||
let mut spawn_cwd: Option<String> = None;
|
||||
let mut lsp_local_uri_prefix: Option<String> = None;
|
||||
let mut lsp_remote_uri_prefix: Option<String> = None;
|
||||
let mut idx = 0usize;
|
||||
while idx < args.len() {
|
||||
match args[idx].as_str() {
|
||||
"--bridge-socket" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--bridge-socket requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
bridge_socket = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--server-id" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--server-id requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
server_id = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--workspace-id" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--workspace-id requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
workspace_id = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--spawn-arg" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--spawn-arg requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
spawn_argv.push(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--spawn-cwd" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--spawn-cwd requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
spawn_cwd = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--lsp-local-uri-prefix" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--lsp-local-uri-prefix requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
lsp_local_uri_prefix = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
"--lsp-remote-uri-prefix" => {
|
||||
let value = args.get(idx + 1).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed(
|
||||
"--lsp-remote-uri-prefix requires a value".to_string(),
|
||||
)
|
||||
})?;
|
||||
lsp_remote_uri_prefix = Some(value.clone());
|
||||
idx += 2;
|
||||
}
|
||||
other => {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(format!(
|
||||
"unknown lsp-stdio argument: {other}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
let bridge_socket = bridge_socket
|
||||
.filter(|v| !v.trim().is_empty())
|
||||
.ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed("--bridge-socket is required".to_string())
|
||||
})?;
|
||||
let server_id = server_id.filter(|v| !v.trim().is_empty()).ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed("--server-id is required".to_string())
|
||||
})?;
|
||||
let workspace_id = workspace_id
|
||||
.filter(|v| !v.trim().is_empty())
|
||||
.ok_or_else(|| {
|
||||
BridgeRunError::HelperLaunchFailed("--workspace-id is required".to_string())
|
||||
})?;
|
||||
Ok(Self {
|
||||
bridge_socket,
|
||||
server_id,
|
||||
workspace_id,
|
||||
spawn_argv,
|
||||
spawn_cwd,
|
||||
lsp_local_uri_prefix,
|
||||
lsp_remote_uri_prefix,
|
||||
})
|
||||
}
|
||||
}
|
||||
427
rust/crates/local_bridge/src/lsp_stdio.rs
Normal file
427
rust/crates/local_bridge/src/lsp_stdio.rs
Normal file
@@ -0,0 +1,427 @@
|
||||
//! LSP-over-stdio relay loop and the ``lsp-stdio`` subcommand entrypoint.
|
||||
//!
|
||||
//! Three concerns live here:
|
||||
//!
|
||||
//! 1. ``lsp_transform_message`` — the pure per-frame transform: URI rewrite
|
||||
//! (direction picked via [`LspMessageFlow`]) and optional spawn-hint
|
||||
//! injection on the very first frame. Tested in this module.
|
||||
//!
|
||||
//! 2. ``broker_lsp_relay_loop`` — the inner per-client loop the persistent
|
||||
//! broker (in ``persistent.rs``) hands an attached ``IpcStream`` to. It
|
||||
//! pumps framed LSP messages through the helper via ``HelperDispatcher``,
|
||||
//! delegating each frame's transform to ``lsp_transform_message``.
|
||||
//!
|
||||
//! 3. ``run_lsp_stdio`` — the ``lsp-stdio`` subcommand: a thin client that
|
||||
//! connects to a running broker socket, sends an attach handshake, and
|
||||
//! then proxies ``stdin``↔socket↔``stdout``. Unix-only in practice; the
|
||||
//! Windows variant returns a "not supported" error after parsing argv
|
||||
//! for symmetry.
|
||||
//!
|
||||
//! Cut out of ``main.rs`` during a code-organization split; behavior is
|
||||
//! unchanged.
|
||||
|
||||
use crate::cli::LspStdioCliArgs;
|
||||
#[cfg(unix)]
|
||||
use crate::persistent::{HelperDispatcher, lsp_response_body_to_framed_string};
|
||||
#[cfg(unix)]
|
||||
use interprocess::local_socket::Stream as IpcStream;
|
||||
#[cfg(unix)]
|
||||
use local_bridge::lsp_uri_rewrite::rewrite_uri_strings;
|
||||
use local_bridge::{BridgeRunError, bridge_diag_event};
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
#[cfg(unix)]
|
||||
use session_protocol::RequestEnvelope;
|
||||
#[cfg(unix)]
|
||||
use std::io::{BufRead, Write};
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::net::UnixStream;
|
||||
#[cfg(unix)]
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) struct BrokerLspRelayCfg {
|
||||
pub(crate) dispatcher: HelperDispatcher,
|
||||
pub(crate) server_id: String,
|
||||
pub(crate) workspace_id: String,
|
||||
pub(crate) spawn_argv: Option<Vec<String>>,
|
||||
pub(crate) spawn_cwd: Option<String>,
|
||||
pub(crate) uri_rewrite: Option<(String, String)>,
|
||||
}
|
||||
|
||||
/// Direction of a single LSP frame across the broker boundary.
|
||||
///
|
||||
/// Made explicit at the type level so the URI-rewrite step picks the right
|
||||
/// (from, to) pair without the call site having to remember the convention.
|
||||
#[cfg(unix)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub(crate) enum LspMessageFlow {
|
||||
/// Editor-side process wrote a frame heading to the helper-hosted server.
|
||||
LocalToBroker,
|
||||
/// Helper-hosted server returned a frame heading back to the editor.
|
||||
BrokerToLocal,
|
||||
}
|
||||
|
||||
/// Spawn-argv hint injected into the very first ``LocalToBroker`` frame so
|
||||
/// the helper knows which binary + cwd to launch for this LSP session.
|
||||
#[cfg(unix)]
|
||||
pub(crate) struct LspSpawnInjection<'a> {
|
||||
pub(crate) argv: &'a [String],
|
||||
pub(crate) cwd: Option<&'a str>,
|
||||
}
|
||||
|
||||
/// Pure-function transform for one LSP frame.
|
||||
///
|
||||
/// Mutates ``body`` in place: applies URI rewriting when ``uri_rewrite`` is
|
||||
/// ``Some`` (picking the direction implied by ``flow``) and, if
|
||||
/// ``spawn_injection`` is supplied, attaches a ``_sessions_lsp_spawn`` object
|
||||
/// to the JSON object root. The function performs no I/O and never inspects
|
||||
/// the ``first``-message flag — the caller decides whether to pass
|
||||
/// ``Some(LspSpawnInjection)`` at most once, which preserves the existing
|
||||
/// idempotency guarantee.
|
||||
///
|
||||
/// Returns the same ``method`` hint that the prior inline implementation
|
||||
/// emitted on the ``bridge.rust.lsp_stdio_broker_out`` diagnostic, so the
|
||||
/// transport relay does not need to re-inspect ``body``.
|
||||
#[cfg(unix)]
|
||||
pub(crate) fn lsp_transform_message(
|
||||
flow: LspMessageFlow,
|
||||
body: &mut serde_json::Value,
|
||||
uri_rewrite: Option<(&str, &str)>,
|
||||
spawn_injection: Option<LspSpawnInjection<'_>>,
|
||||
) -> String {
|
||||
if let Some((local_p, remote_p)) = uri_rewrite {
|
||||
let (from, to) = match flow {
|
||||
LspMessageFlow::LocalToBroker => (local_p, remote_p),
|
||||
LspMessageFlow::BrokerToLocal => (remote_p, local_p),
|
||||
};
|
||||
rewrite_uri_strings(body, from, to);
|
||||
}
|
||||
if let Some(spawn) = spawn_injection
|
||||
&& !spawn.argv.is_empty()
|
||||
&& let Some(obj) = body.as_object_mut()
|
||||
{
|
||||
obj.insert(
|
||||
"_sessions_lsp_spawn".to_string(),
|
||||
json!({
|
||||
"argv": spawn.argv,
|
||||
"cwd": spawn.cwd,
|
||||
}),
|
||||
);
|
||||
}
|
||||
body.get("method")
|
||||
.and_then(|m| m.as_str())
|
||||
.unwrap_or("(response-or-notification)")
|
||||
.to_string()
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) fn broker_lsp_relay_loop(
|
||||
mut reader: std::io::BufReader<IpcStream>,
|
||||
writer: &mut IpcStream,
|
||||
cfg: BrokerLspRelayCfg,
|
||||
) -> Result<(), BridgeRunError> {
|
||||
use std::io::ErrorKind;
|
||||
let BrokerLspRelayCfg {
|
||||
dispatcher,
|
||||
server_id,
|
||||
workspace_id,
|
||||
spawn_argv,
|
||||
spawn_cwd,
|
||||
uri_rewrite,
|
||||
} = cfg;
|
||||
let channel = format!("lsp:{server_id}");
|
||||
let seq = AtomicU64::new(0);
|
||||
let mut first = true;
|
||||
bridge_diag_event(
|
||||
"bridge.rust.lsp_stdio_broker_session",
|
||||
json!({
|
||||
"server_id": server_id,
|
||||
"workspace_id": workspace_id,
|
||||
"uri_rewrite": uri_rewrite.is_some(),
|
||||
}),
|
||||
);
|
||||
let uri_rewrite_pair = uri_rewrite
|
||||
.as_ref()
|
||||
.map(|(loc, rem)| (loc.as_str(), rem.as_str()));
|
||||
loop {
|
||||
let payload = match session_protocol::read_lsp_message(&mut reader) {
|
||||
Ok(text) => text,
|
||||
Err(error) if error.kind() == ErrorKind::UnexpectedEof => break,
|
||||
Err(error) => return Err(error.into()),
|
||||
};
|
||||
let mut body: serde_json::Value =
|
||||
serde_json::from_str(&payload).map_err(BridgeRunError::Json)?;
|
||||
let spawn_injection = if first {
|
||||
spawn_argv
|
||||
.as_deref()
|
||||
.filter(|argv| !argv.is_empty())
|
||||
.map(|argv| LspSpawnInjection {
|
||||
argv,
|
||||
cwd: spawn_cwd.as_deref(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let method_hint = lsp_transform_message(
|
||||
LspMessageFlow::LocalToBroker,
|
||||
&mut body,
|
||||
uri_rewrite_pair,
|
||||
spawn_injection,
|
||||
);
|
||||
first = false;
|
||||
bridge_diag_event(
|
||||
"bridge.rust.lsp_stdio_broker_out",
|
||||
json!({
|
||||
"server_id": server_id,
|
||||
"method": method_hint,
|
||||
"payload_chars": payload.len(),
|
||||
}),
|
||||
);
|
||||
let envelope_id = format!(
|
||||
"lsp-broker-{}-{}-{}",
|
||||
workspace_id,
|
||||
server_id,
|
||||
seq.fetch_add(1, Ordering::Relaxed)
|
||||
);
|
||||
let envelope = RequestEnvelope {
|
||||
id: envelope_id,
|
||||
method: session_protocol::METHOD_CHANNEL_DISPATCH.to_string(),
|
||||
params: json!({
|
||||
"v": session_protocol::CHANNEL_ENVELOPE_V1,
|
||||
"channel": channel,
|
||||
"kind": session_protocol::CHANNEL_KIND_REQUEST,
|
||||
"body": body,
|
||||
}),
|
||||
timeout_ms: 120_000,
|
||||
trace: session_protocol::TraceLevel::Info,
|
||||
};
|
||||
let mut result = dispatcher.request_blocking(&envelope)?;
|
||||
let _ = lsp_transform_message(
|
||||
LspMessageFlow::BrokerToLocal,
|
||||
&mut result,
|
||||
uri_rewrite_pair,
|
||||
None,
|
||||
);
|
||||
let out = lsp_response_body_to_framed_string(&result)?;
|
||||
bridge_diag_event(
|
||||
"bridge.rust.lsp_stdio_broker_in",
|
||||
json!({
|
||||
"server_id": server_id,
|
||||
"response_chars": out.len(),
|
||||
}),
|
||||
);
|
||||
session_protocol::write_lsp_message(writer, &out).map_err(BridgeRunError::Io)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) fn run_lsp_stdio(args: &[String]) -> Result<(), BridgeRunError> {
|
||||
let cli = LspStdioCliArgs::parse(args)?;
|
||||
bridge_diag_event(
|
||||
"bridge.rust.lsp_stdio_start",
|
||||
json!({
|
||||
"server_id": cli.server_id,
|
||||
"workspace_id": cli.workspace_id,
|
||||
"spawn_argc": cli.spawn_argv.len(),
|
||||
"spawn_cwd_set": cli.spawn_cwd.as_ref().is_some_and(|s| !s.trim().is_empty()),
|
||||
"uri_rewrite_set": cli.lsp_local_uri_prefix.is_some()
|
||||
&& cli.lsp_remote_uri_prefix.is_some(),
|
||||
}),
|
||||
);
|
||||
let mut stream = UnixStream::connect(&cli.bridge_socket)?;
|
||||
let mut attach = json!({
|
||||
"kind": "attach",
|
||||
"server_id": cli.server_id,
|
||||
"workspace_id": cli.workspace_id,
|
||||
});
|
||||
if let Some(obj) = attach.as_object_mut() {
|
||||
let argv_opt = (!cli.spawn_argv.is_empty()).then(|| cli.spawn_argv.clone());
|
||||
json_insert_optional(obj, "argv", argv_opt).map_err(BridgeRunError::Json)?;
|
||||
let cwd_opt = cli
|
||||
.spawn_cwd
|
||||
.as_ref()
|
||||
.map(|s| s.trim())
|
||||
.filter(|s| !s.is_empty());
|
||||
json_insert_optional(obj, "cwd", cwd_opt).map_err(BridgeRunError::Json)?;
|
||||
let local_prefix_opt = cli
|
||||
.lsp_local_uri_prefix
|
||||
.as_ref()
|
||||
.map(|s| s.trim())
|
||||
.filter(|s| !s.is_empty());
|
||||
json_insert_optional(obj, "lsp_local_uri_prefix", local_prefix_opt)
|
||||
.map_err(BridgeRunError::Json)?;
|
||||
let remote_prefix_opt = cli
|
||||
.lsp_remote_uri_prefix
|
||||
.as_ref()
|
||||
.map(|s| s.trim())
|
||||
.filter(|s| !s.is_empty());
|
||||
json_insert_optional(obj, "lsp_remote_uri_prefix", remote_prefix_opt)
|
||||
.map_err(BridgeRunError::Json)?;
|
||||
}
|
||||
writeln!(
|
||||
stream,
|
||||
"{}",
|
||||
serde_json::to_string(&attach).map_err(BridgeRunError::Json)?
|
||||
)?;
|
||||
stream.flush()?;
|
||||
|
||||
let stream_for_read = stream.try_clone()?;
|
||||
let mut ack_reader = std::io::BufReader::new(stream_for_read);
|
||||
let mut ack_line = String::new();
|
||||
ack_reader.read_line(&mut ack_line)?;
|
||||
let ack: crate::persistent::BrokerAttachResponse =
|
||||
serde_json::from_str(ack_line.trim()).map_err(BridgeRunError::Json)?;
|
||||
if !ack.ok {
|
||||
return Err(BridgeRunError::HelperLaunchFailed(
|
||||
ack.error
|
||||
.unwrap_or_else(|| "broker attach failed".to_string()),
|
||||
));
|
||||
}
|
||||
bridge_diag_event(
|
||||
"bridge.rust.lsp_stdio_attach_ok",
|
||||
json!({
|
||||
"server_id": cli.server_id,
|
||||
"workspace_id": cli.workspace_id,
|
||||
}),
|
||||
);
|
||||
|
||||
let mut stream_writer = stream.try_clone()?;
|
||||
let writer_handle = std::thread::spawn(move || {
|
||||
let mut stdin = std::io::stdin().lock();
|
||||
let _ = std::io::copy(&mut stdin, &mut stream_writer);
|
||||
});
|
||||
|
||||
let mut stdout = std::io::stdout().lock();
|
||||
let mut stream_reader = stream;
|
||||
std::io::copy(&mut stream_reader, &mut stdout)?;
|
||||
let _ = writer_handle.join();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
pub(crate) fn run_lsp_stdio(args: &[String]) -> Result<(), BridgeRunError> {
|
||||
let cli = LspStdioCliArgs::parse(args)?;
|
||||
Err(BridgeRunError::HelperLaunchFailed(format!(
|
||||
"lsp-stdio mode currently requires Unix domain sockets \
|
||||
(bridge_socket={}, server_id={}, workspace_id={})",
|
||||
cli.bridge_socket, cli.server_id, cli.workspace_id,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Insert ``value`` into ``obj`` under ``key`` only when ``value`` is ``Some``.
|
||||
///
|
||||
/// The helper does not interpret the inner ``T``; callers that want to skip
|
||||
/// empty / whitespace-only strings should pre-trim and filter to ``None`` at
|
||||
/// the call site (the existing call sites differ in whether they perform that
|
||||
/// extra check, so keeping the helper minimal preserves their individual
|
||||
/// semantics).
|
||||
#[cfg_attr(not(unix), allow(dead_code))]
|
||||
fn json_insert_optional<T: Serialize>(
|
||||
obj: &mut serde_json::Map<String, serde_json::Value>,
|
||||
key: &str,
|
||||
value: Option<T>,
|
||||
) -> Result<(), serde_json::Error> {
|
||||
if let Some(inner) = value {
|
||||
let encoded = serde_json::to_value(inner)?;
|
||||
obj.insert(key.to_string(), encoded);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn lsp_transform_rewrites_text_document_uri_local_to_broker() {
|
||||
let mut body = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "textDocument/didOpen",
|
||||
"params": {
|
||||
"textDocument": {
|
||||
"uri": "file:///local/cache/ws/src/main.rs",
|
||||
"languageId": "rust"
|
||||
}
|
||||
}
|
||||
});
|
||||
let local = "file:///local/cache/ws";
|
||||
let remote = "file:///remote/proj";
|
||||
let method = lsp_transform_message(
|
||||
LspMessageFlow::LocalToBroker,
|
||||
&mut body,
|
||||
Some((local, remote)),
|
||||
None,
|
||||
);
|
||||
assert_eq!(method, "textDocument/didOpen");
|
||||
assert_eq!(
|
||||
body.pointer("/params/textDocument/uri")
|
||||
.and_then(|v| v.as_str()),
|
||||
Some("file:///remote/proj/src/main.rs"),
|
||||
);
|
||||
// Reverse direction undoes the rewrite.
|
||||
let _ = lsp_transform_message(
|
||||
LspMessageFlow::BrokerToLocal,
|
||||
&mut body,
|
||||
Some((local, remote)),
|
||||
None,
|
||||
);
|
||||
assert_eq!(
|
||||
body.pointer("/params/textDocument/uri")
|
||||
.and_then(|v| v.as_str()),
|
||||
Some("file:///local/cache/ws/src/main.rs"),
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn lsp_transform_spawn_injection_idempotent_via_caller_gating() {
|
||||
let argv = vec!["rust-analyzer".to_string(), "--stdio".to_string()];
|
||||
let cwd = "/remote/proj".to_string();
|
||||
let mut body = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize",
|
||||
"params": {}
|
||||
});
|
||||
|
||||
// Caller passes Some(...) only on the first frame.
|
||||
let _ = lsp_transform_message(
|
||||
LspMessageFlow::LocalToBroker,
|
||||
&mut body,
|
||||
None,
|
||||
Some(LspSpawnInjection {
|
||||
argv: &argv,
|
||||
cwd: Some(cwd.as_str()),
|
||||
}),
|
||||
);
|
||||
let spawn = body.get("_sessions_lsp_spawn");
|
||||
assert!(spawn.is_some(), "first transform must inject spawn hint");
|
||||
assert_eq!(
|
||||
spawn
|
||||
.and_then(|s| s.get("argv"))
|
||||
.and_then(|v| v.as_array())
|
||||
.map(|a| a.len()),
|
||||
Some(2)
|
||||
);
|
||||
assert_eq!(
|
||||
spawn.and_then(|s| s.get("cwd")).and_then(|v| v.as_str()),
|
||||
Some("/remote/proj")
|
||||
);
|
||||
|
||||
// Second frame: caller passes None, so no further injection.
|
||||
let mut body2 = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
"method": "textDocument/didChange",
|
||||
"params": {}
|
||||
});
|
||||
let _ = lsp_transform_message(LspMessageFlow::LocalToBroker, &mut body2, None, None);
|
||||
assert!(
|
||||
body2.get("_sessions_lsp_spawn").is_none(),
|
||||
"subsequent transforms must not re-inject spawn hint"
|
||||
);
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
211
rust/crates/local_bridge/src/mirror.rs
Normal file
211
rust/crates/local_bridge/src/mirror.rs
Normal file
@@ -0,0 +1,211 @@
|
||||
//! ``mirror-sync`` request handler.
|
||||
//!
|
||||
//! ``mirror-sync`` is one of the few request methods the bridge handles
|
||||
//! itself instead of forwarding to the helper: it walks remote directories
|
||||
//! via ``tree-list`` (which IS forwarded) and materializes a local cache
|
||||
//! shadow. The dispatch lives in ``persistent::run_persistent``; everything
|
||||
//! reachable from there hangs off this module.
|
||||
//!
|
||||
//! Cut out of ``main.rs`` during a code-organization split; behavior is
|
||||
//! unchanged.
|
||||
|
||||
use crate::persistent::HelperDispatcher;
|
||||
use local_bridge::{BridgeCliOutput, BridgeRunError};
|
||||
use serde_json::json;
|
||||
use session_protocol::{ErrorEnvelope, RequestEnvelope};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) const MIRROR_SYNC_METHOD: &str = "mirror-sync";
|
||||
|
||||
/// Parameters for `mirror-sync` (bridge-handled, not forwarded to helper).
|
||||
#[derive(serde::Deserialize)]
|
||||
pub(crate) struct MirrorSyncParams {
|
||||
pub(crate) remote_root: String,
|
||||
pub(crate) local_files_root: String,
|
||||
#[serde(default)]
|
||||
pub(crate) max_traversal_depth: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub(crate) max_entries: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub(crate) include_files: Option<bool>,
|
||||
#[serde(default)]
|
||||
pub(crate) ignore_patterns: Option<Vec<String>>,
|
||||
#[serde(default)]
|
||||
pub(crate) prune_missing: Option<bool>,
|
||||
#[serde(default)]
|
||||
pub(crate) max_dir_fanout: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub(crate) writes_per_second_cap: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub(crate) consecutive_failure_budget: Option<u32>,
|
||||
}
|
||||
|
||||
impl From<MirrorSyncParams> for local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions {
|
||||
fn from(params: MirrorSyncParams) -> Self {
|
||||
let mut opts = Self::default();
|
||||
if let Some(v) = params.max_traversal_depth {
|
||||
opts.max_traversal_depth = v;
|
||||
}
|
||||
if let Some(v) = params.max_entries {
|
||||
opts.max_entries = v;
|
||||
}
|
||||
if let Some(v) = params.include_files {
|
||||
opts.include_files = v;
|
||||
}
|
||||
if let Some(v) = params.ignore_patterns {
|
||||
opts.ignore_patterns = v;
|
||||
}
|
||||
if let Some(v) = params.prune_missing {
|
||||
opts.prune_missing = v;
|
||||
}
|
||||
if let Some(v) = params.max_dir_fanout {
|
||||
opts.max_dir_fanout = v;
|
||||
}
|
||||
if let Some(v) = params.writes_per_second_cap {
|
||||
opts.writes_per_second_cap = v;
|
||||
}
|
||||
if let Some(v) = params.consecutive_failure_budget {
|
||||
opts.consecutive_failure_budget = v;
|
||||
}
|
||||
opts
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn handle_mirror_sync(
|
||||
dispatcher: &HelperDispatcher,
|
||||
envelope: &RequestEnvelope,
|
||||
) -> BridgeCliOutput {
|
||||
let params: MirrorSyncParams = match serde_json::from_value(envelope.params.clone()) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
return BridgeCliOutput {
|
||||
ok: false,
|
||||
id: Some(envelope.id.clone()),
|
||||
result: None,
|
||||
error: Some(ErrorEnvelope {
|
||||
id: Some(envelope.id.clone()),
|
||||
code: "invalid_params".to_string(),
|
||||
message: format!("mirror-sync params: {e}"),
|
||||
retryable: false,
|
||||
}),
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
let local_root = std::path::PathBuf::from(¶ms.local_files_root);
|
||||
let remote_root = params.remote_root.clone();
|
||||
let opts: local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions = params.into();
|
||||
let req_id_counter = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
let disp = dispatcher.clone();
|
||||
|
||||
let result = local_bridge::remote_cache_mirror::mirror_remote_tree_to_local_cache(
|
||||
|_host, remote_dir| {
|
||||
let seq = req_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let tree_req = RequestEnvelope {
|
||||
id: format!("{}_tree_{seq}", envelope.id),
|
||||
method: session_protocol::METHOD_TREE_LIST.to_string(),
|
||||
params: json!({ "remote_directory": remote_dir }),
|
||||
timeout_ms: 30_000,
|
||||
trace: session_protocol::TraceLevel::Info,
|
||||
};
|
||||
let value = disp
|
||||
.request_blocking(&tree_req)
|
||||
.map_err(|e: BridgeRunError| e.to_string())?;
|
||||
let tree_result: session_protocol::TreeListResult =
|
||||
serde_json::from_value(value).map_err(|e| e.to_string())?;
|
||||
Ok(tree_result
|
||||
.entries
|
||||
.into_iter()
|
||||
.map(tree_list_entry_to_mirror)
|
||||
.collect())
|
||||
},
|
||||
"",
|
||||
&remote_root,
|
||||
&local_root,
|
||||
&opts,
|
||||
);
|
||||
|
||||
BridgeCliOutput {
|
||||
ok: result.ok(),
|
||||
id: Some(envelope.id.clone()),
|
||||
result: Some(json!({
|
||||
"directories_created": result.directories_created,
|
||||
"file_placeholders_created": result.file_placeholders_created,
|
||||
"entries_scanned": result.entries_scanned,
|
||||
"truncated_by_entry_limit": result.truncated_by_entry_limit,
|
||||
"entries_pruned": result.entries_pruned,
|
||||
"error_detail": result.error_detail,
|
||||
"deferred_directories": result.deferred_directories,
|
||||
"aborted_by_failure_budget": result.aborted_by_failure_budget,
|
||||
})),
|
||||
error: if result.ok() {
|
||||
None
|
||||
} else {
|
||||
Some(ErrorEnvelope {
|
||||
id: Some(envelope.id.clone()),
|
||||
code: "mirror_sync_failed".to_string(),
|
||||
message: result
|
||||
.error_detail
|
||||
.unwrap_or_else(|| "unknown mirror error".to_string()),
|
||||
retryable: true,
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn tree_list_entry_to_mirror(
|
||||
e: session_protocol::TreeListEntry,
|
||||
) -> local_bridge::remote_cache_mirror::RemoteDirectoryEntry {
|
||||
use local_bridge::remote_cache_mirror::{RemoteDirectoryEntry, RemoteFileKind};
|
||||
let kind = match e.kind {
|
||||
session_protocol::RemoteFileKind::RegularFile => RemoteFileKind::RegularFile,
|
||||
session_protocol::RemoteFileKind::Directory => RemoteFileKind::Directory,
|
||||
session_protocol::RemoteFileKind::Symlink => RemoteFileKind::Symlink,
|
||||
session_protocol::RemoteFileKind::Other => RemoteFileKind::Other,
|
||||
};
|
||||
RemoteDirectoryEntry {
|
||||
name: e.name,
|
||||
remote_absolute_path: e.remote_absolute_path,
|
||||
kind,
|
||||
is_symlink_loop: e.is_symlink_loop,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use local_bridge::remote_cache_mirror::RemoteCacheMirrorOptions;
|
||||
|
||||
#[test]
|
||||
fn mirror_sync_params_partial_overrides_preserve_defaults() {
|
||||
let defaults = RemoteCacheMirrorOptions::default();
|
||||
let params = MirrorSyncParams {
|
||||
remote_root: "/srv/proj".to_string(),
|
||||
local_files_root: "/cache/proj".to_string(),
|
||||
max_traversal_depth: Some(3),
|
||||
max_entries: None,
|
||||
include_files: Some(false),
|
||||
ignore_patterns: Some(vec!["target".to_string(), "node_modules".to_string()]),
|
||||
prune_missing: None,
|
||||
max_dir_fanout: None,
|
||||
writes_per_second_cap: Some(7),
|
||||
consecutive_failure_budget: None,
|
||||
};
|
||||
let opts: RemoteCacheMirrorOptions = params.into();
|
||||
|
||||
// Overridden fields take the explicit values.
|
||||
assert_eq!(opts.max_traversal_depth, 3);
|
||||
assert!(!opts.include_files);
|
||||
assert_eq!(opts.ignore_patterns, vec!["target", "node_modules"]);
|
||||
assert_eq!(opts.writes_per_second_cap, 7);
|
||||
|
||||
// Unset fields keep the struct defaults.
|
||||
assert_eq!(opts.max_entries, defaults.max_entries);
|
||||
assert_eq!(opts.prune_missing, defaults.prune_missing);
|
||||
assert_eq!(opts.max_dir_fanout, defaults.max_dir_fanout);
|
||||
assert_eq!(
|
||||
opts.consecutive_failure_budget,
|
||||
defaults.consecutive_failure_budget
|
||||
);
|
||||
}
|
||||
}
|
||||
539
rust/crates/local_bridge/src/persistent.rs
Normal file
539
rust/crates/local_bridge/src/persistent.rs
Normal file
@@ -0,0 +1,539 @@
|
||||
//! Persistent (long-lived) bridge mode: a single helper process serving many
|
||||
//! requests, plus a local-socket broker that lets ``lsp-stdio`` clients attach
|
||||
//! to running language-server channels.
|
||||
//!
|
||||
//! The orchestration is:
|
||||
//!
|
||||
//! - ``run_persistent`` brings up the SSH helper, prints the handshake banner
|
||||
//! on Python-stdout, and runs two halves: a *collector* thread that demuxes
|
||||
//! helper responses by id, and the main *forwarder* loop that ingests JSON
|
||||
//! request envelopes from Python-stdin.
|
||||
//! - ``HelperDispatcher`` is the synchronization point both halves share: it
|
||||
//! registers in-flight ids, writes framed messages to the helper's stdin,
|
||||
//! and routes responses back via ``mpsc`` channels. ``mirror.rs`` and
|
||||
//! ``lsp_stdio.rs`` each take a clone for their own dispatch flows.
|
||||
//! - ``PersistentBroker`` owns a local-socket listener (``AF_UNIX`` on Unix,
|
||||
//! Named Pipe on Windows via ``interprocess``) so external ``lsp-stdio``
|
||||
//! children can ``attach`` to a running helper session.
|
||||
//!
|
||||
//! Cut out of ``main.rs`` during a code-organization split; behavior is
|
||||
//! unchanged.
|
||||
|
||||
use crate::cli::BridgeCliArgs;
|
||||
use crate::lsp_stdio::{BrokerLspRelayCfg, broker_lsp_relay_loop};
|
||||
use crate::mirror::{MIRROR_SYNC_METHOD, handle_mirror_sync};
|
||||
use crate::write_bridge_output;
|
||||
use interprocess::TryClone;
|
||||
use interprocess::local_socket::{
|
||||
GenericFilePath, Listener as IpcListener, ListenerNonblockingMode, ListenerOptions,
|
||||
Stream as IpcStream, ToFsName, traits::Listener as IpcListenerTrait,
|
||||
};
|
||||
use local_bridge::{
|
||||
BridgeCliOutput, BridgeRunError, bridge_diag_event, default_remote_helper_path,
|
||||
protocol_message_kind, with_helper_session_handshake,
|
||||
};
|
||||
use serde_json::json;
|
||||
use session_protocol::{ErrorEnvelope, ProtocolMessage, RequestEnvelope};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
#[cfg(unix)]
|
||||
use std::io::BufRead;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ChildStdin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// Shared handle for sending requests to the helper and receiving responses.
|
||||
///
|
||||
/// Used by both the main forwarder (Python → helper) and the mirror thread
|
||||
/// (bridge-internal tree/list → helper).
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct HelperDispatcher {
|
||||
pub(crate) helper_stdin: Arc<Mutex<ChildStdin>>,
|
||||
pub(crate) pending: Arc<Mutex<HashMap<String, mpsc::Sender<BridgeCliOutput>>>>,
|
||||
}
|
||||
|
||||
impl HelperDispatcher {
|
||||
pub(crate) fn request_blocking(
|
||||
&self,
|
||||
envelope: &RequestEnvelope,
|
||||
) -> Result<serde_json::Value, BridgeRunError> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
self.pending
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.insert(envelope.id.clone(), tx);
|
||||
self.forward_to_helper(envelope)?;
|
||||
|
||||
let timeout = std::time::Duration::from_millis(envelope.timeout_ms.clamp(1000, 120_000));
|
||||
match rx.recv_timeout(timeout) {
|
||||
Ok(out) if out.ok => Ok(out.result.unwrap_or(serde_json::Value::Null)),
|
||||
Ok(out) => {
|
||||
let err = out.error.unwrap_or_else(|| ErrorEnvelope {
|
||||
id: Some(envelope.id.clone()),
|
||||
code: "unknown".to_string(),
|
||||
message: "helper returned failure without error detail".to_string(),
|
||||
retryable: true,
|
||||
});
|
||||
Err(BridgeRunError::HelperError(err))
|
||||
}
|
||||
Err(_) => {
|
||||
self.pending
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.remove(&envelope.id);
|
||||
Err(BridgeRunError::HelperLaunchFailed(format!(
|
||||
"helper response timed out after {:.1}s",
|
||||
timeout.as_secs_f32()
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn forward_to_helper(
|
||||
&self,
|
||||
envelope: &RequestEnvelope,
|
||||
) -> Result<(), BridgeRunError> {
|
||||
let encoded =
|
||||
session_protocol::encode_message(&ProtocolMessage::Request(envelope.clone()))?;
|
||||
let mut guard = self.helper_stdin.lock().unwrap_or_else(|e| e.into_inner());
|
||||
guard.write_all(encoded.as_bytes())?;
|
||||
guard.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn deliver(&self, id: &str, out: BridgeCliOutput) -> Option<BridgeCliOutput> {
|
||||
let sender = self
|
||||
.pending
|
||||
.lock()
|
||||
.unwrap_or_else(|e| e.into_inner())
|
||||
.remove(id);
|
||||
match sender {
|
||||
Some(tx) => {
|
||||
let _ = tx.send(out);
|
||||
None
|
||||
}
|
||||
None => Some(out),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn run_persistent(args: &[String]) -> Result<(), BridgeRunError> {
|
||||
let cli = BridgeCliArgs::parse(args)?;
|
||||
let default_remote_helper_path = default_remote_helper_path();
|
||||
let remote_helper_path = cli
|
||||
.remote_helper_path
|
||||
.as_deref()
|
||||
.unwrap_or(default_remote_helper_path.as_str());
|
||||
|
||||
with_helper_session_handshake(
|
||||
&cli.host_alias,
|
||||
remote_helper_path,
|
||||
&cli.revision,
|
||||
session_protocol::TraceLevel::Info,
|
||||
|session, handshake| {
|
||||
let py_stdout = Arc::new(Mutex::new(std::io::stdout()));
|
||||
|
||||
let dispatcher = HelperDispatcher {
|
||||
helper_stdin: Arc::new(Mutex::new(session.take_stdin()?)),
|
||||
pending: Arc::new(Mutex::new(HashMap::new())),
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
let (_broker_keepalive, broker_socket_str) = {
|
||||
let broker = PersistentBroker::start(&cli.host_alias, dispatcher.clone())?;
|
||||
let path = broker.socket_path_str();
|
||||
(broker, path)
|
||||
};
|
||||
#[cfg(not(unix))]
|
||||
let broker_socket_str = String::new();
|
||||
|
||||
let handshake_info = BridgeCliOutput {
|
||||
ok: true,
|
||||
id: None,
|
||||
result: Some(json!({
|
||||
"handshake": {
|
||||
"remote_home": handshake.remote_home,
|
||||
"arch": handshake.arch,
|
||||
"helper_version": handshake.helper_version,
|
||||
"remote_platform": format!("{:?}", handshake.remote_platform),
|
||||
"capabilities": handshake.capabilities.iter()
|
||||
.map(|c| format!("{:?}", c))
|
||||
.collect::<Vec<_>>(),
|
||||
"broker_socket": broker_socket_str,
|
||||
}
|
||||
})),
|
||||
error: None,
|
||||
};
|
||||
write_bridge_output(&py_stdout, &handshake_info)?;
|
||||
|
||||
// Collector thread: helper stdout → dispatch by id or pass to Python.
|
||||
let messages = session.take_messages()?;
|
||||
let dispatcher_for_collector = dispatcher.clone();
|
||||
let py_stdout_collector = Arc::clone(&py_stdout);
|
||||
let collector_handle = std::thread::spawn(move || {
|
||||
loop {
|
||||
let msg = match messages.recv() {
|
||||
Ok(Ok(msg)) => msg,
|
||||
Ok(Err(e)) => {
|
||||
bridge_diag_event(
|
||||
"bridge.rust.collector_error",
|
||||
json!({ "detail": e.to_string() }),
|
||||
);
|
||||
break;
|
||||
}
|
||||
Err(_) => break,
|
||||
};
|
||||
let (id, out) = match msg {
|
||||
ProtocolMessage::Response(resp) => {
|
||||
let id = resp.id.clone();
|
||||
(
|
||||
id.clone(),
|
||||
BridgeCliOutput {
|
||||
ok: true,
|
||||
id: Some(id),
|
||||
result: Some(resp.result),
|
||||
error: None,
|
||||
},
|
||||
)
|
||||
}
|
||||
ProtocolMessage::Error(err) => {
|
||||
let id = err.id.clone().unwrap_or_default();
|
||||
(
|
||||
id.clone(),
|
||||
BridgeCliOutput {
|
||||
ok: false,
|
||||
id: Some(id),
|
||||
result: None,
|
||||
error: Some(err),
|
||||
},
|
||||
)
|
||||
}
|
||||
ProtocolMessage::Shutdown(_) => break,
|
||||
other => {
|
||||
bridge_diag_event(
|
||||
"bridge.rust.collector_unexpected",
|
||||
json!({ "kind": protocol_message_kind(&other) }),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Some(passthrough) = dispatcher_for_collector.deliver(&id, out)
|
||||
&& write_bridge_output(&py_stdout_collector, &passthrough).is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Main thread: Python stdin → helper or bridge-handled commands.
|
||||
let py_stdin = std::io::stdin();
|
||||
let input = std::io::BufRead::lines(std::io::BufReader::new(py_stdin.lock()));
|
||||
for line in input {
|
||||
let raw = match line {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
let out = BridgeCliOutput {
|
||||
ok: false,
|
||||
id: None,
|
||||
result: None,
|
||||
error: Some(ErrorEnvelope {
|
||||
id: None,
|
||||
code: "bridge_stdin_error".to_string(),
|
||||
message: format!("stdin read failed: {e}"),
|
||||
retryable: true,
|
||||
}),
|
||||
};
|
||||
let _ = write_bridge_output(&py_stdout, &out);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let trimmed = raw.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let envelope: RequestEnvelope = match serde_json::from_str(trimmed) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
bridge_diag_event(
|
||||
"bridge.rust.persistent_invalid_envelope",
|
||||
json!({ "detail": e.to_string(), "line_bytes": trimmed.len() }),
|
||||
);
|
||||
let out = BridgeCliOutput {
|
||||
ok: false,
|
||||
id: None,
|
||||
result: None,
|
||||
error: Some(ErrorEnvelope {
|
||||
id: None,
|
||||
code: "invalid_bridge_request".to_string(),
|
||||
message: format!("invalid request envelope: {e}"),
|
||||
retryable: true,
|
||||
}),
|
||||
};
|
||||
let _ = write_bridge_output(&py_stdout, &out);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
bridge_diag_event(
|
||||
"bridge.rust.request_start",
|
||||
json!({
|
||||
"request_id": envelope.id,
|
||||
"method": envelope.method,
|
||||
"timeout_ms": envelope.timeout_ms,
|
||||
}),
|
||||
);
|
||||
|
||||
if envelope.method == MIRROR_SYNC_METHOD {
|
||||
let disp = dispatcher.clone();
|
||||
let stdout = Arc::clone(&py_stdout);
|
||||
std::thread::spawn(move || {
|
||||
let out = handle_mirror_sync(&disp, &envelope);
|
||||
let _ = write_bridge_output(&stdout, &out);
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Normal request: forward to helper (fire-and-forget).
|
||||
if let Err(e) = dispatcher.forward_to_helper(&envelope) {
|
||||
let out = BridgeCliOutput {
|
||||
ok: false,
|
||||
id: Some(envelope.id.clone()),
|
||||
result: None,
|
||||
error: Some(ErrorEnvelope {
|
||||
id: Some(envelope.id),
|
||||
code: "helper_write_failed".to_string(),
|
||||
message: e.to_string(),
|
||||
retryable: true,
|
||||
}),
|
||||
};
|
||||
let _ = write_bridge_output(&py_stdout, &out);
|
||||
continue;
|
||||
}
|
||||
bridge_diag_event(
|
||||
"bridge.rust.request_flushed",
|
||||
json!({ "request_id": envelope.id, "method": envelope.method }),
|
||||
);
|
||||
}
|
||||
|
||||
// Python stdin closed — collector thread will end when helper exits.
|
||||
let _ = collector_handle.join();
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[derive(serde::Deserialize)]
|
||||
struct BrokerAttachRequest {
|
||||
kind: String,
|
||||
server_id: String,
|
||||
workspace_id: String,
|
||||
#[serde(default)]
|
||||
argv: Option<Vec<String>>,
|
||||
#[serde(default)]
|
||||
cwd: Option<String>,
|
||||
/// Local cache ``file://`` prefix (editor); rewritten to ``lsp_remote_uri_prefix`` outbound.
|
||||
#[serde(default)]
|
||||
lsp_local_uri_prefix: Option<String>,
|
||||
/// Remote workspace ``file://`` prefix (helper); rewritten back on inbound responses.
|
||||
#[serde(default)]
|
||||
lsp_remote_uri_prefix: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
pub(crate) struct BrokerAttachResponse {
|
||||
pub(crate) ok: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub(crate) error: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) struct PersistentBroker {
|
||||
socket_path: PathBuf,
|
||||
running: Arc<AtomicBool>,
|
||||
handle: Option<std::thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl PersistentBroker {
|
||||
pub(crate) fn start(
|
||||
host_alias: &str,
|
||||
dispatcher: HelperDispatcher,
|
||||
) -> Result<Self, BridgeRunError> {
|
||||
let sanitized_host = host_alias
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
|
||||
c
|
||||
} else {
|
||||
'_'
|
||||
}
|
||||
})
|
||||
.collect::<String>();
|
||||
let socket_path = persistent_broker_endpoint_path(&sanitized_host);
|
||||
// ``LocalSocketListener::bind`` (via interprocess 2.x) is the
|
||||
// cross-platform front end: on Unix it opens an `AF_UNIX` socket at
|
||||
// the given path; on Windows it creates a Named Pipe at
|
||||
// ``\\.\pipe\<basename>`` (the prefix is implied by the
|
||||
// ``GenericFilePath`` resolver). The bytes on the wire are
|
||||
// unchanged on Unix versus the previous ``UnixListener::bind``
|
||||
// path so existing tests + the ``run_lsp_stdio`` client (still
|
||||
// ``UnixStream::connect``) keep working.
|
||||
let _ = fs::remove_file(&socket_path);
|
||||
let endpoint = socket_path
|
||||
.as_path()
|
||||
.to_fs_name::<GenericFilePath>()
|
||||
.map_err(|error| {
|
||||
BridgeRunError::HelperLaunchFailed(format!("broker endpoint name failed: {error}"))
|
||||
})?;
|
||||
let listener: IpcListener = ListenerOptions::new()
|
||||
.name(endpoint)
|
||||
.nonblocking(ListenerNonblockingMode::Accept)
|
||||
.create_sync()?;
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
fs::set_permissions(&socket_path, fs::Permissions::from_mode(0o600))?;
|
||||
}
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
let running_thread = Arc::clone(&running);
|
||||
let dispatcher_for_listener = dispatcher.clone();
|
||||
let handle = std::thread::spawn(move || {
|
||||
while running_thread.load(Ordering::Relaxed) {
|
||||
match listener.accept() {
|
||||
Ok(stream) => {
|
||||
let dispatch = dispatcher_for_listener.clone();
|
||||
std::thread::spawn(move || {
|
||||
if let Err(error) = handle_broker_client(stream, dispatch) {
|
||||
bridge_diag_event(
|
||||
"bridge.rust.broker_client_error",
|
||||
json!({ "detail": error.to_string() }),
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
std::thread::sleep(std::time::Duration::from_millis(20));
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(Self {
|
||||
socket_path,
|
||||
running,
|
||||
handle: Some(handle),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn socket_path_str(&self) -> String {
|
||||
self.socket_path.to_string_lossy().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct the broker endpoint path for ``host_alias``.
|
||||
///
|
||||
/// On Unix the result is an absolute filesystem path (an `AF_UNIX`
|
||||
/// socket file); on Windows it is a Named Pipe path of the form
|
||||
/// ``\\.\pipe\sessions-local-bridge-<host>-<pid>``. ``GenericFilePath``
|
||||
/// resolves both forms uniformly when the path is fed back into
|
||||
/// :type:`ListenerOptions`.
|
||||
fn persistent_broker_endpoint_path(sanitized_host: &str) -> PathBuf {
|
||||
let pid = std::process::id();
|
||||
#[cfg(unix)]
|
||||
{
|
||||
std::env::temp_dir().join(format!("sessions-local-bridge-{sanitized_host}-{pid}.sock"))
|
||||
}
|
||||
#[cfg(windows)]
|
||||
{
|
||||
PathBuf::from(format!(
|
||||
r"\\.\pipe\sessions-local-bridge-{sanitized_host}-{pid}"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PersistentBroker {
|
||||
fn drop(&mut self) {
|
||||
self.running.store(false, Ordering::Relaxed);
|
||||
if let Some(handle) = self.handle.take() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
// Unix socket files want explicit removal (the listener thread
|
||||
// already exited above); Windows Named Pipes are reaped when the
|
||||
// last handle closes, so the equivalent step is the listener's
|
||||
// own Drop and there's no path to unlink.
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let _ = fs::remove_file(&self.socket_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn handle_broker_client(
|
||||
stream: IpcStream,
|
||||
dispatcher: HelperDispatcher,
|
||||
) -> Result<(), BridgeRunError> {
|
||||
let read_half = stream.try_clone()?;
|
||||
let mut buf_reader = std::io::BufReader::new(read_half);
|
||||
let mut first_line = String::new();
|
||||
buf_reader.read_line(&mut first_line)?;
|
||||
let req: BrokerAttachRequest =
|
||||
serde_json::from_str(first_line.trim()).map_err(BridgeRunError::Json)?;
|
||||
let mut write_half = stream;
|
||||
if req.kind != "attach" || req.server_id.trim().is_empty() || req.workspace_id.trim().is_empty()
|
||||
{
|
||||
let response = BrokerAttachResponse {
|
||||
ok: false,
|
||||
error: Some("invalid attach request".to_string()),
|
||||
};
|
||||
let encoded = serde_json::to_string(&response)?;
|
||||
writeln!(write_half, "{encoded}")?;
|
||||
write_half.flush()?;
|
||||
return Ok(());
|
||||
}
|
||||
let response = BrokerAttachResponse {
|
||||
ok: true,
|
||||
error: None,
|
||||
};
|
||||
let encoded = serde_json::to_string(&response)?;
|
||||
writeln!(write_half, "{encoded}")?;
|
||||
write_half.flush()?;
|
||||
let uri_rewrite = match (req.lsp_local_uri_prefix, req.lsp_remote_uri_prefix) {
|
||||
(Some(loc), Some(rem))
|
||||
if !loc.trim().is_empty() && !rem.trim().is_empty() && loc.trim() != rem.trim() =>
|
||||
{
|
||||
Some((loc.trim().to_string(), rem.trim().to_string()))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
broker_lsp_relay_loop(
|
||||
buf_reader,
|
||||
&mut write_half,
|
||||
BrokerLspRelayCfg {
|
||||
dispatcher,
|
||||
server_id: req.server_id,
|
||||
workspace_id: req.workspace_id,
|
||||
spawn_argv: req.argv,
|
||||
spawn_cwd: req.cwd,
|
||||
uri_rewrite,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) fn lsp_response_body_to_framed_string(
|
||||
result: &serde_json::Value,
|
||||
) -> Result<String, BridgeRunError> {
|
||||
if let (Some(kind), Some(body)) = (
|
||||
result.get("kind").and_then(|v| v.as_str()),
|
||||
result.get("body"),
|
||||
) && kind == session_protocol::CHANNEL_KIND_LSP_STDIO_MESSAGE
|
||||
{
|
||||
return serde_json::to_string(body).map_err(BridgeRunError::Json);
|
||||
}
|
||||
serde_json::to_string(result).map_err(BridgeRunError::Json)
|
||||
}
|
||||
Reference in New Issue
Block a user