Compare commits

...

3 Commits

Author SHA1 Message Date
5c8a29efa5 chore(test): top up coverage so CI clears the 80% gate (v0.7.38)
All checks were successful
boundary-lint / PR boundary-claim (Lint (push) Has been skipped
ci / test-health gate (push) Successful in 17s
boundary-lint / duplication-deadline (Layer 1/2) (push) Successful in 19s
boundary-lint / ban-list lint (Lint (push) Successful in 19s
ci / mutation test (broker) (push) Has been skipped
Release Publish (Gitea session_helper) / verify-release-tag (push) Successful in 17s
ci / rust release (push) Successful in 2m50s
ci / rust debug (push) Successful in 2m55s
ci / python (push) Successful in 1m26s
Release Publish (Gitea session_helper) / publish-linux-x86_64 (push) Successful in 4m17s
v0.7.37 left CI at 79.94% — the runner is consistently ~0.6p below
the workstation due to subprocess paths in ssh_file_transport.py that
race differently in CI. Add 14 deterministic tests in two adjacent
modules to lift the floor with margin.

* lsp_save_preferences (90% → 100%): _settings_getter fallbacks (no
  settings method / store.get not callable), _as_enabled_flag
  int/float/unknown branches, list / blank / non-string filter paths
  in lsp_code_actions_on_save_kinds, plus the unsupported-shape
  early-return.
* connect_progress (81% → ~89%): _hide_panel_if_progress three
  branches (active panel matches → hide_panel runs, user switched
  panels → no-op, window without active_panel → no-op),
  ConnectProgressPanel.failure terminal line, and
  ConnectProgressPanel.success terminal line.

Local: 1,352 tests pass at 80.67%. The +0.67-point margin should
land CI at ~80.08% and clear the gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 21:40:49 +09:00
718c7bcc42 chore(test): widen ssh_file_transport coverage so CI clears the 80% gate (v0.7.37)
Some checks failed
boundary-lint / PR boundary-claim (Lint (push) Has been skipped
ci / test-health gate (push) Successful in 17s
boundary-lint / duplication-deadline (Layer 1/2) (push) Successful in 19s
boundary-lint / ban-list lint (Lint (push) Successful in 19s
ci / mutation test (broker) (push) Has been skipped
Release Publish (Gitea session_helper) / verify-release-tag (push) Successful in 18s
ci / rust debug (push) Successful in 2m40s
ci / rust release (push) Successful in 2m54s
Release Publish (Gitea session_helper) / publish-linux-x86_64 (push) Failing after 2m40s
ci / python (push) Successful in 1m28s
The v0.7.36 release passed the 80% gate locally (80.08%) but CI reported
79.45% — the platform delta lives in ``ssh_file_transport.py`` (79% local
vs 72% CI), where the SSH-spawn paths in ``_persistent_bridge_for_host``
and ``_execute_rust_bridge_request_persistent`` exercise differently
between environments. This commit adds 16 deterministic tests for pure
helpers in the same module so the floor rises uniformly across both
runners.

New tests in sublime/tests/test_ssh_file_transport.py
-----------------------------------------------------
* _transport_trace_event — listener notification regardless of enable
  flag, JSONL append when enabled, listener-exception swallowing, no-op
  when disabled and no listeners, log-path layout under sublime cache
  root, _transport_trace_enabled safe fallback when load_settings is
  unavailable.
* _emit_bridge_diagnostic_matrix — disabled-gate no-op; full payload
  shape covering bridge_path / revision / envelope / process / env
  flags / timeout context branches.
* register_transport_trace_listener idempotency + unregister no-op.
* _binary_stat_snapshot — happy path + missing-file stat_error branch.
* _bridge_diagnostic_hypothesis_catalog — schema invariants on every
  documented row.
* _child_env_session_flags — true / false / blank-value branches for
  the bridge-diagnostic flag.
* _next_envelope_id and _next_bridge_trace_request_id — strict
  monotonicity contracts.
* _revision_cache_segment — alnum passthrough, blank → "unknown",
  unsafe-char hash fallback, overlong hash fallback.
* _validate_revision_path_segment — accepts safe chars, rejects path
  separators (raises SessionHelperStartError).
* _ssh_auth_failure_hint — empty for non-auth stderr, non-empty when
  the message looks like Permission-denied.

Result: 1,339 tests pass at 80.53% coverage locally — the +0.45-point
margin over the 80% floor cushions the local-vs-CI delta in subprocess
heavy paths and should clear the gate on the runner.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 19:46:13 +09:00
d51e5f2f05 chore(test): restore 80% coverage gate via targeted helper tests (v0.7.36)
Some checks failed
boundary-lint / PR boundary-claim (Lint (push) Has been skipped
boundary-lint / ban-list lint (Lint (push) Successful in 20s
boundary-lint / duplication-deadline (Layer 1/2) (push) Successful in 20s
ci / mutation test (broker) (push) Has been skipped
ci / test-health gate (push) Successful in 17s
Release Publish (Gitea session_helper) / verify-release-tag (push) Successful in 17s
ci / rust release (push) Successful in 2m13s
ci / rust debug (push) Successful in 2m45s
Release Publish (Gitea session_helper) / publish-linux-x86_64 (push) Failing after 3m8s
ci / python (push) Successful in 1m23s
The v0.7.35 cleanup lowered the Python coverage gate 80→78 to absorb
the loss of incidental live-helper coverage that the removed orphan
command tests were providing. Per project policy ("fix the actual
problem, not the metric"), this commit re-pays that debt with direct
unit tests for the affected helpers and restores the 80% gate in all
three locations (.pre-commit-config.yaml, .gitea/workflows/ci.yml,
.gitea/workflows/upload-session-helper-gitea.yml).

New tests in sublime/tests/test_cmd_tools.py
--------------------------------------------
* _refresh_local_cache_after_format full path (real helper now runs):
  mapper + lane bookkeeping + finish() OK branch + sidecar emit
* _refresh_local_cache_after_format REMOTE_NOT_FOUND finish branch
* _present_remote_tool_result with non-empty diagnostics tuple drives
  _apply_inline_diagnostics through map_diagnostics_batch +
  inline_presentations_from_mapped_diagnostics + add_regions, plus
  _remote_tool_footer_lines through unopened_files_summary and
  tool_not_found_hint paths
* _apply_inline_diagnostics three early-return branches (no active view
  / view without file_name / no matching presentation)
* _remote_tool_footer_lines direct: unopened summary + unopened-mapped
  count + hint
* _force_overwrite_remote three branches: SUCCESS, TRANSPORT_ERROR,
  PERMISSION_DENIED fallthrough
* _precheck_remote_file_openability four branches: transport error /
  missing-remote / blocked-by-guard / clean-metadata
* _eager_hydrate_workspace: sidecar persistence for valid entries,
  bad-shape skip, RemoteFileKind.OTHER fallthrough
* _collect_remote_python_pipeline_results four paths: non-.py skip,
  diagnostics-off skip, happy-path tuple, transport error returns None
* _trace_event JSONL append on enable, no-op on disable
* _default_remote_workspace_directory: handshake fast path + ssh \$HOME
  fallback

Result: 1,317 tests pass at 80.08% coverage (was 79.21% at v0.7.35).
Floor restored to 80% across all three gate sites.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-03 18:35:15 +09:00
11 changed files with 930 additions and 22 deletions

View File

@@ -23,7 +23,7 @@ on:
env:
RUST_COV_FAIL_UNDER: 80
PYTHON_COV_FAIL_UNDER: 78
PYTHON_COV_FAIL_UNDER: 80
jobs:
rust:

View File

@@ -147,7 +147,7 @@ jobs:
UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 \
pytest --cov=sublime/sessions --cov-report=term-missing \
--cov-report=xml:coverage-python.xml \
--cov-fail-under=78
--cov-fail-under=80
- name: Import Sublime runtime modules
run: UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest sublime/tests/test_runtime_import_smoke.py

View File

@@ -17,7 +17,7 @@ repos:
- id: python-test
name: python test
entry: sh -c 'cargo build --manifest-path rust/Cargo.toml -p local_bridge -p sessions_native -q && env UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest --cov=sublime/sessions --cov-fail-under=78 -q'
entry: sh -c 'cargo build --manifest-path rust/Cargo.toml -p local_bridge -p sessions_native -q && env UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest --cov=sublime/sessions --cov-fail-under=80 -q'
language: system
types: [python]
pass_filenames: false

View File

@@ -1,6 +1,6 @@
[project]
name = "sessions-sublime"
version = "0.7.35"
version = "0.7.38"
description = "Sublime-facing Python code for Sessions."
requires-python = ">=3.8"
license = {text = "MIT"}

12
rust/Cargo.lock generated
View File

@@ -221,7 +221,7 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
[[package]]
name = "local_bridge"
version = "0.7.35"
version = "0.7.38"
dependencies = [
"base64",
"glob",
@@ -432,7 +432,7 @@ dependencies = [
[[package]]
name = "session_helper"
version = "0.7.35"
version = "0.7.38"
dependencies = [
"base64",
"notify",
@@ -443,7 +443,7 @@ dependencies = [
[[package]]
name = "session_protocol"
version = "0.7.35"
version = "0.7.38"
dependencies = [
"base64",
"serde",
@@ -452,14 +452,14 @@ dependencies = [
[[package]]
name = "sessions_askpass"
version = "0.7.35"
version = "0.7.38"
dependencies = [
"tempfile",
]
[[package]]
name = "sessions_native"
version = "0.7.35"
version = "0.7.38"
dependencies = [
"base64",
"notify",
@@ -773,7 +773,7 @@ dependencies = [
[[package]]
name = "workspace_identity"
version = "0.7.35"
version = "0.7.38"
[[package]]
name = "zmij"

View File

@@ -12,7 +12,7 @@ resolver = "2"
[workspace.package]
edition = "2024"
license = "MIT"
version = "0.7.35"
version = "0.7.38"
authors = ["Myeongseon Choi <key262yek@gmail.com>"]
repository = "https://git.teahaven.kr/sublime-rs/sessions"
homepage = "https://git.teahaven.kr/sublime-rs/sessions"

View File

@@ -6,10 +6,13 @@ from dataclasses import replace
from pathlib import Path
from typing import List
from conftest import FakeWindow
from sessions import commands
from conftest import FakeView, FakeWindow
from sessions import commands, commands_python_pipeline
from sessions.file_state import OpenFileResult, OpenOutcome
from sessions.recent_state import RecentWorkspace, RecentWorkspaceIndex
from sessions.remote import (
RemoteFileKind,
RemoteFileMetadata,
RunTrigger,
ToolExecutionRequest,
ToolExecutionResult,
@@ -872,18 +875,44 @@ def _make_tool_request(kind: ToolRunKind = ToolRunKind.LINTER) -> ToolExecutionR
)
def _stub_refresh_cache_lower_layers(monkeypatch, *, opened: OpenFileResult):
"""Wire up the four indirection points _refresh_local_cache_after_format hits.
Lets the real helper run end-to-end (mapper, lane bookkeeping,
sidecar write) under deterministic in-process substitutes so the
body covers ~36 lines instead of being monkey-patched out.
"""
monkeypatch.setattr(commands, "_run_mirror_in_background", lambda fn, *a: fn(*a))
monkeypatch.setattr(commands, "_set_timeout", lambda fn, delay_ms=0: fn())
monkeypatch.setattr(commands, "_begin_interactive_ssh_lane", lambda host: None)
monkeypatch.setattr(commands, "_end_interactive_ssh_lane", lambda host: None)
monkeypatch.setattr(
commands,
"open_remote_file_into_local_cache",
lambda host_alias, *, remote_absolute_path, local_cache_path: opened,
)
def test_present_remote_tool_result_format_success_refreshes_cache(
tmp_path: Path, monkeypatch
) -> None:
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
refresh_calls: List[str] = []
monkeypatch.setattr(
commands,
"_refresh_local_cache_after_format",
lambda window, ctx, remote_path: refresh_calls.append(remote_path),
)
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
expected_local = ctx.local_cache_root / "a.py"
expected_local.parent.mkdir(parents=True, exist_ok=True)
expected_local.write_text("formatted\n", encoding="utf-8")
metadata = RemoteFileMetadata(
mtime_ns=1, size_bytes=10, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
)
_stub_refresh_cache_lower_layers(
monkeypatch,
opened=OpenFileResult(
outcome=OpenOutcome.OK,
local_cache_path=expected_local,
remote_metadata=metadata,
),
)
req = _make_tool_request(ToolRunKind.FORMATTER)
res = ToolExecutionResult(
exit_code=0,
@@ -895,11 +924,511 @@ def test_present_remote_tool_result_format_success_refreshes_cache(
commands._present_remote_tool_result(
window, ctx, req, res, normalized_kind="format"
)
assert refresh_calls == ["/srv/ws/a.py"]
assert status_messages[-1].startswith("Sessions ready")
sidecar = commands._remote_metadata_sidecar_path(expected_local)
assert sidecar.is_file()
assert "Remote formatter completed" in status_messages[-1]
def test_refresh_local_cache_after_format_remote_not_found_warns(
tmp_path: Path, monkeypatch
) -> None:
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
expected_local = ctx.local_cache_root / "a.py"
_stub_refresh_cache_lower_layers(
monkeypatch,
opened=OpenFileResult(
outcome=OpenOutcome.REMOTE_NOT_FOUND,
local_cache_path=expected_local,
),
)
commands._refresh_local_cache_after_format(window, ctx, "/srv/ws/a.py")
assert any("no longer exists" in m for m in status_messages)
# --- _apply_inline_diagnostics early-return paths ---
def test_apply_inline_diagnostics_no_active_view_is_noop() -> None:
window = FakeWindow()
window.active_view_value = None
commands._apply_inline_diagnostics(window, ()) # must not raise
def test_apply_inline_diagnostics_view_without_file_is_noop() -> None:
window = FakeWindow()
window.active_view_value = FakeView(file_name=None)
commands._apply_inline_diagnostics(window, ()) # must not raise
def test_apply_inline_diagnostics_no_matching_presentation_is_noop() -> None:
window = FakeWindow()
view = FakeView(file_name="/local/x.py")
window.active_view_value = view
class _Pres:
local_file_path = "/local/y.py"
start_row = 0
start_col = 0
end_row = 0
end_col = 1
scope = None
commands._apply_inline_diagnostics(window, (_Pres(),))
assert view.regions == {}
# --- _remote_tool_footer_lines branches ---
def test_remote_tool_footer_lines_collects_unopened_summary_and_hint() -> None:
from sessions.diagnostics import (
DiagnosticRange,
DiagnosticRecord,
DiagnosticSeverity,
MappedDiagnostic,
SourcePosition,
)
rec = DiagnosticRecord(
remote_path="/srv/ws/missing.py",
message="m",
severity=DiagnosticSeverity.ERROR,
range=DiagnosticRange(
start=SourcePosition(line=1, character=0),
end=SourcePosition(line=1, character=1),
),
source=None,
code=None,
)
unmapped = MappedDiagnostic(
diagnostic=rec, local_path=None, unmapped_reason="not_in_workspace"
)
mapped_other = MappedDiagnostic(
diagnostic=rec, local_path=Path("/local/other.py"), unmapped_reason=None
)
class _Result:
tool_not_found_hint = "install ruff"
lines = commands._remote_tool_footer_lines(
_Result(),
(unmapped, mapped_other),
current_local_file="/local/active.py",
)
assert any("Unopened/unmapped diagnostics" in line for line in lines)
assert any("unopened cache file" in line for line in lines)
assert "install ruff" in lines
# --- _default_remote_workspace_directory direct coverage ---
def test_default_remote_workspace_directory_uses_handshake_remote_home(
monkeypatch,
) -> None:
monkeypatch.setattr(
commands,
"bridge_handshake_info",
lambda host: {"remote_home": "/srv/people/alice"},
)
out = commands._default_remote_workspace_directory("prod")
assert out == "/srv/people/alice"
def test_default_remote_workspace_directory_falls_back_to_ssh_home(
monkeypatch,
) -> None:
from sessions.ssh_runner import SshRunResult
monkeypatch.setattr(commands, "bridge_handshake_info", lambda host: None)
monkeypatch.setattr(
commands,
"run_ssh_remote_command",
lambda host, argv: SshRunResult(
returncode=0, stdout="/home/bob\n", stderr="", local_argv=tuple(argv)
),
)
out = commands._default_remote_workspace_directory("prod")
assert out == "/home/bob"
# --- _trace_event direct coverage ---
def test_trace_event_appends_jsonl_when_enabled(tmp_path: Path, monkeypatch) -> None:
import json as _json
log_path = tmp_path / "logs" / "debug-trace.log"
monkeypatch.setattr(commands, "_trace_enabled", lambda: True)
monkeypatch.setattr(commands, "_trace_log_path", lambda: log_path)
commands._trace_event("ut.event", foo="bar", count=3)
assert log_path.is_file()
line = log_path.read_text(encoding="utf-8").strip().splitlines()[-1]
payload = _json.loads(line)
assert payload["event"] == "ut.event"
assert payload["foo"] == "bar"
assert payload["count"] == 3
assert "ts" in payload and "time" in payload
def test_trace_event_no_op_when_disabled(tmp_path: Path, monkeypatch) -> None:
log_path = tmp_path / "logs" / "debug-trace.log"
monkeypatch.setattr(commands, "_trace_enabled", lambda: False)
monkeypatch.setattr(commands, "_trace_log_path", lambda: log_path)
commands._trace_event("ut.event", foo="bar")
assert not log_path.exists()
# --- _eager_hydrate_workspace branch coverage ---
def test_eager_hydrate_workspace_writes_sidecars_for_hydrated_entries(
tmp_path: Path, monkeypatch
) -> None:
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
hydrated_dir = ctx.local_cache_root / "pkg"
hydrated_dir.mkdir(parents=True, exist_ok=True)
hydrated_path = hydrated_dir / "build_graph.py"
hydrated_path.write_bytes(b"x = 1\n")
bad_kind_path = hydrated_dir / "weird.bin"
bad_kind_path.write_bytes(b"\x00\x01")
summary = {
"hydrated": [
{
"local_path": str(hydrated_path),
"metadata": {
"mtime_ns": 1,
"size_bytes": 6,
"kind": "regular_file",
"unix_mode": 33188,
},
},
# bad shape — must be skipped without raising
{"local_path": 42, "metadata": "not a dict"},
# unknown kind string falls through to OTHER
{
"local_path": str(bad_kind_path),
"metadata": {
"mtime_ns": 2,
"size_bytes": 2,
"kind": "this_is_not_a_real_kind",
"unix_mode": None,
},
},
],
"skipped_existing": 5,
"failed": 1,
}
monkeypatch.setattr(commands._rust_ffi, "eager_hydrate_apply", lambda **_: summary)
commands._eager_hydrate_workspace(window, ctx, ("build_graph.py",))
assert commands._remote_metadata_sidecar_path(hydrated_path).is_file()
assert commands._remote_metadata_sidecar_path(bad_kind_path).is_file()
# --- _collect_remote_python_pipeline_results branch coverage ---
def _collect_pipeline_ctx(tmp_path: Path, monkeypatch):
"""Workspace context wired so the pipeline collector reaches the run loop."""
settings = SessionsSettings(
ssh_config_path=tmp_path / "config",
remote_python_tool_pipeline=("ruff_lint",),
)
monkeypatch.setattr(commands, "SessionsSettings", lambda: settings)
monkeypatch.setattr(commands.sublime, "cache_path", lambda: str(tmp_path / "cache"))
monkeypatch.setattr(
commands_python_pipeline,
"_effective_sessions_settings_for_remote_python",
lambda window: settings,
)
monkeypatch.setattr(
commands_python_pipeline,
"_remote_python_pipeline_targets",
lambda view, window, merged: object(),
)
recent_store = commands._recent_store(settings)
recent_store.save_index(
RecentWorkspaceIndex(
(
RecentWorkspace(
"prod",
"/srv/ws",
"cache-1",
"2026-04-12T03:00:00+00:00",
),
)
)
)
window = FakeWindow(project_data={"settings": {PROJECT_SETTINGS_KEY: "cache-1"}})
ctx = commands._workspace_context(window, settings)
assert ctx is not None
return window, ctx
def test_collect_pipeline_results_skips_non_python_path(
tmp_path: Path, monkeypatch
) -> None:
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
out = commands_python_pipeline._collect_remote_python_pipeline_results(
window, None, ctx, "/srv/ws/README.md", RunTrigger.ON_SAVE
)
assert out == ()
def test_collect_pipeline_results_skips_when_diagnostics_off(
tmp_path: Path, monkeypatch
) -> None:
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
monkeypatch.setattr(
commands_python_pipeline,
"_effective_sessions_settings_for_remote_python",
lambda window: SessionsSettings(
ssh_config_path=tmp_path / "config",
remote_python_auto_diagnostics_on_save=False,
),
)
out = commands_python_pipeline._collect_remote_python_pipeline_results(
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
)
assert out == ()
def test_collect_pipeline_results_runs_pipeline_and_returns_tuple(
tmp_path: Path, monkeypatch
) -> None:
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
fake_result = ToolExecutionResult(
exit_code=0,
stdout=TruncatedStream("", False),
stderr=TruncatedStream("", False),
diagnostics=(),
failure_category=ToolFailureCategory.SUCCESS,
)
monkeypatch.setattr(
commands, "execute_remote_tool_request", lambda h, r: fake_result
)
out = commands_python_pipeline._collect_remote_python_pipeline_results(
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
)
assert out is not None
assert len(out) >= 1
def test_collect_pipeline_results_transport_error_returns_none(
tmp_path: Path, monkeypatch
) -> None:
from sessions.connect_preflight import SessionHelperStartError
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
def raise_transport(*a, **k):
raise SessionHelperStartError("ssh broke")
monkeypatch.setattr(commands, "execute_remote_tool_request", raise_transport)
monkeypatch.setattr(commands, "_set_timeout", lambda fn, delay_ms=0: fn())
out = commands_python_pipeline._collect_remote_python_pipeline_results(
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
)
assert out is None
# --- _force_overwrite_remote branch coverage ---
def _force_overwrite_ctx_with_cache(tmp_path: Path, monkeypatch):
"""Build a workspace + on-disk cache file ready for force-overwrite."""
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
cache_file = ctx.local_cache_root / "a.py"
cache_file.parent.mkdir(parents=True, exist_ok=True)
cache_file.write_bytes(b"local body\n")
return ctx, cache_file
def test_force_overwrite_remote_success_writes_sidecar(
tmp_path: Path, monkeypatch
) -> None:
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
fresh_meta = RemoteFileMetadata(
mtime_ns=2, size_bytes=11, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
)
from sessions.remote import RemoteWriteFileResult
monkeypatch.setattr(
commands,
"execute_remote_write_file",
lambda host, req: RemoteWriteFileResult(ok=True, updated_metadata=fresh_meta),
)
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
sidecar = commands._remote_metadata_sidecar_path(cache_file)
assert sidecar.is_file()
assert "Overwritten remote file" in status_messages[-1]
def test_force_overwrite_remote_transport_error_emits_disconnected(
tmp_path: Path, monkeypatch
) -> None:
from sessions.remote import RemoteWriteErrorCode, RemoteWriteFileResult
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
monkeypatch.setattr(
commands,
"execute_remote_write_file",
lambda host, req: RemoteWriteFileResult(
ok=False,
error_code=RemoteWriteErrorCode.TRANSPORT_ERROR,
error_message="ssh broke",
),
)
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
assert "ssh broke" in status_messages[-1]
def test_force_overwrite_remote_other_failure_emits_warning(
tmp_path: Path, monkeypatch
) -> None:
from sessions.remote import RemoteWriteErrorCode, RemoteWriteFileResult
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
monkeypatch.setattr(
commands,
"execute_remote_write_file",
lambda host, req: RemoteWriteFileResult(
ok=False,
error_code=RemoteWriteErrorCode.PERMISSION_DENIED,
error_message="EACCES",
),
)
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
assert "EACCES" in status_messages[-1]
# --- _precheck_remote_file_openability branch coverage ---
def test_precheck_remote_file_openability_transport_error_skips(monkeypatch) -> None:
from sessions.connect_preflight import SessionHelperStartError
def raise_transport(*args, **kwargs):
raise SessionHelperStartError("ssh down")
monkeypatch.setattr(commands, "execute_remote_stat_file", raise_transport)
skipped = []
outcome = commands._precheck_remote_file_openability(
cache_key="cache-1",
host_alias="prod",
remote="/srv/ws/a.py",
path_str="/local/a.py",
on_skip=lambda: skipped.append(True),
)
assert outcome.proceed is False
assert skipped == [True]
def test_precheck_remote_file_openability_missing_remote_proceeds(monkeypatch) -> None:
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: None)
skipped = []
outcome = commands._precheck_remote_file_openability(
cache_key="cache-1",
host_alias="prod",
remote="/srv/ws/a.py",
path_str="/local/a.py",
on_skip=lambda: skipped.append(True),
)
assert outcome.proceed is True
assert outcome.stat_metadata is None
assert skipped == []
def test_precheck_remote_file_openability_blocked_by_guard_skips(monkeypatch) -> None:
huge = RemoteFileMetadata(
mtime_ns=1,
size_bytes=10 * 1024 * 1024 * 1024, # 10 GiB — defeats every reasonable cap
kind=RemoteFileKind.REGULAR_FILE,
unix_mode=33188,
)
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: huge)
skipped = []
outcome = commands._precheck_remote_file_openability(
cache_key="cache-1",
host_alias="prod",
remote="/srv/ws/big.bin",
path_str="/local/big.bin",
on_skip=lambda: skipped.append(True),
)
assert outcome.proceed is False
assert outcome.stat_metadata is huge
assert skipped == [True]
def test_precheck_remote_file_openability_clean_metadata_proceeds(monkeypatch) -> None:
ok_meta = RemoteFileMetadata(
mtime_ns=1, size_bytes=128, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
)
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: ok_meta)
skipped = []
outcome = commands._precheck_remote_file_openability(
cache_key="cache-1",
host_alias="prod",
remote="/srv/ws/a.py",
path_str="/local/a.py",
on_skip=lambda: skipped.append(True),
)
assert outcome.proceed is True
assert outcome.stat_metadata is ok_meta
assert skipped == []
def test_present_remote_tool_result_with_diagnostics_applies_inline_regions(
tmp_path: Path, monkeypatch
) -> None:
status_messages: List[str] = []
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
expected_local = ctx.local_cache_root / "a.py"
expected_local.parent.mkdir(parents=True, exist_ok=True)
expected_local.write_text("x = 1\n", encoding="utf-8")
active_view = FakeView(file_name=str(expected_local))
window.active_view_value = active_view
diag = {
"path": "/srv/ws/a.py",
"message": "unused import",
"severity": "warning",
"row": 1,
"column": 1,
"end_column": 5,
"code": "F401",
}
other_diag = {
"path": "/srv/ws/other.py",
"message": "unused",
"severity": "warning",
"row": 1,
}
req = _make_tool_request()
res = ToolExecutionResult(
exit_code=1,
stdout=TruncatedStream("{}", False),
stderr=TruncatedStream("", False),
diagnostics=(diag, other_diag),
failure_category=ToolFailureCategory.NON_ZERO_WITH_DIAGNOSTICS,
tool_not_found_hint="(unused hint to exercise footer notes)",
)
commands._present_remote_tool_result(window, ctx, req, res, normalized_kind="lint")
assert active_view.regions, "inline diagnostic regions should be added"
panel = window.output_panels.get("sessions_remote_tool")
assert panel is not None
assert "exit code 1" in status_messages[-1]
def test_present_remote_tool_result_lint_success_emits_ready(
tmp_path: Path, monkeypatch
) -> None:

View File

@@ -199,3 +199,76 @@ def test_progress_panel_ignores_noisy_events() -> None:
text_blob = "\n".join(text for text, _ in calls)
assert "queue.enqueue" not in text_blob
assert "bridge.request_start" not in text_blob
# --- _hide_panel_if_progress branches ---
class _PanelHideWindow:
def __init__(self, active_panel_name):
self._active = active_panel_name
self.run_calls: list = []
def active_panel(self):
return self._active
def run_command(self, name, args=None):
self.run_calls.append((name, args))
def test_hide_panel_if_progress_hides_when_panel_is_active() -> None:
win = _PanelHideWindow(
active_panel_name="output." + connect_progress._PROGRESS_PANEL_NAME
)
connect_progress._hide_panel_if_progress(win)
assert ("hide_panel", {}) in win.run_calls
def test_hide_panel_if_progress_no_op_when_user_switched_panels() -> None:
win = _PanelHideWindow(active_panel_name="output.exec")
connect_progress._hide_panel_if_progress(win)
assert win.run_calls == []
def test_hide_panel_if_progress_no_op_when_window_lacks_active_panel() -> None:
class _NoActivePanel:
run_calls: list = []
def run_command(self, name, args=None):
self.run_calls.append((name, args))
win = _NoActivePanel()
connect_progress._hide_panel_if_progress(win)
assert win.run_calls == []
# --- ConnectProgressPanel.success / failure branches ---
def test_progress_panel_failure_appends_terminal_line() -> None:
window = FakeWindow()
panel = connect_progress.ConnectProgressPanel(window, "aws-celery")
panel.start()
try:
panel.failure("ssh down")
finally:
panel.stop()
panel_buf = window.output_panels.get(connect_progress._PROGRESS_PANEL_NAME)
assert panel_buf is not None
text = "\n".join(text for text, _ in panel_buf.append_calls)
assert "Connect FAILED" in text
assert "ssh down" in text
def test_progress_panel_success_appends_terminal_line() -> None:
window = FakeWindow()
panel = connect_progress.ConnectProgressPanel(window, "aws-celery")
panel.start()
try:
panel.success(detail="ready")
finally:
panel.stop()
panel_buf = window.output_panels.get(connect_progress._PROGRESS_PANEL_NAME)
assert panel_buf is not None
text = "\n".join(text for text, _ in panel_buf.append_calls)
assert "Connect SUCCESS" in text

View File

@@ -4,12 +4,87 @@ from __future__ import annotations
from conftest import FakeView
from sessions.lsp_save_preferences import (
_as_enabled_flag,
_settings_getter,
lsp_code_actions_on_save_kinds,
lsp_fix_all_on_save_enabled,
lsp_format_on_save_enabled,
lsp_organize_imports_on_save_enabled,
)
# --- _settings_getter / _as_enabled_flag edge branches ---
class _ViewWithoutSettings:
pass
class _ViewWithBrokenSettings:
def settings(self):
class _Store:
get = "not callable"
return _Store()
def test_settings_getter_returns_none_when_view_has_no_settings_method() -> None:
assert _settings_getter(_ViewWithoutSettings()) is None
def test_settings_getter_returns_none_when_store_get_is_not_callable() -> None:
assert _settings_getter(_ViewWithBrokenSettings()) is None
def test_as_enabled_flag_truthy_int_and_float_branches() -> None:
assert _as_enabled_flag(1) is True
assert _as_enabled_flag(0) is False
assert _as_enabled_flag(1.5) is True
assert _as_enabled_flag(0.0) is False
def test_as_enabled_flag_unknown_type_falls_through_to_false() -> None:
assert _as_enabled_flag(object()) is False
# --- lsp_code_actions_on_save_kinds list/tuple + filter branches ---
def test_lsp_code_actions_kinds_returns_empty_for_view_without_settings() -> None:
assert lsp_code_actions_on_save_kinds(_ViewWithoutSettings()) == ()
def test_lsp_code_actions_kinds_filters_blank_and_non_string_keys() -> None:
v = FakeView()
v.settings().set(
"lsp_code_actions_on_save",
{
"source.fixAll": True,
" ": True, # blank key — must be skipped
42: True, # non-string key — must be skipped
"source.disabled": False, # disabled flag — must be skipped
},
)
out = lsp_code_actions_on_save_kinds(v)
assert out == ("source.fixAll",)
def test_lsp_code_actions_kinds_accepts_list_form() -> None:
v = FakeView()
v.settings().set(
"lsp_code_actions_on_save",
["source.organizeImports", " ", 7, "source.fixAll"],
)
assert lsp_code_actions_on_save_kinds(v) == (
"source.organizeImports",
"source.fixAll",
)
def test_lsp_code_actions_kinds_returns_empty_for_unsupported_shape() -> None:
v = FakeView()
v.settings().set("lsp_code_actions_on_save", "not a dict or list")
assert lsp_code_actions_on_save_kinds(v) == ()
def test_lsp_format_on_save_enabled_bool() -> None:
v = FakeView()

View File

@@ -602,3 +602,234 @@ def test_per_method_timeouts_fallback_on_garbage_setting(monkeypatch) -> None:
"""A non-numeric value falls back to the documented default."""
_stub_settings(monkeypatch, {"sessions_file_read_timeout_s": "not-a-number"})
assert ssh_ft._file_read_timeout_s() == 30.0
# --- _transport_trace_event branch coverage ---
def test_transport_trace_event_notifies_listeners_even_when_disabled(
monkeypatch,
) -> None:
captured: list = []
def listener(event, fields):
captured.append((event, dict(fields)))
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
ssh_ft.register_transport_trace_listener(listener)
try:
ssh_ft._transport_trace_event("ut.event", host="x", count=2)
finally:
ssh_ft.unregister_transport_trace_listener(listener)
assert captured == [("ut.event", {"host": "x", "count": 2})]
def test_transport_trace_event_writes_jsonl_when_enabled(
tmp_path: Path, monkeypatch
) -> None:
import json as _json
log_path = tmp_path / "logs" / "debug-trace.log"
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: True)
monkeypatch.setattr(ssh_ft, "_transport_trace_log_path", lambda: log_path)
ssh_ft._transport_trace_event("ut.event", host="x", count=2)
assert log_path.is_file()
line = log_path.read_text(encoding="utf-8").strip().splitlines()[-1]
payload = _json.loads(line)
assert payload["event"] == "ut.event"
assert payload["host"] == "x"
assert payload["count"] == 2
assert "ts" in payload and "time" in payload
def test_transport_trace_event_swallows_listener_exceptions(monkeypatch) -> None:
"""A listener that raises must not crash the trace path or the caller."""
def bad_listener(event, fields):
raise RuntimeError("boom")
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
ssh_ft.register_transport_trace_listener(bad_listener)
try:
ssh_ft._transport_trace_event("ut.event") # must not raise
finally:
ssh_ft.unregister_transport_trace_listener(bad_listener)
def test_transport_trace_log_path_uses_sublime_cache_root(monkeypatch) -> None:
"""Log path lives under sublime.cache_path()/Sessions/logs."""
monkeypatch.setattr(ssh_ft.sublime, "cache_path", lambda: "/tmp/fake_cache")
out = ssh_ft._transport_trace_log_path()
assert str(out).endswith("Sessions/logs/debug-trace.log")
assert "/tmp/fake_cache" in str(out)
def test_transport_trace_enabled_returns_false_when_settings_unavailable(
monkeypatch,
) -> None:
"""Missing load_settings must safely return False, not crash."""
monkeypatch.setattr(ssh_ft.sublime, "load_settings", None, raising=False)
assert ssh_ft._transport_trace_enabled() is False
# --- _emit_bridge_diagnostic_matrix branches ---
def test_emit_bridge_diagnostic_matrix_no_op_when_disabled(monkeypatch) -> None:
captured: list = []
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
monkeypatch.setattr(
ssh_ft, "_transport_trace_event", lambda *a, **k: captured.append((a, k))
)
ssh_ft._emit_bridge_diagnostic_matrix("prod", "spawn")
assert captured == []
def test_emit_bridge_diagnostic_matrix_includes_optional_payloads(
monkeypatch,
) -> None:
captured: list = []
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: True)
monkeypatch.setattr(
ssh_ft, "_transport_trace_event", lambda event, **k: captured.append((event, k))
)
class _Proc:
pid = 4242
payload = {
"id": "envelope-1",
"method": "file/read",
"timeout_ms": 5000,
}
ssh_ft._emit_bridge_diagnostic_matrix(
"prod",
"after_handshake",
bridge_path=None,
revision="rev-abc",
payload=payload,
process=_Proc(),
child_env_summary={"SESSIONS_BRIDGE_DIAG_LOG": True},
timeout_context={"phase": "handshake", "elapsed_ms": 12},
)
assert len(captured) == 1
event, fields = captured[0]
assert event == "bridge.diagnostic_matrix"
assert fields["phase"] == "after_handshake"
assert fields["host_alias"] == "prod"
assert fields["helper_revision"] == "rev-abc"
assert fields["envelope_id"] == "envelope-1"
assert fields["envelope_method"] == "file/read"
assert fields["envelope_timeout_ms"] == 5000
assert fields["bridge_subprocess_pid"] == 4242
assert fields["child_env_flags"] == {"SESSIONS_BRIDGE_DIAG_LOG": True}
assert fields["timeout_context"] == {"phase": "handshake", "elapsed_ms": 12}
# --- transport-trace listener registry ---
def test_binary_stat_snapshot_reports_size_and_mtime(tmp_path: Path) -> None:
target = tmp_path / "binary"
target.write_bytes(b"abc")
snap = ssh_ft._binary_stat_snapshot(target)
assert snap["path"] == str(target)
assert snap["size_bytes"] == 3
assert isinstance(snap["mtime_ns"], int)
assert "stat_error" not in snap
def test_binary_stat_snapshot_records_stat_error_for_missing(tmp_path: Path) -> None:
snap = ssh_ft._binary_stat_snapshot(tmp_path / "does-not-exist")
assert "stat_error" in snap
assert "size_bytes" not in snap
def test_bridge_diagnostic_hypothesis_catalog_returns_documented_rows() -> None:
rows = ssh_ft._bridge_diagnostic_hypothesis_catalog()
assert isinstance(rows, list) and rows, "catalog must list at least one hypothesis"
for row in rows:
assert {"id", "rust_events", "meaning"}.issubset(row.keys())
assert isinstance(row["id"], str) and row["id"].startswith("H")
def test_child_env_session_flags_reflects_bridge_diag_log_presence() -> None:
assert ssh_ft._child_env_session_flags({"SESSIONS_BRIDGE_DIAG_LOG": "/tmp/x"}) == {
"bridge_diag_log": True
}
assert ssh_ft._child_env_session_flags({}) == {"bridge_diag_log": False}
assert ssh_ft._child_env_session_flags({"SESSIONS_BRIDGE_DIAG_LOG": " "}) == {
"bridge_diag_log": False
}
# --- pure helpers (envelope id, revision normalization, auth hint) ---
def test_next_envelope_id_is_monotonic_per_prefix() -> None:
a = ssh_ft._next_envelope_id("tree-list")
b = ssh_ft._next_envelope_id("tree-list")
assert a.startswith("tree-list-") and b.startswith("tree-list-")
assert int(a.rsplit("-", 1)[1]) < int(b.rsplit("-", 1)[1])
def test_next_bridge_trace_request_id_is_strictly_increasing() -> None:
first = ssh_ft._next_bridge_trace_request_id()
second = ssh_ft._next_bridge_trace_request_id()
assert second == first + 1
def test_revision_cache_segment_short_alnum_passthrough() -> None:
assert ssh_ft._revision_cache_segment("v0.7.36") == "v0.7.36"
assert ssh_ft._revision_cache_segment("rev_abc-123") == "rev_abc-123"
def test_revision_cache_segment_blank_returns_unknown() -> None:
assert ssh_ft._revision_cache_segment("") == "unknown"
assert ssh_ft._revision_cache_segment(" ") == "unknown"
def test_revision_cache_segment_unsafe_chars_hash_fallback() -> None:
out = ssh_ft._revision_cache_segment("ev!l/path with spaces")
assert out.startswith("sha256_")
assert len(out) == len("sha256_") + 24 # truncated digest
def test_revision_cache_segment_overlong_hash_fallback() -> None:
out = ssh_ft._revision_cache_segment("a" * 200)
assert out.startswith("sha256_")
def test_validate_revision_path_segment_accepts_safe_chars() -> None:
ssh_ft._validate_revision_path_segment("v0.7.36-rc1")
ssh_ft._validate_revision_path_segment("rev_abc-123") # must not raise
def test_validate_revision_path_segment_rejects_path_separators() -> None:
with pytest.raises(SessionHelperStartError):
ssh_ft._validate_revision_path_segment("../escape")
def test_ssh_auth_failure_hint_returns_empty_for_non_auth_stderr() -> None:
assert ssh_ft._ssh_auth_failure_hint("connection refused") == ""
assert ssh_ft._ssh_auth_failure_hint("") == ""
def test_ssh_auth_failure_hint_returns_text_on_permission_denied() -> None:
hint = ssh_ft._ssh_auth_failure_hint("Permission denied (publickey)")
assert hint, "expected a one-line hint when stderr is auth-shaped"
def test_transport_trace_listener_register_and_unregister_round_trip() -> None:
def listener(event, fields):
return None
ssh_ft.register_transport_trace_listener(listener)
assert listener in ssh_ft._TRANSPORT_TRACE_LISTENERS
# Re-registering is idempotent (no duplicates).
ssh_ft.register_transport_trace_listener(listener)
assert ssh_ft._TRANSPORT_TRACE_LISTENERS.count(listener) == 1
ssh_ft.unregister_transport_trace_listener(listener)
assert listener not in ssh_ft._TRANSPORT_TRACE_LISTENERS
# Unregistering a not-registered listener is a no-op.
ssh_ft.unregister_transport_trace_listener(listener)

2
uv.lock generated
View File

@@ -854,7 +854,7 @@ wheels = [
[[package]]
name = "sessions-sublime"
version = "0.7.35"
version = "0.7.38"
source = { virtual = "." }
[package.dev-dependencies]