Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5c8a29efa5 | |||
| 718c7bcc42 | |||
| d51e5f2f05 |
@@ -23,7 +23,7 @@ on:
|
||||
|
||||
env:
|
||||
RUST_COV_FAIL_UNDER: 80
|
||||
PYTHON_COV_FAIL_UNDER: 78
|
||||
PYTHON_COV_FAIL_UNDER: 80
|
||||
|
||||
jobs:
|
||||
rust:
|
||||
|
||||
@@ -147,7 +147,7 @@ jobs:
|
||||
UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 \
|
||||
pytest --cov=sublime/sessions --cov-report=term-missing \
|
||||
--cov-report=xml:coverage-python.xml \
|
||||
--cov-fail-under=78
|
||||
--cov-fail-under=80
|
||||
|
||||
- name: Import Sublime runtime modules
|
||||
run: UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest sublime/tests/test_runtime_import_smoke.py
|
||||
|
||||
@@ -17,7 +17,7 @@ repos:
|
||||
|
||||
- id: python-test
|
||||
name: python test
|
||||
entry: sh -c 'cargo build --manifest-path rust/Cargo.toml -p local_bridge -p sessions_native -q && env UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest --cov=sublime/sessions --cov-fail-under=78 -q'
|
||||
entry: sh -c 'cargo build --manifest-path rust/Cargo.toml -p local_bridge -p sessions_native -q && env UV_PYTHON_INSTALL_DIR=.uv-python uv run --python 3.8 pytest --cov=sublime/sessions --cov-fail-under=80 -q'
|
||||
language: system
|
||||
types: [python]
|
||||
pass_filenames: false
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "sessions-sublime"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
description = "Sublime-facing Python code for Sessions."
|
||||
requires-python = ">=3.8"
|
||||
license = {text = "MIT"}
|
||||
|
||||
12
rust/Cargo.lock
generated
12
rust/Cargo.lock
generated
@@ -221,7 +221,7 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"
|
||||
|
||||
[[package]]
|
||||
name = "local_bridge"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"glob",
|
||||
@@ -432,7 +432,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session_helper"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"notify",
|
||||
@@ -443,7 +443,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "session_protocol"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"serde",
|
||||
@@ -452,14 +452,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sessions_askpass"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
dependencies = [
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sessions_native"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"notify",
|
||||
@@ -773,7 +773,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "workspace_identity"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
|
||||
[[package]]
|
||||
name = "zmij"
|
||||
|
||||
@@ -12,7 +12,7 @@ resolver = "2"
|
||||
[workspace.package]
|
||||
edition = "2024"
|
||||
license = "MIT"
|
||||
version = "0.7.35"
|
||||
version = "0.7.38"
|
||||
authors = ["Myeongseon Choi <key262yek@gmail.com>"]
|
||||
repository = "https://git.teahaven.kr/sublime-rs/sessions"
|
||||
homepage = "https://git.teahaven.kr/sublime-rs/sessions"
|
||||
|
||||
@@ -6,10 +6,13 @@ from dataclasses import replace
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from conftest import FakeWindow
|
||||
from sessions import commands
|
||||
from conftest import FakeView, FakeWindow
|
||||
from sessions import commands, commands_python_pipeline
|
||||
from sessions.file_state import OpenFileResult, OpenOutcome
|
||||
from sessions.recent_state import RecentWorkspace, RecentWorkspaceIndex
|
||||
from sessions.remote import (
|
||||
RemoteFileKind,
|
||||
RemoteFileMetadata,
|
||||
RunTrigger,
|
||||
ToolExecutionRequest,
|
||||
ToolExecutionResult,
|
||||
@@ -872,18 +875,44 @@ def _make_tool_request(kind: ToolRunKind = ToolRunKind.LINTER) -> ToolExecutionR
|
||||
)
|
||||
|
||||
|
||||
def _stub_refresh_cache_lower_layers(monkeypatch, *, opened: OpenFileResult):
|
||||
"""Wire up the four indirection points _refresh_local_cache_after_format hits.
|
||||
|
||||
Lets the real helper run end-to-end (mapper, lane bookkeeping,
|
||||
sidecar write) under deterministic in-process substitutes so the
|
||||
body covers ~36 lines instead of being monkey-patched out.
|
||||
"""
|
||||
monkeypatch.setattr(commands, "_run_mirror_in_background", lambda fn, *a: fn(*a))
|
||||
monkeypatch.setattr(commands, "_set_timeout", lambda fn, delay_ms=0: fn())
|
||||
monkeypatch.setattr(commands, "_begin_interactive_ssh_lane", lambda host: None)
|
||||
monkeypatch.setattr(commands, "_end_interactive_ssh_lane", lambda host: None)
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"open_remote_file_into_local_cache",
|
||||
lambda host_alias, *, remote_absolute_path, local_cache_path: opened,
|
||||
)
|
||||
|
||||
|
||||
def test_present_remote_tool_result_format_success_refreshes_cache(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
refresh_calls: List[str] = []
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"_refresh_local_cache_after_format",
|
||||
lambda window, ctx, remote_path: refresh_calls.append(remote_path),
|
||||
)
|
||||
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
|
||||
expected_local = ctx.local_cache_root / "a.py"
|
||||
expected_local.parent.mkdir(parents=True, exist_ok=True)
|
||||
expected_local.write_text("formatted\n", encoding="utf-8")
|
||||
metadata = RemoteFileMetadata(
|
||||
mtime_ns=1, size_bytes=10, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
|
||||
)
|
||||
_stub_refresh_cache_lower_layers(
|
||||
monkeypatch,
|
||||
opened=OpenFileResult(
|
||||
outcome=OpenOutcome.OK,
|
||||
local_cache_path=expected_local,
|
||||
remote_metadata=metadata,
|
||||
),
|
||||
)
|
||||
req = _make_tool_request(ToolRunKind.FORMATTER)
|
||||
res = ToolExecutionResult(
|
||||
exit_code=0,
|
||||
@@ -895,11 +924,511 @@ def test_present_remote_tool_result_format_success_refreshes_cache(
|
||||
commands._present_remote_tool_result(
|
||||
window, ctx, req, res, normalized_kind="format"
|
||||
)
|
||||
assert refresh_calls == ["/srv/ws/a.py"]
|
||||
assert status_messages[-1].startswith("Sessions ready")
|
||||
sidecar = commands._remote_metadata_sidecar_path(expected_local)
|
||||
assert sidecar.is_file()
|
||||
assert "Remote formatter completed" in status_messages[-1]
|
||||
|
||||
|
||||
def test_refresh_local_cache_after_format_remote_not_found_warns(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
|
||||
expected_local = ctx.local_cache_root / "a.py"
|
||||
_stub_refresh_cache_lower_layers(
|
||||
monkeypatch,
|
||||
opened=OpenFileResult(
|
||||
outcome=OpenOutcome.REMOTE_NOT_FOUND,
|
||||
local_cache_path=expected_local,
|
||||
),
|
||||
)
|
||||
commands._refresh_local_cache_after_format(window, ctx, "/srv/ws/a.py")
|
||||
assert any("no longer exists" in m for m in status_messages)
|
||||
|
||||
|
||||
# --- _apply_inline_diagnostics early-return paths ---
|
||||
|
||||
|
||||
def test_apply_inline_diagnostics_no_active_view_is_noop() -> None:
|
||||
window = FakeWindow()
|
||||
window.active_view_value = None
|
||||
commands._apply_inline_diagnostics(window, ()) # must not raise
|
||||
|
||||
|
||||
def test_apply_inline_diagnostics_view_without_file_is_noop() -> None:
|
||||
window = FakeWindow()
|
||||
window.active_view_value = FakeView(file_name=None)
|
||||
commands._apply_inline_diagnostics(window, ()) # must not raise
|
||||
|
||||
|
||||
def test_apply_inline_diagnostics_no_matching_presentation_is_noop() -> None:
|
||||
window = FakeWindow()
|
||||
view = FakeView(file_name="/local/x.py")
|
||||
window.active_view_value = view
|
||||
|
||||
class _Pres:
|
||||
local_file_path = "/local/y.py"
|
||||
start_row = 0
|
||||
start_col = 0
|
||||
end_row = 0
|
||||
end_col = 1
|
||||
scope = None
|
||||
|
||||
commands._apply_inline_diagnostics(window, (_Pres(),))
|
||||
assert view.regions == {}
|
||||
|
||||
|
||||
# --- _remote_tool_footer_lines branches ---
|
||||
|
||||
|
||||
def test_remote_tool_footer_lines_collects_unopened_summary_and_hint() -> None:
|
||||
from sessions.diagnostics import (
|
||||
DiagnosticRange,
|
||||
DiagnosticRecord,
|
||||
DiagnosticSeverity,
|
||||
MappedDiagnostic,
|
||||
SourcePosition,
|
||||
)
|
||||
|
||||
rec = DiagnosticRecord(
|
||||
remote_path="/srv/ws/missing.py",
|
||||
message="m",
|
||||
severity=DiagnosticSeverity.ERROR,
|
||||
range=DiagnosticRange(
|
||||
start=SourcePosition(line=1, character=0),
|
||||
end=SourcePosition(line=1, character=1),
|
||||
),
|
||||
source=None,
|
||||
code=None,
|
||||
)
|
||||
unmapped = MappedDiagnostic(
|
||||
diagnostic=rec, local_path=None, unmapped_reason="not_in_workspace"
|
||||
)
|
||||
mapped_other = MappedDiagnostic(
|
||||
diagnostic=rec, local_path=Path("/local/other.py"), unmapped_reason=None
|
||||
)
|
||||
|
||||
class _Result:
|
||||
tool_not_found_hint = "install ruff"
|
||||
|
||||
lines = commands._remote_tool_footer_lines(
|
||||
_Result(),
|
||||
(unmapped, mapped_other),
|
||||
current_local_file="/local/active.py",
|
||||
)
|
||||
assert any("Unopened/unmapped diagnostics" in line for line in lines)
|
||||
assert any("unopened cache file" in line for line in lines)
|
||||
assert "install ruff" in lines
|
||||
|
||||
|
||||
# --- _default_remote_workspace_directory direct coverage ---
|
||||
|
||||
|
||||
def test_default_remote_workspace_directory_uses_handshake_remote_home(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"bridge_handshake_info",
|
||||
lambda host: {"remote_home": "/srv/people/alice"},
|
||||
)
|
||||
out = commands._default_remote_workspace_directory("prod")
|
||||
assert out == "/srv/people/alice"
|
||||
|
||||
|
||||
def test_default_remote_workspace_directory_falls_back_to_ssh_home(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
from sessions.ssh_runner import SshRunResult
|
||||
|
||||
monkeypatch.setattr(commands, "bridge_handshake_info", lambda host: None)
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"run_ssh_remote_command",
|
||||
lambda host, argv: SshRunResult(
|
||||
returncode=0, stdout="/home/bob\n", stderr="", local_argv=tuple(argv)
|
||||
),
|
||||
)
|
||||
out = commands._default_remote_workspace_directory("prod")
|
||||
assert out == "/home/bob"
|
||||
|
||||
|
||||
# --- _trace_event direct coverage ---
|
||||
|
||||
|
||||
def test_trace_event_appends_jsonl_when_enabled(tmp_path: Path, monkeypatch) -> None:
|
||||
import json as _json
|
||||
|
||||
log_path = tmp_path / "logs" / "debug-trace.log"
|
||||
monkeypatch.setattr(commands, "_trace_enabled", lambda: True)
|
||||
monkeypatch.setattr(commands, "_trace_log_path", lambda: log_path)
|
||||
commands._trace_event("ut.event", foo="bar", count=3)
|
||||
assert log_path.is_file()
|
||||
line = log_path.read_text(encoding="utf-8").strip().splitlines()[-1]
|
||||
payload = _json.loads(line)
|
||||
assert payload["event"] == "ut.event"
|
||||
assert payload["foo"] == "bar"
|
||||
assert payload["count"] == 3
|
||||
assert "ts" in payload and "time" in payload
|
||||
|
||||
|
||||
def test_trace_event_no_op_when_disabled(tmp_path: Path, monkeypatch) -> None:
|
||||
log_path = tmp_path / "logs" / "debug-trace.log"
|
||||
monkeypatch.setattr(commands, "_trace_enabled", lambda: False)
|
||||
monkeypatch.setattr(commands, "_trace_log_path", lambda: log_path)
|
||||
commands._trace_event("ut.event", foo="bar")
|
||||
assert not log_path.exists()
|
||||
|
||||
|
||||
# --- _eager_hydrate_workspace branch coverage ---
|
||||
|
||||
|
||||
def test_eager_hydrate_workspace_writes_sidecars_for_hydrated_entries(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
|
||||
hydrated_dir = ctx.local_cache_root / "pkg"
|
||||
hydrated_dir.mkdir(parents=True, exist_ok=True)
|
||||
hydrated_path = hydrated_dir / "build_graph.py"
|
||||
hydrated_path.write_bytes(b"x = 1\n")
|
||||
bad_kind_path = hydrated_dir / "weird.bin"
|
||||
bad_kind_path.write_bytes(b"\x00\x01")
|
||||
|
||||
summary = {
|
||||
"hydrated": [
|
||||
{
|
||||
"local_path": str(hydrated_path),
|
||||
"metadata": {
|
||||
"mtime_ns": 1,
|
||||
"size_bytes": 6,
|
||||
"kind": "regular_file",
|
||||
"unix_mode": 33188,
|
||||
},
|
||||
},
|
||||
# bad shape — must be skipped without raising
|
||||
{"local_path": 42, "metadata": "not a dict"},
|
||||
# unknown kind string falls through to OTHER
|
||||
{
|
||||
"local_path": str(bad_kind_path),
|
||||
"metadata": {
|
||||
"mtime_ns": 2,
|
||||
"size_bytes": 2,
|
||||
"kind": "this_is_not_a_real_kind",
|
||||
"unix_mode": None,
|
||||
},
|
||||
},
|
||||
],
|
||||
"skipped_existing": 5,
|
||||
"failed": 1,
|
||||
}
|
||||
monkeypatch.setattr(commands._rust_ffi, "eager_hydrate_apply", lambda **_: summary)
|
||||
commands._eager_hydrate_workspace(window, ctx, ("build_graph.py",))
|
||||
assert commands._remote_metadata_sidecar_path(hydrated_path).is_file()
|
||||
assert commands._remote_metadata_sidecar_path(bad_kind_path).is_file()
|
||||
|
||||
|
||||
# --- _collect_remote_python_pipeline_results branch coverage ---
|
||||
|
||||
|
||||
def _collect_pipeline_ctx(tmp_path: Path, monkeypatch):
|
||||
"""Workspace context wired so the pipeline collector reaches the run loop."""
|
||||
settings = SessionsSettings(
|
||||
ssh_config_path=tmp_path / "config",
|
||||
remote_python_tool_pipeline=("ruff_lint",),
|
||||
)
|
||||
monkeypatch.setattr(commands, "SessionsSettings", lambda: settings)
|
||||
monkeypatch.setattr(commands.sublime, "cache_path", lambda: str(tmp_path / "cache"))
|
||||
monkeypatch.setattr(
|
||||
commands_python_pipeline,
|
||||
"_effective_sessions_settings_for_remote_python",
|
||||
lambda window: settings,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
commands_python_pipeline,
|
||||
"_remote_python_pipeline_targets",
|
||||
lambda view, window, merged: object(),
|
||||
)
|
||||
recent_store = commands._recent_store(settings)
|
||||
recent_store.save_index(
|
||||
RecentWorkspaceIndex(
|
||||
(
|
||||
RecentWorkspace(
|
||||
"prod",
|
||||
"/srv/ws",
|
||||
"cache-1",
|
||||
"2026-04-12T03:00:00+00:00",
|
||||
),
|
||||
)
|
||||
)
|
||||
)
|
||||
window = FakeWindow(project_data={"settings": {PROJECT_SETTINGS_KEY: "cache-1"}})
|
||||
ctx = commands._workspace_context(window, settings)
|
||||
assert ctx is not None
|
||||
return window, ctx
|
||||
|
||||
|
||||
def test_collect_pipeline_results_skips_non_python_path(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
|
||||
out = commands_python_pipeline._collect_remote_python_pipeline_results(
|
||||
window, None, ctx, "/srv/ws/README.md", RunTrigger.ON_SAVE
|
||||
)
|
||||
assert out == ()
|
||||
|
||||
|
||||
def test_collect_pipeline_results_skips_when_diagnostics_off(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
commands_python_pipeline,
|
||||
"_effective_sessions_settings_for_remote_python",
|
||||
lambda window: SessionsSettings(
|
||||
ssh_config_path=tmp_path / "config",
|
||||
remote_python_auto_diagnostics_on_save=False,
|
||||
),
|
||||
)
|
||||
out = commands_python_pipeline._collect_remote_python_pipeline_results(
|
||||
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
|
||||
)
|
||||
assert out == ()
|
||||
|
||||
|
||||
def test_collect_pipeline_results_runs_pipeline_and_returns_tuple(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
|
||||
fake_result = ToolExecutionResult(
|
||||
exit_code=0,
|
||||
stdout=TruncatedStream("", False),
|
||||
stderr=TruncatedStream("", False),
|
||||
diagnostics=(),
|
||||
failure_category=ToolFailureCategory.SUCCESS,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
commands, "execute_remote_tool_request", lambda h, r: fake_result
|
||||
)
|
||||
out = commands_python_pipeline._collect_remote_python_pipeline_results(
|
||||
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
|
||||
)
|
||||
assert out is not None
|
||||
assert len(out) >= 1
|
||||
|
||||
|
||||
def test_collect_pipeline_results_transport_error_returns_none(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
from sessions.connect_preflight import SessionHelperStartError
|
||||
|
||||
window, ctx = _collect_pipeline_ctx(tmp_path, monkeypatch)
|
||||
|
||||
def raise_transport(*a, **k):
|
||||
raise SessionHelperStartError("ssh broke")
|
||||
|
||||
monkeypatch.setattr(commands, "execute_remote_tool_request", raise_transport)
|
||||
monkeypatch.setattr(commands, "_set_timeout", lambda fn, delay_ms=0: fn())
|
||||
out = commands_python_pipeline._collect_remote_python_pipeline_results(
|
||||
window, None, ctx, "/srv/ws/a.py", RunTrigger.ON_SAVE
|
||||
)
|
||||
assert out is None
|
||||
|
||||
|
||||
# --- _force_overwrite_remote branch coverage ---
|
||||
|
||||
|
||||
def _force_overwrite_ctx_with_cache(tmp_path: Path, monkeypatch):
|
||||
"""Build a workspace + on-disk cache file ready for force-overwrite."""
|
||||
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
|
||||
cache_file = ctx.local_cache_root / "a.py"
|
||||
cache_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
cache_file.write_bytes(b"local body\n")
|
||||
return ctx, cache_file
|
||||
|
||||
|
||||
def test_force_overwrite_remote_success_writes_sidecar(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
|
||||
fresh_meta = RemoteFileMetadata(
|
||||
mtime_ns=2, size_bytes=11, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
|
||||
)
|
||||
from sessions.remote import RemoteWriteFileResult
|
||||
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"execute_remote_write_file",
|
||||
lambda host, req: RemoteWriteFileResult(ok=True, updated_metadata=fresh_meta),
|
||||
)
|
||||
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
|
||||
sidecar = commands._remote_metadata_sidecar_path(cache_file)
|
||||
assert sidecar.is_file()
|
||||
assert "Overwritten remote file" in status_messages[-1]
|
||||
|
||||
|
||||
def test_force_overwrite_remote_transport_error_emits_disconnected(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
from sessions.remote import RemoteWriteErrorCode, RemoteWriteFileResult
|
||||
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"execute_remote_write_file",
|
||||
lambda host, req: RemoteWriteFileResult(
|
||||
ok=False,
|
||||
error_code=RemoteWriteErrorCode.TRANSPORT_ERROR,
|
||||
error_message="ssh broke",
|
||||
),
|
||||
)
|
||||
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
|
||||
assert "ssh broke" in status_messages[-1]
|
||||
|
||||
|
||||
def test_force_overwrite_remote_other_failure_emits_warning(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
from sessions.remote import RemoteWriteErrorCode, RemoteWriteFileResult
|
||||
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
ctx, cache_file = _force_overwrite_ctx_with_cache(tmp_path, monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
commands,
|
||||
"execute_remote_write_file",
|
||||
lambda host, req: RemoteWriteFileResult(
|
||||
ok=False,
|
||||
error_code=RemoteWriteErrorCode.PERMISSION_DENIED,
|
||||
error_message="EACCES",
|
||||
),
|
||||
)
|
||||
commands._force_overwrite_remote(ctx, "/srv/ws/a.py", cache_file, None)
|
||||
assert "EACCES" in status_messages[-1]
|
||||
|
||||
|
||||
# --- _precheck_remote_file_openability branch coverage ---
|
||||
|
||||
|
||||
def test_precheck_remote_file_openability_transport_error_skips(monkeypatch) -> None:
|
||||
from sessions.connect_preflight import SessionHelperStartError
|
||||
|
||||
def raise_transport(*args, **kwargs):
|
||||
raise SessionHelperStartError("ssh down")
|
||||
|
||||
monkeypatch.setattr(commands, "execute_remote_stat_file", raise_transport)
|
||||
skipped = []
|
||||
outcome = commands._precheck_remote_file_openability(
|
||||
cache_key="cache-1",
|
||||
host_alias="prod",
|
||||
remote="/srv/ws/a.py",
|
||||
path_str="/local/a.py",
|
||||
on_skip=lambda: skipped.append(True),
|
||||
)
|
||||
assert outcome.proceed is False
|
||||
assert skipped == [True]
|
||||
|
||||
|
||||
def test_precheck_remote_file_openability_missing_remote_proceeds(monkeypatch) -> None:
|
||||
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: None)
|
||||
skipped = []
|
||||
outcome = commands._precheck_remote_file_openability(
|
||||
cache_key="cache-1",
|
||||
host_alias="prod",
|
||||
remote="/srv/ws/a.py",
|
||||
path_str="/local/a.py",
|
||||
on_skip=lambda: skipped.append(True),
|
||||
)
|
||||
assert outcome.proceed is True
|
||||
assert outcome.stat_metadata is None
|
||||
assert skipped == []
|
||||
|
||||
|
||||
def test_precheck_remote_file_openability_blocked_by_guard_skips(monkeypatch) -> None:
|
||||
huge = RemoteFileMetadata(
|
||||
mtime_ns=1,
|
||||
size_bytes=10 * 1024 * 1024 * 1024, # 10 GiB — defeats every reasonable cap
|
||||
kind=RemoteFileKind.REGULAR_FILE,
|
||||
unix_mode=33188,
|
||||
)
|
||||
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: huge)
|
||||
skipped = []
|
||||
outcome = commands._precheck_remote_file_openability(
|
||||
cache_key="cache-1",
|
||||
host_alias="prod",
|
||||
remote="/srv/ws/big.bin",
|
||||
path_str="/local/big.bin",
|
||||
on_skip=lambda: skipped.append(True),
|
||||
)
|
||||
assert outcome.proceed is False
|
||||
assert outcome.stat_metadata is huge
|
||||
assert skipped == [True]
|
||||
|
||||
|
||||
def test_precheck_remote_file_openability_clean_metadata_proceeds(monkeypatch) -> None:
|
||||
ok_meta = RemoteFileMetadata(
|
||||
mtime_ns=1, size_bytes=128, kind=RemoteFileKind.REGULAR_FILE, unix_mode=33188
|
||||
)
|
||||
monkeypatch.setattr(commands, "execute_remote_stat_file", lambda *a, **k: ok_meta)
|
||||
skipped = []
|
||||
outcome = commands._precheck_remote_file_openability(
|
||||
cache_key="cache-1",
|
||||
host_alias="prod",
|
||||
remote="/srv/ws/a.py",
|
||||
path_str="/local/a.py",
|
||||
on_skip=lambda: skipped.append(True),
|
||||
)
|
||||
assert outcome.proceed is True
|
||||
assert outcome.stat_metadata is ok_meta
|
||||
assert skipped == []
|
||||
|
||||
|
||||
def test_present_remote_tool_result_with_diagnostics_applies_inline_regions(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
status_messages: List[str] = []
|
||||
monkeypatch.setattr(commands.sublime, "status_message", status_messages.append)
|
||||
window, ctx = _present_tool_result_ctx(tmp_path, monkeypatch)
|
||||
expected_local = ctx.local_cache_root / "a.py"
|
||||
expected_local.parent.mkdir(parents=True, exist_ok=True)
|
||||
expected_local.write_text("x = 1\n", encoding="utf-8")
|
||||
active_view = FakeView(file_name=str(expected_local))
|
||||
window.active_view_value = active_view
|
||||
diag = {
|
||||
"path": "/srv/ws/a.py",
|
||||
"message": "unused import",
|
||||
"severity": "warning",
|
||||
"row": 1,
|
||||
"column": 1,
|
||||
"end_column": 5,
|
||||
"code": "F401",
|
||||
}
|
||||
other_diag = {
|
||||
"path": "/srv/ws/other.py",
|
||||
"message": "unused",
|
||||
"severity": "warning",
|
||||
"row": 1,
|
||||
}
|
||||
req = _make_tool_request()
|
||||
res = ToolExecutionResult(
|
||||
exit_code=1,
|
||||
stdout=TruncatedStream("{}", False),
|
||||
stderr=TruncatedStream("", False),
|
||||
diagnostics=(diag, other_diag),
|
||||
failure_category=ToolFailureCategory.NON_ZERO_WITH_DIAGNOSTICS,
|
||||
tool_not_found_hint="(unused hint to exercise footer notes)",
|
||||
)
|
||||
commands._present_remote_tool_result(window, ctx, req, res, normalized_kind="lint")
|
||||
assert active_view.regions, "inline diagnostic regions should be added"
|
||||
panel = window.output_panels.get("sessions_remote_tool")
|
||||
assert panel is not None
|
||||
assert "exit code 1" in status_messages[-1]
|
||||
|
||||
|
||||
def test_present_remote_tool_result_lint_success_emits_ready(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
|
||||
@@ -199,3 +199,76 @@ def test_progress_panel_ignores_noisy_events() -> None:
|
||||
text_blob = "\n".join(text for text, _ in calls)
|
||||
assert "queue.enqueue" not in text_blob
|
||||
assert "bridge.request_start" not in text_blob
|
||||
|
||||
|
||||
# --- _hide_panel_if_progress branches ---
|
||||
|
||||
|
||||
class _PanelHideWindow:
|
||||
def __init__(self, active_panel_name):
|
||||
self._active = active_panel_name
|
||||
self.run_calls: list = []
|
||||
|
||||
def active_panel(self):
|
||||
return self._active
|
||||
|
||||
def run_command(self, name, args=None):
|
||||
self.run_calls.append((name, args))
|
||||
|
||||
|
||||
def test_hide_panel_if_progress_hides_when_panel_is_active() -> None:
|
||||
win = _PanelHideWindow(
|
||||
active_panel_name="output." + connect_progress._PROGRESS_PANEL_NAME
|
||||
)
|
||||
connect_progress._hide_panel_if_progress(win)
|
||||
assert ("hide_panel", {}) in win.run_calls
|
||||
|
||||
|
||||
def test_hide_panel_if_progress_no_op_when_user_switched_panels() -> None:
|
||||
win = _PanelHideWindow(active_panel_name="output.exec")
|
||||
connect_progress._hide_panel_if_progress(win)
|
||||
assert win.run_calls == []
|
||||
|
||||
|
||||
def test_hide_panel_if_progress_no_op_when_window_lacks_active_panel() -> None:
|
||||
class _NoActivePanel:
|
||||
run_calls: list = []
|
||||
|
||||
def run_command(self, name, args=None):
|
||||
self.run_calls.append((name, args))
|
||||
|
||||
win = _NoActivePanel()
|
||||
connect_progress._hide_panel_if_progress(win)
|
||||
assert win.run_calls == []
|
||||
|
||||
|
||||
# --- ConnectProgressPanel.success / failure branches ---
|
||||
|
||||
|
||||
def test_progress_panel_failure_appends_terminal_line() -> None:
|
||||
window = FakeWindow()
|
||||
panel = connect_progress.ConnectProgressPanel(window, "aws-celery")
|
||||
panel.start()
|
||||
try:
|
||||
panel.failure("ssh down")
|
||||
finally:
|
||||
panel.stop()
|
||||
panel_buf = window.output_panels.get(connect_progress._PROGRESS_PANEL_NAME)
|
||||
assert panel_buf is not None
|
||||
text = "\n".join(text for text, _ in panel_buf.append_calls)
|
||||
assert "Connect FAILED" in text
|
||||
assert "ssh down" in text
|
||||
|
||||
|
||||
def test_progress_panel_success_appends_terminal_line() -> None:
|
||||
window = FakeWindow()
|
||||
panel = connect_progress.ConnectProgressPanel(window, "aws-celery")
|
||||
panel.start()
|
||||
try:
|
||||
panel.success(detail="ready")
|
||||
finally:
|
||||
panel.stop()
|
||||
panel_buf = window.output_panels.get(connect_progress._PROGRESS_PANEL_NAME)
|
||||
assert panel_buf is not None
|
||||
text = "\n".join(text for text, _ in panel_buf.append_calls)
|
||||
assert "Connect SUCCESS" in text
|
||||
|
||||
@@ -4,12 +4,87 @@ from __future__ import annotations
|
||||
|
||||
from conftest import FakeView
|
||||
from sessions.lsp_save_preferences import (
|
||||
_as_enabled_flag,
|
||||
_settings_getter,
|
||||
lsp_code_actions_on_save_kinds,
|
||||
lsp_fix_all_on_save_enabled,
|
||||
lsp_format_on_save_enabled,
|
||||
lsp_organize_imports_on_save_enabled,
|
||||
)
|
||||
|
||||
# --- _settings_getter / _as_enabled_flag edge branches ---
|
||||
|
||||
|
||||
class _ViewWithoutSettings:
|
||||
pass
|
||||
|
||||
|
||||
class _ViewWithBrokenSettings:
|
||||
def settings(self):
|
||||
class _Store:
|
||||
get = "not callable"
|
||||
|
||||
return _Store()
|
||||
|
||||
|
||||
def test_settings_getter_returns_none_when_view_has_no_settings_method() -> None:
|
||||
assert _settings_getter(_ViewWithoutSettings()) is None
|
||||
|
||||
|
||||
def test_settings_getter_returns_none_when_store_get_is_not_callable() -> None:
|
||||
assert _settings_getter(_ViewWithBrokenSettings()) is None
|
||||
|
||||
|
||||
def test_as_enabled_flag_truthy_int_and_float_branches() -> None:
|
||||
assert _as_enabled_flag(1) is True
|
||||
assert _as_enabled_flag(0) is False
|
||||
assert _as_enabled_flag(1.5) is True
|
||||
assert _as_enabled_flag(0.0) is False
|
||||
|
||||
|
||||
def test_as_enabled_flag_unknown_type_falls_through_to_false() -> None:
|
||||
assert _as_enabled_flag(object()) is False
|
||||
|
||||
|
||||
# --- lsp_code_actions_on_save_kinds list/tuple + filter branches ---
|
||||
|
||||
|
||||
def test_lsp_code_actions_kinds_returns_empty_for_view_without_settings() -> None:
|
||||
assert lsp_code_actions_on_save_kinds(_ViewWithoutSettings()) == ()
|
||||
|
||||
|
||||
def test_lsp_code_actions_kinds_filters_blank_and_non_string_keys() -> None:
|
||||
v = FakeView()
|
||||
v.settings().set(
|
||||
"lsp_code_actions_on_save",
|
||||
{
|
||||
"source.fixAll": True,
|
||||
" ": True, # blank key — must be skipped
|
||||
42: True, # non-string key — must be skipped
|
||||
"source.disabled": False, # disabled flag — must be skipped
|
||||
},
|
||||
)
|
||||
out = lsp_code_actions_on_save_kinds(v)
|
||||
assert out == ("source.fixAll",)
|
||||
|
||||
|
||||
def test_lsp_code_actions_kinds_accepts_list_form() -> None:
|
||||
v = FakeView()
|
||||
v.settings().set(
|
||||
"lsp_code_actions_on_save",
|
||||
["source.organizeImports", " ", 7, "source.fixAll"],
|
||||
)
|
||||
assert lsp_code_actions_on_save_kinds(v) == (
|
||||
"source.organizeImports",
|
||||
"source.fixAll",
|
||||
)
|
||||
|
||||
|
||||
def test_lsp_code_actions_kinds_returns_empty_for_unsupported_shape() -> None:
|
||||
v = FakeView()
|
||||
v.settings().set("lsp_code_actions_on_save", "not a dict or list")
|
||||
assert lsp_code_actions_on_save_kinds(v) == ()
|
||||
|
||||
|
||||
def test_lsp_format_on_save_enabled_bool() -> None:
|
||||
v = FakeView()
|
||||
|
||||
@@ -602,3 +602,234 @@ def test_per_method_timeouts_fallback_on_garbage_setting(monkeypatch) -> None:
|
||||
"""A non-numeric value falls back to the documented default."""
|
||||
_stub_settings(monkeypatch, {"sessions_file_read_timeout_s": "not-a-number"})
|
||||
assert ssh_ft._file_read_timeout_s() == 30.0
|
||||
|
||||
|
||||
# --- _transport_trace_event branch coverage ---
|
||||
|
||||
|
||||
def test_transport_trace_event_notifies_listeners_even_when_disabled(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
captured: list = []
|
||||
|
||||
def listener(event, fields):
|
||||
captured.append((event, dict(fields)))
|
||||
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
|
||||
ssh_ft.register_transport_trace_listener(listener)
|
||||
try:
|
||||
ssh_ft._transport_trace_event("ut.event", host="x", count=2)
|
||||
finally:
|
||||
ssh_ft.unregister_transport_trace_listener(listener)
|
||||
assert captured == [("ut.event", {"host": "x", "count": 2})]
|
||||
|
||||
|
||||
def test_transport_trace_event_writes_jsonl_when_enabled(
|
||||
tmp_path: Path, monkeypatch
|
||||
) -> None:
|
||||
import json as _json
|
||||
|
||||
log_path = tmp_path / "logs" / "debug-trace.log"
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: True)
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_log_path", lambda: log_path)
|
||||
ssh_ft._transport_trace_event("ut.event", host="x", count=2)
|
||||
assert log_path.is_file()
|
||||
line = log_path.read_text(encoding="utf-8").strip().splitlines()[-1]
|
||||
payload = _json.loads(line)
|
||||
assert payload["event"] == "ut.event"
|
||||
assert payload["host"] == "x"
|
||||
assert payload["count"] == 2
|
||||
assert "ts" in payload and "time" in payload
|
||||
|
||||
|
||||
def test_transport_trace_event_swallows_listener_exceptions(monkeypatch) -> None:
|
||||
"""A listener that raises must not crash the trace path or the caller."""
|
||||
|
||||
def bad_listener(event, fields):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
|
||||
ssh_ft.register_transport_trace_listener(bad_listener)
|
||||
try:
|
||||
ssh_ft._transport_trace_event("ut.event") # must not raise
|
||||
finally:
|
||||
ssh_ft.unregister_transport_trace_listener(bad_listener)
|
||||
|
||||
|
||||
def test_transport_trace_log_path_uses_sublime_cache_root(monkeypatch) -> None:
|
||||
"""Log path lives under sublime.cache_path()/Sessions/logs."""
|
||||
monkeypatch.setattr(ssh_ft.sublime, "cache_path", lambda: "/tmp/fake_cache")
|
||||
out = ssh_ft._transport_trace_log_path()
|
||||
assert str(out).endswith("Sessions/logs/debug-trace.log")
|
||||
assert "/tmp/fake_cache" in str(out)
|
||||
|
||||
|
||||
def test_transport_trace_enabled_returns_false_when_settings_unavailable(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
"""Missing load_settings must safely return False, not crash."""
|
||||
monkeypatch.setattr(ssh_ft.sublime, "load_settings", None, raising=False)
|
||||
assert ssh_ft._transport_trace_enabled() is False
|
||||
|
||||
|
||||
# --- _emit_bridge_diagnostic_matrix branches ---
|
||||
|
||||
|
||||
def test_emit_bridge_diagnostic_matrix_no_op_when_disabled(monkeypatch) -> None:
|
||||
captured: list = []
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: False)
|
||||
monkeypatch.setattr(
|
||||
ssh_ft, "_transport_trace_event", lambda *a, **k: captured.append((a, k))
|
||||
)
|
||||
ssh_ft._emit_bridge_diagnostic_matrix("prod", "spawn")
|
||||
assert captured == []
|
||||
|
||||
|
||||
def test_emit_bridge_diagnostic_matrix_includes_optional_payloads(
|
||||
monkeypatch,
|
||||
) -> None:
|
||||
captured: list = []
|
||||
monkeypatch.setattr(ssh_ft, "_transport_trace_enabled", lambda: True)
|
||||
monkeypatch.setattr(
|
||||
ssh_ft, "_transport_trace_event", lambda event, **k: captured.append((event, k))
|
||||
)
|
||||
|
||||
class _Proc:
|
||||
pid = 4242
|
||||
|
||||
payload = {
|
||||
"id": "envelope-1",
|
||||
"method": "file/read",
|
||||
"timeout_ms": 5000,
|
||||
}
|
||||
ssh_ft._emit_bridge_diagnostic_matrix(
|
||||
"prod",
|
||||
"after_handshake",
|
||||
bridge_path=None,
|
||||
revision="rev-abc",
|
||||
payload=payload,
|
||||
process=_Proc(),
|
||||
child_env_summary={"SESSIONS_BRIDGE_DIAG_LOG": True},
|
||||
timeout_context={"phase": "handshake", "elapsed_ms": 12},
|
||||
)
|
||||
assert len(captured) == 1
|
||||
event, fields = captured[0]
|
||||
assert event == "bridge.diagnostic_matrix"
|
||||
assert fields["phase"] == "after_handshake"
|
||||
assert fields["host_alias"] == "prod"
|
||||
assert fields["helper_revision"] == "rev-abc"
|
||||
assert fields["envelope_id"] == "envelope-1"
|
||||
assert fields["envelope_method"] == "file/read"
|
||||
assert fields["envelope_timeout_ms"] == 5000
|
||||
assert fields["bridge_subprocess_pid"] == 4242
|
||||
assert fields["child_env_flags"] == {"SESSIONS_BRIDGE_DIAG_LOG": True}
|
||||
assert fields["timeout_context"] == {"phase": "handshake", "elapsed_ms": 12}
|
||||
|
||||
|
||||
# --- transport-trace listener registry ---
|
||||
|
||||
|
||||
def test_binary_stat_snapshot_reports_size_and_mtime(tmp_path: Path) -> None:
|
||||
target = tmp_path / "binary"
|
||||
target.write_bytes(b"abc")
|
||||
snap = ssh_ft._binary_stat_snapshot(target)
|
||||
assert snap["path"] == str(target)
|
||||
assert snap["size_bytes"] == 3
|
||||
assert isinstance(snap["mtime_ns"], int)
|
||||
assert "stat_error" not in snap
|
||||
|
||||
|
||||
def test_binary_stat_snapshot_records_stat_error_for_missing(tmp_path: Path) -> None:
|
||||
snap = ssh_ft._binary_stat_snapshot(tmp_path / "does-not-exist")
|
||||
assert "stat_error" in snap
|
||||
assert "size_bytes" not in snap
|
||||
|
||||
|
||||
def test_bridge_diagnostic_hypothesis_catalog_returns_documented_rows() -> None:
|
||||
rows = ssh_ft._bridge_diagnostic_hypothesis_catalog()
|
||||
assert isinstance(rows, list) and rows, "catalog must list at least one hypothesis"
|
||||
for row in rows:
|
||||
assert {"id", "rust_events", "meaning"}.issubset(row.keys())
|
||||
assert isinstance(row["id"], str) and row["id"].startswith("H")
|
||||
|
||||
|
||||
def test_child_env_session_flags_reflects_bridge_diag_log_presence() -> None:
|
||||
assert ssh_ft._child_env_session_flags({"SESSIONS_BRIDGE_DIAG_LOG": "/tmp/x"}) == {
|
||||
"bridge_diag_log": True
|
||||
}
|
||||
assert ssh_ft._child_env_session_flags({}) == {"bridge_diag_log": False}
|
||||
assert ssh_ft._child_env_session_flags({"SESSIONS_BRIDGE_DIAG_LOG": " "}) == {
|
||||
"bridge_diag_log": False
|
||||
}
|
||||
|
||||
|
||||
# --- pure helpers (envelope id, revision normalization, auth hint) ---
|
||||
|
||||
|
||||
def test_next_envelope_id_is_monotonic_per_prefix() -> None:
|
||||
a = ssh_ft._next_envelope_id("tree-list")
|
||||
b = ssh_ft._next_envelope_id("tree-list")
|
||||
assert a.startswith("tree-list-") and b.startswith("tree-list-")
|
||||
assert int(a.rsplit("-", 1)[1]) < int(b.rsplit("-", 1)[1])
|
||||
|
||||
|
||||
def test_next_bridge_trace_request_id_is_strictly_increasing() -> None:
|
||||
first = ssh_ft._next_bridge_trace_request_id()
|
||||
second = ssh_ft._next_bridge_trace_request_id()
|
||||
assert second == first + 1
|
||||
|
||||
|
||||
def test_revision_cache_segment_short_alnum_passthrough() -> None:
|
||||
assert ssh_ft._revision_cache_segment("v0.7.36") == "v0.7.36"
|
||||
assert ssh_ft._revision_cache_segment("rev_abc-123") == "rev_abc-123"
|
||||
|
||||
|
||||
def test_revision_cache_segment_blank_returns_unknown() -> None:
|
||||
assert ssh_ft._revision_cache_segment("") == "unknown"
|
||||
assert ssh_ft._revision_cache_segment(" ") == "unknown"
|
||||
|
||||
|
||||
def test_revision_cache_segment_unsafe_chars_hash_fallback() -> None:
|
||||
out = ssh_ft._revision_cache_segment("ev!l/path with spaces")
|
||||
assert out.startswith("sha256_")
|
||||
assert len(out) == len("sha256_") + 24 # truncated digest
|
||||
|
||||
|
||||
def test_revision_cache_segment_overlong_hash_fallback() -> None:
|
||||
out = ssh_ft._revision_cache_segment("a" * 200)
|
||||
assert out.startswith("sha256_")
|
||||
|
||||
|
||||
def test_validate_revision_path_segment_accepts_safe_chars() -> None:
|
||||
ssh_ft._validate_revision_path_segment("v0.7.36-rc1")
|
||||
ssh_ft._validate_revision_path_segment("rev_abc-123") # must not raise
|
||||
|
||||
|
||||
def test_validate_revision_path_segment_rejects_path_separators() -> None:
|
||||
with pytest.raises(SessionHelperStartError):
|
||||
ssh_ft._validate_revision_path_segment("../escape")
|
||||
|
||||
|
||||
def test_ssh_auth_failure_hint_returns_empty_for_non_auth_stderr() -> None:
|
||||
assert ssh_ft._ssh_auth_failure_hint("connection refused") == ""
|
||||
assert ssh_ft._ssh_auth_failure_hint("") == ""
|
||||
|
||||
|
||||
def test_ssh_auth_failure_hint_returns_text_on_permission_denied() -> None:
|
||||
hint = ssh_ft._ssh_auth_failure_hint("Permission denied (publickey)")
|
||||
assert hint, "expected a one-line hint when stderr is auth-shaped"
|
||||
|
||||
|
||||
def test_transport_trace_listener_register_and_unregister_round_trip() -> None:
|
||||
def listener(event, fields):
|
||||
return None
|
||||
|
||||
ssh_ft.register_transport_trace_listener(listener)
|
||||
assert listener in ssh_ft._TRANSPORT_TRACE_LISTENERS
|
||||
# Re-registering is idempotent (no duplicates).
|
||||
ssh_ft.register_transport_trace_listener(listener)
|
||||
assert ssh_ft._TRANSPORT_TRACE_LISTENERS.count(listener) == 1
|
||||
ssh_ft.unregister_transport_trace_listener(listener)
|
||||
assert listener not in ssh_ft._TRANSPORT_TRACE_LISTENERS
|
||||
# Unregistering a not-registered listener is a no-op.
|
||||
ssh_ft.unregister_transport_trace_listener(listener)
|
||||
|
||||
Reference in New Issue
Block a user