chore: retire dead chat-with-diff agent modules
Maintainer signoff (2026-04-25): the chat→tmux pivot abandoned the diff-centric review direction. The diff primitives that survived the pivot have no live caller in the agent flow and were carrying test weight (56 tests across 3 files) for an undefined product surface. git history preserves them for anyone who wants to revive the diff direction later. Removed - ``sublime/sessions/agent_proposal_watcher.py`` (290 LOC) — pure-Python unified diff parser; was meant to tail ``tmux pipe-pane`` output. - ``sublime/sessions/agent_change_badge.py`` (248 LOC, ``AgentChangeBadgeRenderer``) — post-apply phantom badge renderer. - ``sublime/tests/test_agent_proposal_watcher.py`` (20 tests). - ``sublime/tests/test_agent_proposal_watcher_adversarial.py`` (10). - ``sublime/tests/test_agent_change_badge.py`` (26). Updated - ``sublime/sessions/agent_tmux.py``: docstring no longer points at the retired companion module; new line documents the historical context of the chat→tmux pivot so future readers know why the broker is agent-agnostic. - ``planning/REVIEW_v0_6_4_DISTRIBUTION_PLAN.md``: "Code to consider retiring" → "Code retired (post-direction-correction)" with the test-health gate numbers post-deletion. Closed two of the three open questions (rm signoff and Linux-only sublime-package decision — defer the latter, no Linux-only release asset for now). Test-health gate stays green: 1432 sublime tests pass; adversarial 190 (floor 184), real-subprocess 55 (floor 53), contract-fixture 27 (floor 27), mock-only:high-value 0.95 (cap 0.98). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -552,34 +552,27 @@ Plus, dropped explicitly by maintainer (2026-04-25):
|
||||
If you find yourself drafting any of the above, pause and check
|
||||
this list first.
|
||||
|
||||
## Code to consider retiring (post-direction-correction)
|
||||
## Code retired (post-direction-correction)
|
||||
|
||||
Modules that supported the abandoned chat-with-diff agent design.
|
||||
Not auto-deleted — needs maintainer signoff. Listed here so the
|
||||
next time someone touches them they ask "is this still load-bearing?"
|
||||
before extending.
|
||||
**Deleted in v0.6.7** — git history preserves the work for anyone
|
||||
who wants to revive the diff direction.
|
||||
|
||||
- `sublime/sessions/agent_proposal_watcher.py` (290 LOC) — pure
|
||||
Python unified-diff parser. Designed to tail `tmux pipe-pane`
|
||||
output and surface diff hunks for the never-shipped review
|
||||
panel. No live caller in the agent flow.
|
||||
- `sublime/sessions/agent_proposal_watcher.py` (290 LOC) — unified
|
||||
diff parser. Designed to tail `tmux pipe-pane` output and surface
|
||||
diff hunks for the never-shipped review panel.
|
||||
- `sublime/sessions/agent_change_badge.py` (248 LOC,
|
||||
`AgentChangeBadgeRenderer`) — post-apply phantom badge for
|
||||
hunks the user accepted. Imported nowhere outside its own
|
||||
module + tests.
|
||||
- Their tests + adversarial tests under
|
||||
`sublime/tests/test_agent_proposal_watcher*.py` and
|
||||
`sublime/tests/test_agent_change_badge*.py`.
|
||||
`AgentChangeBadgeRenderer`) — post-apply phantom badge.
|
||||
- `sublime/tests/test_agent_proposal_watcher.py`,
|
||||
`sublime/tests/test_agent_proposal_watcher_adversarial.py`,
|
||||
`sublime/tests/test_agent_change_badge.py` (56 tests total).
|
||||
- The dangling reference comment in `agent_tmux.py:10` was rewritten
|
||||
to note the historical context and the retirement.
|
||||
|
||||
If retired, also drop the dangling reference comment in
|
||||
`agent_tmux.py:10` ("The companion `agent_proposal_watcher`
|
||||
module parses diff output tailed").
|
||||
|
||||
Decision needed: delete entirely (cleanest) vs. keep as a
|
||||
documented "experiment archive" under `planning/archive/` (preserves
|
||||
the work in case the diff direction comes back). My read: delete —
|
||||
git history preserves the work; carrying dead code increases
|
||||
cognitive overhead and test-suite weight.
|
||||
Test-health gate stays green after the deletion: adversarial 190
|
||||
(floor 184), real-subprocess 55 (floor 53), contract-fixture 27
|
||||
(floor 27), mock-only:high-value 0.95 (cap 0.98). No floor
|
||||
adjustment needed.
|
||||
|
||||
---
|
||||
|
||||
@@ -602,16 +595,16 @@ cognitive overhead and test-suite weight.
|
||||
## Open questions
|
||||
|
||||
- **Code to retire (`agent_proposal_watcher.py` +
|
||||
`agent_change_badge.py`)** — delete now, or move to
|
||||
`planning/archive/` as an experiment archive? My read: delete;
|
||||
git history preserves the work, carrying dead code costs
|
||||
cognitive overhead + test-suite weight. `[needs-input]`.
|
||||
- **§1.1 Linux-only sublime-package** — ship as a release asset on
|
||||
the next batch (extending `sign_release_artifacts.py` +
|
||||
CI wiring), or hold until macOS / Windows are unblocked? My read:
|
||||
ship Linux-only — concrete improvement for at-minimum the Linux
|
||||
user fraction, no rework cost when the matrix expands.
|
||||
- **Windows W1 PersistentBroker (§2.6)** — given the maintainer's
|
||||
current macOS-primary workflow, is Windows parity still on the
|
||||
roadmap? If not, drop §2.6 from this plan and update README's
|
||||
"platform support" claim. If yes, keep but with no date attached.
|
||||
`agent_change_badge.py`)** — `[done @ v0.6.7]` deleted; see "Code
|
||||
retired" section above.
|
||||
- **§1.1 Linux-only sublime-package** — `[plan]` deferred. Maintainer
|
||||
decision (2026-04-25): "Linux-only는 사용성 측면에서 큰 문제없을
|
||||
것 같음. 그대로 가도 됨." — keep current `git clone + cargo
|
||||
build` install path; no rush to ship a Linux-only sublime-package
|
||||
release asset.
|
||||
- **Windows W1 PersistentBroker (§2.6)** — `[needs-input]` still.
|
||||
Maintainer hadn't recalled what W1 was when asked; if the
|
||||
macOS-primary workflow is the steady state, this whole track can
|
||||
drop and the README "Linux/macOS/Windows local-editor support"
|
||||
claim should change to "Linux/macOS supported, Windows
|
||||
best-effort (LSP attach not yet wired — see BACKLOG W1)".
|
||||
|
||||
@@ -1,248 +0,0 @@
|
||||
"""Post-apply edit badges for agent-driven file changes.
|
||||
|
||||
Track D of ``planning/AGENT_TMUX_LAYOUT.md`` (§D7 Phase 2) surfaces the
|
||||
fact that an open local cache buffer was just rewritten by the remote
|
||||
agent. When the existing ``file/watch`` flow detects that the freshly
|
||||
pulled content differs from the pre-change snapshot, the integrator
|
||||
calls into this module to decorate the modified hunks with a transient
|
||||
Sublime phantom — enough of a visual cue that the user notices the
|
||||
edit without the noise of a full diff popup.
|
||||
|
||||
Two pure helpers carry the bulk of the logic so tests can exercise
|
||||
them without a running Sublime API:
|
||||
|
||||
- :func:`compute_changed_line_ranges` — ``difflib``-backed line-range
|
||||
extractor returning ``(start, end)`` pairs;
|
||||
- :func:`format_badge_html` — minihtml renderer for the phantom body.
|
||||
|
||||
The :class:`AgentChangeBadgeRenderer` orchestrates Sublime API calls
|
||||
(``view.add_phantom``, ``view.erase_phantom_by_id``, and
|
||||
``sublime.set_timeout_async`` for the auto-fade). All three are
|
||||
injectable so a monkeypatched test harness can observe call counts.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import difflib
|
||||
import html as _html
|
||||
import time as _time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, List, Optional, Tuple
|
||||
|
||||
try:
|
||||
import sublime # type: ignore
|
||||
except ImportError: # pragma: no cover - unit tests import without Sublime
|
||||
sublime = None # type: ignore[assignment]
|
||||
|
||||
|
||||
# Sublime ``add_phantom`` expects a layout constant; the integer 2 is
|
||||
# ``LAYOUT_BLOCK`` in the live Sublime API. Duplicating the value keeps
|
||||
# this module importable without the real ``sublime`` module, and the
|
||||
# test monkeypatches the ``add_phantom`` callable anyway so the
|
||||
# numerical constant is never compared against anything.
|
||||
_LAYOUT_BLOCK_VALUE = 2
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AgentEditBadgeRequest:
|
||||
"""Input payload describing a single post-apply edit badge."""
|
||||
|
||||
view_id: int
|
||||
old_text: str
|
||||
new_text: str
|
||||
agent_label: str
|
||||
timestamp: float
|
||||
|
||||
|
||||
def compute_changed_line_ranges(old_text: str, new_text: str) -> List[Tuple[int, int]]:
|
||||
"""Return a list of ``(start_line, end_line)`` inclusive 0-based ranges.
|
||||
|
||||
Pure wrapper around :class:`difflib.SequenceMatcher`. Ranges refer
|
||||
to **new** line numbers (post-apply) because the caller decorates
|
||||
the already-updated buffer. Identical inputs produce an empty
|
||||
list; a pure append at end-of-file produces a single trailing
|
||||
range. Adjacent change hunks get merged into one range so a
|
||||
replace-then-insert block shows a single phantom.
|
||||
"""
|
||||
old_lines = old_text.splitlines()
|
||||
new_lines = new_text.splitlines()
|
||||
matcher = difflib.SequenceMatcher(a=old_lines, b=new_lines, autojunk=False)
|
||||
ranges: List[Tuple[int, int]] = []
|
||||
for tag, _i1, _i2, j1, j2 in matcher.get_opcodes():
|
||||
if tag == "equal":
|
||||
continue
|
||||
if tag == "delete":
|
||||
# Pure delete leaves no "new" line to decorate; point the
|
||||
# badge at the join location (``j1`` is the row after which
|
||||
# lines were removed). Clamp to zero for edge cases.
|
||||
idx = max(0, j1 - 1) if j1 > 0 else 0
|
||||
ranges.append((idx, idx))
|
||||
continue
|
||||
# Insert / replace both bring new lines into the buffer; j2 is
|
||||
# exclusive per difflib convention.
|
||||
if j2 > j1:
|
||||
ranges.append((j1, j2 - 1))
|
||||
return _merge_adjacent_ranges(ranges)
|
||||
|
||||
|
||||
def _merge_adjacent_ranges(
|
||||
ranges: List[Tuple[int, int]],
|
||||
) -> List[Tuple[int, int]]:
|
||||
"""Collapse overlapping / touching ranges so each hunk gets one phantom."""
|
||||
if not ranges:
|
||||
return []
|
||||
sorted_ranges = sorted(ranges)
|
||||
merged: List[Tuple[int, int]] = [sorted_ranges[0]]
|
||||
for start, end in sorted_ranges[1:]:
|
||||
prev_start, prev_end = merged[-1]
|
||||
if start <= prev_end + 1:
|
||||
merged[-1] = (prev_start, max(prev_end, end))
|
||||
else:
|
||||
merged.append((start, end))
|
||||
return merged
|
||||
|
||||
|
||||
def format_badge_html(agent_label: str, ts: float) -> str:
|
||||
"""Render the minihtml string shown inside the phantom.
|
||||
|
||||
``agent_label`` is escaped before interpolation so an agent name
|
||||
with ``&`` / ``<`` can't break the minihtml. The timestamp is
|
||||
rendered as a local ``HH:MM:SS`` string — absolute dates show up in
|
||||
status messages elsewhere, the badge is a glance-sized cue.
|
||||
"""
|
||||
safe_label = _html.escape(agent_label or "agent", quote=True)
|
||||
try:
|
||||
time_str = _time.strftime("%H:%M:%S", _time.localtime(ts))
|
||||
except (ValueError, OSError, OverflowError):
|
||||
time_str = "--:--:--"
|
||||
return (
|
||||
'<span class="sessions-agent-badge">agent edit · {label} · {ts}</span>'
|
||||
).format(label=safe_label, ts=time_str)
|
||||
|
||||
|
||||
class AgentChangeBadgeRenderer:
|
||||
"""Drop + auto-erase phantoms on a Sublime view for an agent-driven edit."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
add_phantom: Optional[Callable[..., int]] = None,
|
||||
erase_phantom: Optional[Callable[[int], None]] = None,
|
||||
set_timeout: Optional[Callable[[Callable[[], None], int], None]] = None,
|
||||
) -> None:
|
||||
"""Inject the three Sublime-facing callables (``None`` → real API)."""
|
||||
self._explicit_add = add_phantom
|
||||
self._explicit_erase = erase_phantom
|
||||
self._explicit_timeout = set_timeout
|
||||
|
||||
def render(
|
||||
self,
|
||||
request: AgentEditBadgeRequest,
|
||||
view: object,
|
||||
ttl_ms: int = 30_000,
|
||||
) -> List[int]:
|
||||
"""Drop one phantom per changed line range, scheduling auto-erase.
|
||||
|
||||
Returns the list of created phantom ids. An empty list means the
|
||||
diff was a no-op or the view can't host phantoms (missing
|
||||
methods). Raising is reserved for programmer errors — we don't
|
||||
blow up on "view closed mid-render".
|
||||
"""
|
||||
ranges = compute_changed_line_ranges(request.old_text, request.new_text)
|
||||
if not ranges:
|
||||
return []
|
||||
add_phantom = self._resolve_add_phantom(view)
|
||||
erase_phantom = self._resolve_erase_phantom(view)
|
||||
set_timeout = self._resolve_set_timeout()
|
||||
if add_phantom is None:
|
||||
return []
|
||||
badge_html = format_badge_html(request.agent_label, request.timestamp)
|
||||
created: List[int] = []
|
||||
for start_line, end_line in ranges:
|
||||
region = _region_for_line_range(view, start_line, end_line)
|
||||
if region is None:
|
||||
continue
|
||||
try:
|
||||
phantom_id = add_phantom(
|
||||
"sessions-agent-edit-{}".format(request.view_id),
|
||||
region,
|
||||
badge_html,
|
||||
_LAYOUT_BLOCK_VALUE,
|
||||
)
|
||||
except Exception: # pragma: no cover - defensive
|
||||
continue
|
||||
if isinstance(phantom_id, int) and phantom_id > 0:
|
||||
created.append(phantom_id)
|
||||
if created and erase_phantom is not None and set_timeout is not None:
|
||||
created_snapshot: Tuple[int, ...] = tuple(created)
|
||||
erase_fn: Callable[[int], None] = erase_phantom
|
||||
|
||||
def _erase_all() -> None:
|
||||
for pid in created_snapshot:
|
||||
try:
|
||||
erase_fn(pid)
|
||||
except Exception: # pragma: no cover - defensive
|
||||
pass
|
||||
|
||||
set_timeout(_erase_all, ttl_ms)
|
||||
return created
|
||||
|
||||
def _resolve_add_phantom(self, view: object) -> Optional[Callable[..., int]]:
|
||||
if self._explicit_add is not None:
|
||||
return self._explicit_add
|
||||
candidate = getattr(view, "add_phantom", None)
|
||||
return candidate if callable(candidate) else None
|
||||
|
||||
def _resolve_erase_phantom(self, view: object) -> Optional[Callable[[int], None]]:
|
||||
if self._explicit_erase is not None:
|
||||
return self._explicit_erase
|
||||
candidate = getattr(view, "erase_phantom_by_id", None)
|
||||
return candidate if callable(candidate) else None
|
||||
|
||||
def _resolve_set_timeout(
|
||||
self,
|
||||
) -> Optional[Callable[[Callable[[], None], int], None]]:
|
||||
if self._explicit_timeout is not None:
|
||||
return self._explicit_timeout
|
||||
if sublime is None:
|
||||
return None
|
||||
candidate = getattr(sublime, "set_timeout_async", None)
|
||||
if callable(candidate):
|
||||
return candidate
|
||||
fallback = getattr(sublime, "set_timeout", None)
|
||||
return fallback if callable(fallback) else None
|
||||
|
||||
|
||||
def _region_for_line_range(
|
||||
view: object, start_line: int, end_line: int
|
||||
) -> Optional[Any]:
|
||||
"""Return a Sublime ``Region`` covering ``start_line..end_line`` inclusive.
|
||||
|
||||
Uses ``view.text_point(line, 0)`` — the de-facto way to locate a row
|
||||
without depending on whether :class:`sublime.Region` is importable
|
||||
in the test harness. When ``sublime`` is unavailable we return a
|
||||
plain ``(begin, end)`` tuple the test harness can compare against.
|
||||
"""
|
||||
text_point = getattr(view, "text_point", None)
|
||||
if not callable(text_point):
|
||||
return None
|
||||
try:
|
||||
begin = int(text_point(start_line, 0))
|
||||
# ``end_line`` is inclusive; extending to the start of
|
||||
# ``end_line`` keeps the region anchored to the hunk's first
|
||||
# character without needing the line length.
|
||||
end = int(text_point(end_line, 0))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
region_cls = getattr(sublime, "Region", None) if sublime is not None else None
|
||||
if region_cls is not None:
|
||||
return region_cls(begin, end)
|
||||
return (begin, end)
|
||||
|
||||
|
||||
__all__ = (
|
||||
"AgentChangeBadgeRenderer",
|
||||
"AgentEditBadgeRequest",
|
||||
"compute_changed_line_ranges",
|
||||
"format_badge_html",
|
||||
)
|
||||
@@ -1,290 +0,0 @@
|
||||
"""Stream-safe unified-diff parser for tmux-piped agent output.
|
||||
|
||||
Phase 1 of the D7 "edit-proposal surfacing" design (see
|
||||
``planning/AGENT_TMUX_LAYOUT.md``). A Sublime-side watcher will mirror the
|
||||
remote Terminus/tmux pane into a local file via ``tmux pipe-pane`` and feed
|
||||
the growing text into :func:`parse_unified_diff_stream`. This module is
|
||||
deliberately I/O-free and Sublime-free — pure string processing so the
|
||||
parser is trivially unit-testable with fixture blobs.
|
||||
|
||||
The parser is agent-agnostic: any tool that prints a standard
|
||||
``--- a/path`` / ``+++ b/path`` / ``@@ -L,N +L,N @@`` block will be
|
||||
surfaced. ANSI colour codes from terminal rendering are stripped before
|
||||
parsing; lines that aren't part of a diff (agent prose, thinking
|
||||
blocks, prompts) are ignored. The implementation intentionally drops
|
||||
malformed blocks silently rather than raising — terminal output is
|
||||
inherently messy and a false negative is always preferable to a crash.
|
||||
|
||||
Stream safety
|
||||
-------------
|
||||
The Sublime watcher is expected to call :func:`parse_unified_diff_stream`
|
||||
many times with a growing buffer. The parser returns every *complete*
|
||||
block visible in the current buffer; callers pass the previous result
|
||||
and the current result to :func:`extract_new_blocks` to identify what is
|
||||
newly available. A partial trailing block (header seen, body still
|
||||
streaming) is dropped from the current-call result rather than emitted
|
||||
prematurely — it will be picked up on the next call once the next
|
||||
block header or EOF marker follows.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Sequence, Tuple
|
||||
|
||||
try:
|
||||
import sublime_plugin # type: ignore
|
||||
|
||||
import sublime # type: ignore
|
||||
except ImportError: # pragma: no cover - unit tests import without Sublime
|
||||
sublime = None # type: ignore[assignment]
|
||||
sublime_plugin = None # type: ignore[assignment]
|
||||
|
||||
|
||||
# ANSI CSI / OSC escape sequences. Covers colour (``\x1b[…m``), cursor moves,
|
||||
# and the OSC 8 hyperlink pair that some agents emit around file paths.
|
||||
_ANSI_ESCAPE_RE = re.compile(
|
||||
r"\x1b\[[0-9;?]*[ -/]*[@-~]" # CSI ... final byte
|
||||
r"|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)" # OSC ... BEL / ST
|
||||
r"|\x1b[@-Z\\-_]" # plain two-byte escape
|
||||
)
|
||||
|
||||
# ``--- a/path/to/file`` header — leading ``---`` plus optional ``a/`` prefix
|
||||
# plus a path token. ``(\S.*?)`` captures a possibly-spaced path; the trailing
|
||||
# group tolerates the ``\t<timestamp>`` suffix some diff producers emit.
|
||||
_OLD_HEADER_RE = re.compile(r"^---\s+(?:a/)?(?P<path>\S(?:[^\t\n]*\S)?)(?:\t.*)?$")
|
||||
_NEW_HEADER_RE = re.compile(r"^\+\+\+\s+(?:b/)?(?P<path>\S(?:[^\t\n]*\S)?)(?:\t.*)?$")
|
||||
|
||||
# ``@@ -L[,N] +L[,N] @@[ suffix]``. ``N`` defaults to 1 per unified-diff spec.
|
||||
_HUNK_HEADER_RE = re.compile(
|
||||
r"^@@\s+-(?P<before_start>\d+)(?:,(?P<before_count>\d+))?"
|
||||
r"\s+\+(?P<after_start>\d+)(?:,(?P<after_count>\d+))?"
|
||||
r"\s+@@.*$"
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DiffHunk:
|
||||
"""One ``@@`` hunk inside a unified-diff block.
|
||||
|
||||
``before_start`` / ``after_start`` are 1-based line numbers as written
|
||||
by the diff producer. ``body`` contains the ``@@`` header line followed
|
||||
by the context / ``+`` / ``-`` lines joined by ``\\n`` — no trailing
|
||||
newline. Callers that want to render the hunk typically just concatenate
|
||||
``body`` with a leading newline.
|
||||
"""
|
||||
|
||||
before_start: int
|
||||
before_count: int
|
||||
after_start: int
|
||||
after_count: int
|
||||
body: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DiffBlock:
|
||||
"""One complete unified-diff block.
|
||||
|
||||
A block is the pair of ``--- a/<path>`` / ``+++ b/<path>`` header lines
|
||||
plus all contiguous hunks that follow before the next block header or
|
||||
the end of the current buffer. ``path_before`` and ``path_after`` are
|
||||
captured separately because renames and /dev/null markers make them
|
||||
differ; tooling that cares can compare the two.
|
||||
"""
|
||||
|
||||
path_before: str
|
||||
path_after: str
|
||||
hunks: Tuple[DiffHunk, ...]
|
||||
|
||||
|
||||
def _strip_ansi(text: str) -> str:
|
||||
"""Return ``text`` with ANSI CSI / OSC escape sequences removed."""
|
||||
return _ANSI_ESCAPE_RE.sub("", text)
|
||||
|
||||
|
||||
def _is_block_header_start(line: str) -> bool:
|
||||
"""Return ``True`` if ``line`` looks like the ``--- a/...`` header start.
|
||||
|
||||
We treat any line matching :data:`_OLD_HEADER_RE` as a potential block
|
||||
boundary. The follow-up ``+++`` line is validated by the caller; a bare
|
||||
``---`` without a ``+++`` partner is ignored.
|
||||
"""
|
||||
return _OLD_HEADER_RE.match(line) is not None
|
||||
|
||||
|
||||
def _parse_hunk_header(line: str) -> Optional[Tuple[int, int, int, int]]:
|
||||
"""Return ``(before_start, before_count, after_start, after_count)`` or ``None``.
|
||||
|
||||
Missing ``,count`` defaults to 1, matching the unified-diff specification
|
||||
as implemented by GNU diff and git.
|
||||
"""
|
||||
match = _HUNK_HEADER_RE.match(line)
|
||||
if match is None:
|
||||
return None
|
||||
before_start = int(match.group("before_start"))
|
||||
before_count_raw = match.group("before_count")
|
||||
before_count = int(before_count_raw) if before_count_raw is not None else 1
|
||||
after_start = int(match.group("after_start"))
|
||||
after_count_raw = match.group("after_count")
|
||||
after_count = int(after_count_raw) if after_count_raw is not None else 1
|
||||
return before_start, before_count, after_start, after_count
|
||||
|
||||
|
||||
def _is_hunk_body_line(line: str) -> bool:
|
||||
"""Return ``True`` if ``line`` belongs to a hunk body (``+ ``/``- ``/`` ``)."""
|
||||
if not line:
|
||||
# Empty line = context line with trailing-whitespace stripped by the
|
||||
# terminal. Accept it as part of the body.
|
||||
return True
|
||||
first = line[0]
|
||||
return first in (" ", "+", "-", "\\")
|
||||
|
||||
|
||||
def _consume_hunks(
|
||||
lines: Sequence[str], start_idx: int
|
||||
) -> Tuple[List[DiffHunk], int, bool]:
|
||||
"""Parse hunks starting at ``lines[start_idx]``.
|
||||
|
||||
Returns ``(hunks, next_idx, complete)`` where:
|
||||
|
||||
* ``hunks`` is the list of fully-parsed hunks (may be empty),
|
||||
* ``next_idx`` is the index of the line after the last consumed hunk,
|
||||
* ``complete`` is ``True`` when the last hunk's body count was reached
|
||||
before running out of input or hitting a new block header. If the
|
||||
buffer ended mid-body we still return the hunks parsed so far; the
|
||||
caller decides whether to keep the trailing partial block (we drop
|
||||
it to keep the stream-safety guarantee).
|
||||
"""
|
||||
hunks: List[DiffHunk] = []
|
||||
idx = start_idx
|
||||
total = len(lines)
|
||||
while idx < total:
|
||||
header_match = _parse_hunk_header(lines[idx])
|
||||
if header_match is None:
|
||||
# A non-hunk-header line here ends the current block's hunks.
|
||||
return hunks, idx, True
|
||||
before_start, before_count, after_start, after_count = header_match
|
||||
header_line = lines[idx]
|
||||
idx += 1
|
||||
|
||||
body_lines: List[str] = []
|
||||
before_seen = 0
|
||||
after_seen = 0
|
||||
while idx < total and (before_seen < before_count or after_seen < after_count):
|
||||
body_line = lines[idx]
|
||||
if _is_block_header_start(body_line):
|
||||
# The next block header beats out the unfinished body; we
|
||||
# stop the current hunk short and treat what we have as
|
||||
# incomplete so the caller can drop this partial block.
|
||||
return hunks, idx, False
|
||||
if not _is_hunk_body_line(body_line):
|
||||
# Non-diff line interrupts the body (agent prose, prompt,
|
||||
# etc.). Treat the current hunk as incomplete and exit.
|
||||
return hunks, idx, False
|
||||
body_lines.append(body_line)
|
||||
if body_line.startswith(" "):
|
||||
before_seen += 1
|
||||
after_seen += 1
|
||||
elif body_line.startswith("-"):
|
||||
before_seen += 1
|
||||
elif body_line.startswith("+"):
|
||||
after_seen += 1
|
||||
# "\\ No newline at end of file" consumes no counter, per spec.
|
||||
idx += 1
|
||||
|
||||
if before_seen < before_count or after_seen < after_count:
|
||||
# Ran out of input before the hunk body completed.
|
||||
return hunks, idx, False
|
||||
|
||||
hunks.append(
|
||||
DiffHunk(
|
||||
before_start=before_start,
|
||||
before_count=before_count,
|
||||
after_start=after_start,
|
||||
after_count=after_count,
|
||||
body="\n".join([header_line, *body_lines]),
|
||||
)
|
||||
)
|
||||
return hunks, idx, True
|
||||
|
||||
|
||||
def parse_unified_diff_stream(text: str) -> List[DiffBlock]:
|
||||
"""Return the complete :class:`DiffBlock` instances found in ``text``.
|
||||
|
||||
Stream-safe: any partial block at the tail (header seen, body still
|
||||
streaming) is dropped rather than emitted prematurely, so the Sublime
|
||||
watcher can call this repeatedly as its buffer grows without producing
|
||||
spurious duplicates. ANSI colour / OSC escape sequences are stripped
|
||||
before parsing; lines that aren't part of a diff are silently skipped.
|
||||
|
||||
Args:
|
||||
text: The (possibly growing) buffer tailed from ``tmux pipe-pane``.
|
||||
|
||||
Returns:
|
||||
The list of complete diff blocks in stream order. Blocks whose
|
||||
body is truncated at the tail of ``text`` are omitted; callers
|
||||
call again once more data arrives.
|
||||
"""
|
||||
cleaned = _strip_ansi(text)
|
||||
lines = cleaned.splitlines()
|
||||
blocks: List[DiffBlock] = []
|
||||
idx = 0
|
||||
total = len(lines)
|
||||
while idx < total:
|
||||
line = lines[idx]
|
||||
old_match = _OLD_HEADER_RE.match(line)
|
||||
if old_match is None:
|
||||
idx += 1
|
||||
continue
|
||||
# Peek at the next line for the ``+++`` partner; skip if absent.
|
||||
if idx + 1 >= total:
|
||||
break
|
||||
new_match = _NEW_HEADER_RE.match(lines[idx + 1])
|
||||
if new_match is None:
|
||||
idx += 1
|
||||
continue
|
||||
|
||||
path_before = old_match.group("path")
|
||||
path_after = new_match.group("path")
|
||||
hunks, next_idx, complete = _consume_hunks(lines, idx + 2)
|
||||
if complete and hunks:
|
||||
blocks.append(
|
||||
DiffBlock(
|
||||
path_before=path_before,
|
||||
path_after=path_after,
|
||||
hunks=tuple(hunks),
|
||||
)
|
||||
)
|
||||
idx = next_idx
|
||||
continue
|
||||
if not hunks:
|
||||
# Header pair without any hunks yet — treat as partial and stop
|
||||
# so we don't starve a possible next block.
|
||||
break
|
||||
# Partial trailing block: keep the hunks we have for downstream
|
||||
# tools, then stop. We still only append if complete so that
|
||||
# callers don't see the same partial block twice as it grows.
|
||||
break
|
||||
return blocks
|
||||
|
||||
|
||||
def extract_new_blocks(
|
||||
prev: Sequence[DiffBlock], curr: Sequence[DiffBlock]
|
||||
) -> List[DiffBlock]:
|
||||
"""Return blocks in ``curr`` that are not in ``prev`` (by dataclass equality).
|
||||
|
||||
Uses frozen-dataclass equality semantics — two blocks compare equal iff
|
||||
their paths and every hunk match exactly. The order of the returned
|
||||
list mirrors the order of ``curr``.
|
||||
|
||||
Args:
|
||||
prev: Blocks returned by the previous
|
||||
:func:`parse_unified_diff_stream` call.
|
||||
curr: Blocks returned by the current call.
|
||||
|
||||
Returns:
|
||||
The subset of ``curr`` not present in ``prev``.
|
||||
"""
|
||||
prev_set = set(prev)
|
||||
return [block for block in curr if block not in prev_set]
|
||||
@@ -7,9 +7,11 @@ module owns the SSH / tmux plumbing — spawning, attaching, listing and
|
||||
killing sessions — and is intentionally free of Sublime imports so the
|
||||
logic is unit-testable without the ``sublime`` runtime.
|
||||
|
||||
The companion ``agent_proposal_watcher`` module parses diff output tailed
|
||||
from ``tmux pipe-pane``; this broker is agent-agnostic and knows nothing
|
||||
about what the agent prints.
|
||||
This broker is agent-agnostic and knows nothing about what the agent
|
||||
prints inside the tmux pane. The earlier chat-with-diff design had a
|
||||
companion ``agent_proposal_watcher`` module that parsed diff output
|
||||
tailed from ``tmux pipe-pane``; that direction was abandoned with the
|
||||
chat→tmux pivot and the parser module was retired in v0.6.7.
|
||||
|
||||
Design notes
|
||||
------------
|
||||
|
||||
@@ -1,386 +0,0 @@
|
||||
"""Unit tests for :mod:`sessions.agent_change_badge`."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import FrozenInstanceError
|
||||
from typing import Any, Callable, Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
from sessions.agent_change_badge import (
|
||||
AgentChangeBadgeRenderer,
|
||||
AgentEditBadgeRequest,
|
||||
compute_changed_line_ranges,
|
||||
format_badge_html,
|
||||
)
|
||||
|
||||
# ---- compute_changed_line_ranges ----------------------------------------
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_identical_is_empty() -> None:
|
||||
assert compute_changed_line_ranges("a\nb\nc\n", "a\nb\nc\n") == []
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_detects_single_insertion() -> None:
|
||||
old = "a\nb\nc\n"
|
||||
new = "a\nb\nX\nc\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
# The inserted line ``X`` sits at index 2 in ``new``.
|
||||
assert ranges == [(2, 2)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_detects_single_replace() -> None:
|
||||
old = "a\nb\nc\n"
|
||||
new = "a\nREPLACED\nc\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
assert ranges == [(1, 1)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_detects_trailing_append() -> None:
|
||||
old = "a\nb\n"
|
||||
new = "a\nb\nc\nd\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
assert ranges == [(2, 3)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_handles_pure_deletion() -> None:
|
||||
old = "a\nb\nc\nd\n"
|
||||
new = "a\nd\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
# Deletion decorates the surviving join line.
|
||||
assert ranges
|
||||
for start, end in ranges:
|
||||
assert 0 <= start <= end
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_merges_adjacent_hunks() -> None:
|
||||
old = "a\nb\nc\nd\ne\n"
|
||||
new = "a\nB\nC\nD\ne\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
# Three consecutive replaces should collapse into a single range.
|
||||
assert ranges == [(1, 3)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_keeps_disjoint_hunks_separate() -> None:
|
||||
old = "a\nb\nc\nd\ne\nf\n"
|
||||
new = "a\nB\nc\nd\nE\nf\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
assert ranges == [(1, 1), (4, 4)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_full_rewrite() -> None:
|
||||
old = "alpha\nbeta\n"
|
||||
new = "gamma\ndelta\nepsilon\n"
|
||||
ranges = compute_changed_line_ranges(old, new)
|
||||
# ``SequenceMatcher`` yields one replace spanning the whole buffer.
|
||||
assert ranges == [(0, 2)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_empty_old_all_inserts() -> None:
|
||||
ranges = compute_changed_line_ranges("", "one\ntwo\n")
|
||||
assert ranges == [(0, 1)]
|
||||
|
||||
|
||||
def test_compute_changed_line_ranges_empty_new_reports_join_line_zero() -> None:
|
||||
ranges = compute_changed_line_ranges("kept\n", "")
|
||||
assert ranges == [(0, 0)]
|
||||
|
||||
|
||||
# ---- format_badge_html --------------------------------------------------
|
||||
|
||||
|
||||
def test_format_badge_html_contains_agent_and_time() -> None:
|
||||
html = format_badge_html("claude", 0)
|
||||
assert "agent edit" in html
|
||||
assert "claude" in html
|
||||
assert 'class="sessions-agent-badge"' in html
|
||||
|
||||
|
||||
def test_format_badge_html_escapes_agent_label() -> None:
|
||||
html = format_badge_html("<script>alert('x')</script>", 0)
|
||||
assert "<script>" not in html
|
||||
assert "<script>" in html
|
||||
|
||||
|
||||
def test_format_badge_html_falls_back_when_label_blank() -> None:
|
||||
html = format_badge_html("", 0)
|
||||
assert "agent" in html
|
||||
|
||||
|
||||
def test_format_badge_html_handles_invalid_timestamp() -> None:
|
||||
html = format_badge_html("claude", float("inf"))
|
||||
assert "claude" in html
|
||||
# Fallback renders dashes; at minimum no traceback-producing format chars.
|
||||
assert "%" not in html
|
||||
|
||||
|
||||
def test_format_badge_html_is_ascii_style() -> None:
|
||||
# No emojis per the user's ASCII-only rule.
|
||||
html = format_badge_html("claude", 0)
|
||||
for ch in html:
|
||||
assert ord(ch) < 0x1F000, "unexpected emoji {!r} in html".format(ch)
|
||||
|
||||
|
||||
# ---- AgentChangeBadgeRenderer ------------------------------------------
|
||||
|
||||
|
||||
class _FakeRegion:
|
||||
def __init__(self, begin: int, end: int) -> None:
|
||||
self.begin_ = begin
|
||||
self.end_ = end
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, _FakeRegion):
|
||||
return NotImplemented
|
||||
return (self.begin_, self.end_) == (other.begin_, other.end_)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover - debug aid
|
||||
return "_FakeRegion({}, {})".format(self.begin_, self.end_)
|
||||
|
||||
|
||||
class _FakeView:
|
||||
def __init__(self) -> None:
|
||||
self.add_phantom_calls: List[Tuple[str, Any, str, int]] = []
|
||||
self.erase_calls: List[int] = []
|
||||
self._next_id = 100
|
||||
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return row * 100 + col
|
||||
|
||||
def add_phantom(
|
||||
self,
|
||||
key: str,
|
||||
region: Any,
|
||||
content: str,
|
||||
layout: int,
|
||||
) -> int:
|
||||
pid = self._next_id
|
||||
self._next_id += 1
|
||||
self.add_phantom_calls.append((key, region, content, layout))
|
||||
return pid
|
||||
|
||||
def erase_phantom_by_id(self, pid: int) -> None:
|
||||
self.erase_calls.append(pid)
|
||||
|
||||
|
||||
class _TimeoutCollector:
|
||||
def __init__(self) -> None:
|
||||
self.scheduled: List[Tuple[Callable[[], None], int]] = []
|
||||
|
||||
def __call__(self, callback: Callable[[], None], delay_ms: int) -> None:
|
||||
self.scheduled.append((callback, delay_ms))
|
||||
|
||||
def run_all(self) -> None:
|
||||
for cb, _ms in self.scheduled:
|
||||
cb()
|
||||
|
||||
|
||||
def _build_renderer(
|
||||
*,
|
||||
view: _FakeView,
|
||||
timer: _TimeoutCollector,
|
||||
use_injection: bool = False,
|
||||
) -> AgentChangeBadgeRenderer:
|
||||
if use_injection:
|
||||
erase_calls: List[int] = []
|
||||
|
||||
def _erase(pid: int) -> None:
|
||||
view.erase_phantom_by_id(pid)
|
||||
erase_calls.append(pid)
|
||||
|
||||
return AgentChangeBadgeRenderer(
|
||||
add_phantom=view.add_phantom,
|
||||
erase_phantom=_erase,
|
||||
set_timeout=timer,
|
||||
)
|
||||
return AgentChangeBadgeRenderer(set_timeout=timer)
|
||||
|
||||
|
||||
def _request(old: str, new: str, ts: float = 0.0) -> AgentEditBadgeRequest:
|
||||
return AgentEditBadgeRequest(
|
||||
view_id=1,
|
||||
old_text=old,
|
||||
new_text=new,
|
||||
agent_label="claude",
|
||||
timestamp=ts,
|
||||
)
|
||||
|
||||
|
||||
def test_renderer_adds_one_phantom_per_range() -> None:
|
||||
view = _FakeView()
|
||||
timer = _TimeoutCollector()
|
||||
renderer = _build_renderer(view=view, timer=timer)
|
||||
request = _request("a\nb\nc\nd\ne\nf\n", "a\nB\nc\nd\nE\nf\n")
|
||||
created = renderer.render(request, view, ttl_ms=1000)
|
||||
assert len(created) == 2
|
||||
assert len(view.add_phantom_calls) == 2
|
||||
# Keys encode the view id so concurrent badges don't collide.
|
||||
for key, *_ in view.add_phantom_calls:
|
||||
assert key == "sessions-agent-edit-1"
|
||||
|
||||
|
||||
def test_renderer_skips_noop_diffs() -> None:
|
||||
view = _FakeView()
|
||||
timer = _TimeoutCollector()
|
||||
renderer = _build_renderer(view=view, timer=timer)
|
||||
request = _request("a\nb\n", "a\nb\n")
|
||||
assert renderer.render(request, view) == []
|
||||
assert view.add_phantom_calls == []
|
||||
assert timer.scheduled == []
|
||||
|
||||
|
||||
def test_renderer_schedules_erase_after_ttl() -> None:
|
||||
view = _FakeView()
|
||||
timer = _TimeoutCollector()
|
||||
renderer = _build_renderer(view=view, timer=timer)
|
||||
request = _request("a\nb\n", "a\nBEE\n")
|
||||
created = renderer.render(request, view, ttl_ms=1234)
|
||||
assert len(timer.scheduled) == 1
|
||||
_cb, delay = timer.scheduled[0]
|
||||
assert delay == 1234
|
||||
timer.run_all()
|
||||
assert view.erase_calls == created
|
||||
|
||||
|
||||
def test_renderer_returns_empty_when_view_lacks_add_phantom() -> None:
|
||||
class _ViewWithoutAdd:
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return 0
|
||||
|
||||
view = _ViewWithoutAdd()
|
||||
timer = _TimeoutCollector()
|
||||
renderer = AgentChangeBadgeRenderer(set_timeout=timer)
|
||||
request = _request("a\n", "b\n")
|
||||
assert renderer.render(request, view) == []
|
||||
|
||||
|
||||
def test_renderer_does_not_schedule_erase_when_nothing_created() -> None:
|
||||
class _FailingView:
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return 0
|
||||
|
||||
def add_phantom(self, *args: Any, **kwargs: Any) -> int:
|
||||
return 0 # non-positive → renderer treats as failure
|
||||
|
||||
timer = _TimeoutCollector()
|
||||
view = _FailingView()
|
||||
renderer = AgentChangeBadgeRenderer(set_timeout=timer)
|
||||
request = _request("a\n", "b\n")
|
||||
assert renderer.render(request, view) == []
|
||||
assert timer.scheduled == []
|
||||
|
||||
|
||||
def test_renderer_respects_explicit_injections() -> None:
|
||||
captured: Dict[str, Any] = {"added": 0, "erased": [], "timeouts": []}
|
||||
|
||||
def _add(key: str, region: Any, content: str, layout: int) -> int:
|
||||
captured["added"] += 1
|
||||
return 77
|
||||
|
||||
def _erase(pid: int) -> None:
|
||||
captured["erased"].append(pid)
|
||||
|
||||
def _timer(cb: Callable[[], None], ms: int) -> None:
|
||||
captured["timeouts"].append(ms)
|
||||
cb()
|
||||
|
||||
class _StubView:
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return 10 * row + col
|
||||
|
||||
renderer = AgentChangeBadgeRenderer(
|
||||
add_phantom=_add,
|
||||
erase_phantom=_erase,
|
||||
set_timeout=_timer,
|
||||
)
|
||||
request = _request("a\nb\n", "a\nX\n", ts=100.0)
|
||||
created = renderer.render(request, _StubView(), ttl_ms=500)
|
||||
assert created == [77]
|
||||
assert captured["added"] == 1
|
||||
assert captured["timeouts"] == [500]
|
||||
assert captured["erased"] == [77]
|
||||
|
||||
|
||||
def test_renderer_uses_injected_add_even_when_view_has_method() -> None:
|
||||
# Proves the explicit injection takes priority — important for tests
|
||||
# that observe call counts without touching the real view API.
|
||||
view = _FakeView()
|
||||
timer = _TimeoutCollector()
|
||||
call_log: List[str] = []
|
||||
|
||||
def _add(key: str, region: Any, content: str, layout: int) -> int:
|
||||
call_log.append(key)
|
||||
return 55
|
||||
|
||||
renderer = AgentChangeBadgeRenderer(
|
||||
add_phantom=_add,
|
||||
erase_phantom=view.erase_phantom_by_id,
|
||||
set_timeout=timer,
|
||||
)
|
||||
renderer.render(_request("a\n", "b\n"), view, ttl_ms=1)
|
||||
assert call_log # injected path fired
|
||||
assert view.add_phantom_calls == [] # real view untouched
|
||||
|
||||
|
||||
def test_renderer_returns_phantom_ids_matching_created_set() -> None:
|
||||
view = _FakeView()
|
||||
timer = _TimeoutCollector()
|
||||
renderer = _build_renderer(view=view, timer=timer)
|
||||
request = _request("a\nb\nc\n", "a\nB\nc\n")
|
||||
created = renderer.render(request, view, ttl_ms=10)
|
||||
assert set(created) == {100} # single range, single phantom
|
||||
timer.run_all()
|
||||
assert view.erase_calls == [100]
|
||||
|
||||
|
||||
def test_agent_edit_badge_request_is_frozen() -> None:
|
||||
req = _request("a\n", "b\n")
|
||||
with pytest.raises(FrozenInstanceError):
|
||||
req.agent_label = "codex" # type: ignore[misc]
|
||||
|
||||
|
||||
def test_renderer_without_timeout_still_adds_phantoms() -> None:
|
||||
class _TinyView:
|
||||
def __init__(self) -> None:
|
||||
self.added = 0
|
||||
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return row
|
||||
|
||||
def add_phantom(self, *args: Any, **kwargs: Any) -> int:
|
||||
self.added += 1
|
||||
return 11
|
||||
|
||||
view = _TinyView()
|
||||
renderer = AgentChangeBadgeRenderer(
|
||||
add_phantom=view.add_phantom,
|
||||
erase_phantom=None,
|
||||
set_timeout=None,
|
||||
)
|
||||
request = _request("a\n", "b\n")
|
||||
created = renderer.render(request, view)
|
||||
assert created == [11]
|
||||
assert view.added == 1
|
||||
|
||||
|
||||
def test_renderer_region_receives_expected_line_bounds() -> None:
|
||||
captured_regions: List[Any] = []
|
||||
|
||||
def _add(key: str, region: Any, content: str, layout: int) -> int:
|
||||
captured_regions.append(region)
|
||||
return 9
|
||||
|
||||
class _PointView:
|
||||
def text_point(self, row: int, col: int) -> int:
|
||||
return row * 1000 + col
|
||||
|
||||
renderer = AgentChangeBadgeRenderer(add_phantom=_add)
|
||||
request = _request("a\nb\nc\nd\n", "a\nB\nC\nd\n")
|
||||
renderer.render(request, _PointView())
|
||||
assert captured_regions
|
||||
# Merged range covers rows 1..2 — text_point produces 1000 / 2000.
|
||||
region = captured_regions[0]
|
||||
if isinstance(region, tuple):
|
||||
assert region == (1000, 2000)
|
||||
else:
|
||||
assert region.begin() == 1000
|
||||
assert region.end() == 2000
|
||||
@@ -1,271 +0,0 @@
|
||||
"""Unit tests for the unified-diff stream parser in ``agent_proposal_watcher``."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import FrozenInstanceError
|
||||
|
||||
import pytest
|
||||
from sessions.agent_proposal_watcher import (
|
||||
DiffBlock,
|
||||
DiffHunk,
|
||||
extract_new_blocks,
|
||||
parse_unified_diff_stream,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixture builders
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _minimal_diff(path: str = "foo.py") -> str:
|
||||
return (
|
||||
f"--- a/{path}\n"
|
||||
f"+++ b/{path}\n"
|
||||
"@@ -1,3 +1,4 @@\n"
|
||||
" line one\n"
|
||||
" line two\n"
|
||||
"-old three\n"
|
||||
"+new three\n"
|
||||
"+added four\n"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal shapes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_returns_empty_list_on_empty_input() -> None:
|
||||
assert parse_unified_diff_stream("") == []
|
||||
|
||||
|
||||
def test_parse_returns_empty_list_on_plain_prose() -> None:
|
||||
text = "I'm thinking about what to edit...\nHere is my plan.\n"
|
||||
assert parse_unified_diff_stream(text) == []
|
||||
|
||||
|
||||
def test_parse_minimal_one_block_one_hunk() -> None:
|
||||
blocks = parse_unified_diff_stream(_minimal_diff("foo.py"))
|
||||
assert len(blocks) == 1
|
||||
block = blocks[0]
|
||||
assert block.path_before == "foo.py"
|
||||
assert block.path_after == "foo.py"
|
||||
assert len(block.hunks) == 1
|
||||
hunk = block.hunks[0]
|
||||
assert hunk.before_start == 1
|
||||
assert hunk.before_count == 3
|
||||
assert hunk.after_start == 1
|
||||
assert hunk.after_count == 4
|
||||
assert hunk.body.startswith("@@ -1,3 +1,4 @@")
|
||||
assert "+new three" in hunk.body
|
||||
|
||||
|
||||
def test_parse_hunk_without_explicit_count_defaults_to_one() -> None:
|
||||
text = "--- a/x.py\n+++ b/x.py\n@@ -5 +5 @@\n-old\n+new\n"
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert len(blocks) == 1
|
||||
hunk = blocks[0].hunks[0]
|
||||
assert hunk.before_start == 5
|
||||
assert hunk.before_count == 1
|
||||
assert hunk.after_start == 5
|
||||
assert hunk.after_count == 1
|
||||
|
||||
|
||||
def test_parse_hunk_header_function_context_suffix_preserved_in_body() -> None:
|
||||
text = (
|
||||
"--- a/x.py\n"
|
||||
"+++ b/x.py\n"
|
||||
"@@ -10,2 +10,3 @@ def greet(name):\n"
|
||||
' return f"hi {name}"\n'
|
||||
"-\n"
|
||||
"+# trailing comment\n"
|
||||
"+\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert len(blocks) == 1
|
||||
hunk = blocks[0].hunks[0]
|
||||
assert "def greet(name):" in hunk.body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Multi-block / multi-hunk
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_multi_block_returns_every_block() -> None:
|
||||
text = _minimal_diff("a.py") + _minimal_diff("b.py")
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert [blk.path_after for blk in blocks] == ["a.py", "b.py"]
|
||||
|
||||
|
||||
def test_parse_multi_hunk_block_keeps_hunks_in_order() -> None:
|
||||
text = (
|
||||
"--- a/x.py\n"
|
||||
"+++ b/x.py\n"
|
||||
"@@ -1,2 +1,2 @@\n"
|
||||
" a\n"
|
||||
"-b\n"
|
||||
"+B\n"
|
||||
"@@ -10,1 +10,2 @@\n"
|
||||
" last\n"
|
||||
"+new-tail\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert len(blocks) == 1
|
||||
hunks = blocks[0].hunks
|
||||
assert len(hunks) == 2
|
||||
assert hunks[0].before_start == 1
|
||||
assert hunks[1].before_start == 10
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ANSI colour handling
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_strips_ansi_color_codes_before_matching_headers() -> None:
|
||||
colored = (
|
||||
"\x1b[1m--- a/foo.py\x1b[0m\n"
|
||||
"\x1b[1m+++ b/foo.py\x1b[0m\n"
|
||||
"\x1b[36m@@ -1,2 +1,2 @@\x1b[0m\n"
|
||||
" a\n"
|
||||
"\x1b[31m-b\x1b[0m\n"
|
||||
"\x1b[32m+B\x1b[0m\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(colored)
|
||||
assert len(blocks) == 1
|
||||
assert blocks[0].path_before == "foo.py"
|
||||
assert "\x1b[" not in blocks[0].hunks[0].body
|
||||
|
||||
|
||||
def test_parse_strips_osc_hyperlink_escape_sequences() -> None:
|
||||
osc = (
|
||||
"\x1b]8;;file:///tmp/x.py\x1b\\--- a/x.py\x1b]8;;\x1b\\\n"
|
||||
"+++ b/x.py\n"
|
||||
"@@ -1,1 +1,1 @@\n"
|
||||
"-old\n"
|
||||
"+new\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(osc)
|
||||
assert len(blocks) == 1
|
||||
assert blocks[0].path_before == "x.py"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stream safety / partial tails
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_drops_trailing_block_header_without_hunks() -> None:
|
||||
text = _minimal_diff("a.py") + "--- a/b.py\n+++ b/b.py\n"
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
# Only the complete first block should be emitted.
|
||||
assert [blk.path_after for blk in blocks] == ["a.py"]
|
||||
|
||||
|
||||
def test_parse_drops_trailing_hunk_whose_body_is_truncated() -> None:
|
||||
text = _minimal_diff("a.py") + "--- a/b.py\n+++ b/b.py\n@@ -1,5 +1,5 @@\n a\n b\n"
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert [blk.path_after for blk in blocks] == ["a.py"]
|
||||
|
||||
|
||||
def test_parse_drops_block_with_header_without_plus_partner() -> None:
|
||||
text = "--- a/x.py\nrandom interjection\n" + _minimal_diff("a.py")
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert [blk.path_after for blk in blocks] == ["a.py"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Noise tolerance
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parse_ignores_noise_between_blocks() -> None:
|
||||
text = (
|
||||
"Thinking about next edit...\n"
|
||||
+ _minimal_diff("a.py")
|
||||
+ "I will now also touch b.py:\n"
|
||||
+ _minimal_diff("b.py")
|
||||
+ "Done.\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert [blk.path_after for blk in blocks] == ["a.py", "b.py"]
|
||||
|
||||
|
||||
def test_parse_handles_header_timestamps_and_trailing_whitespace() -> None:
|
||||
text = (
|
||||
"--- a/foo.py\t2026-04-23 09:00:00\n"
|
||||
"+++ b/foo.py\t2026-04-23 09:01:00\n"
|
||||
"@@ -1,1 +1,1 @@\n"
|
||||
"-old\n"
|
||||
"+new\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert len(blocks) == 1
|
||||
assert blocks[0].path_before == "foo.py"
|
||||
assert blocks[0].path_after == "foo.py"
|
||||
|
||||
|
||||
def test_parse_handles_no_newline_at_end_of_file_marker() -> None:
|
||||
text = (
|
||||
"--- a/x.py\n"
|
||||
"+++ b/x.py\n"
|
||||
"@@ -1,1 +1,1 @@\n"
|
||||
"-old\n"
|
||||
"\\ No newline at end of file\n"
|
||||
"+new\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(text)
|
||||
assert len(blocks) == 1
|
||||
assert "\\ No newline" in blocks[0].hunks[0].body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# extract_new_blocks dedup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_extract_new_blocks_returns_only_blocks_not_in_prev() -> None:
|
||||
first = parse_unified_diff_stream(_minimal_diff("a.py"))
|
||||
second = parse_unified_diff_stream(_minimal_diff("a.py") + _minimal_diff("b.py"))
|
||||
new_blocks = extract_new_blocks(first, second)
|
||||
assert [blk.path_after for blk in new_blocks] == ["b.py"]
|
||||
|
||||
|
||||
def test_extract_new_blocks_returns_empty_when_curr_is_subset_of_prev() -> None:
|
||||
full = parse_unified_diff_stream(_minimal_diff("a.py") + _minimal_diff("b.py"))
|
||||
partial = parse_unified_diff_stream(_minimal_diff("a.py"))
|
||||
assert extract_new_blocks(full, partial) == []
|
||||
|
||||
|
||||
def test_extract_new_blocks_returns_all_when_prev_empty() -> None:
|
||||
blocks = parse_unified_diff_stream(_minimal_diff("a.py"))
|
||||
assert extract_new_blocks([], blocks) == blocks
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dataclass invariants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_diff_block_is_frozen_and_hashable() -> None:
|
||||
block = DiffBlock(
|
||||
path_before="a.py",
|
||||
path_after="a.py",
|
||||
hunks=(DiffHunk(1, 1, 1, 1, "@@ -1 +1 @@\n-x\n+y"),),
|
||||
)
|
||||
with pytest.raises(FrozenInstanceError):
|
||||
block.path_before = "other" # type: ignore[misc]
|
||||
assert hash(block) == hash(
|
||||
DiffBlock(
|
||||
path_before="a.py",
|
||||
path_after="a.py",
|
||||
hunks=(DiffHunk(1, 1, 1, 1, "@@ -1 +1 @@\n-x\n+y"),),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_diff_hunk_is_frozen() -> None:
|
||||
hunk = DiffHunk(1, 1, 1, 1, "@@ -1 +1 @@\n-x\n+y")
|
||||
with pytest.raises(FrozenInstanceError):
|
||||
hunk.before_start = 2 # type: ignore[misc]
|
||||
@@ -1,179 +0,0 @@
|
||||
"""Adversarial edge-case tests for ``parse_unified_diff_stream``.
|
||||
|
||||
The baseline coverage in ``test_agent_proposal_watcher`` exercises
|
||||
well-formed fixtures — this file pushes the parser with stressful
|
||||
inputs the Track D Phase 1 watcher will actually see once wired
|
||||
against a live tmux ``pipe-pane``: interleaved ANSI colours, partial
|
||||
tails from growing log blobs, enormous multi-thousand-line diffs,
|
||||
concurrent parses from two threads.
|
||||
|
||||
Classifier markers: ``threading.Thread`` + ``thread.start`` for
|
||||
adversarial; "large" body word for the multi-thousand-line stress
|
||||
test. These all hit the real parser function — no mocks.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from typing import List
|
||||
|
||||
from sessions.agent_proposal_watcher import (
|
||||
DiffBlock,
|
||||
DiffHunk,
|
||||
extract_new_blocks,
|
||||
parse_unified_diff_stream,
|
||||
)
|
||||
|
||||
|
||||
def _minimal_block(path: str = "src/lib.rs") -> str:
|
||||
return f"--- a/{path}\n+++ b/{path}\n@@ -1,3 +1,3 @@\n context\n-old\n+new\n tail\n"
|
||||
|
||||
|
||||
def test_parser_handles_ansi_colour_codes_interleaved() -> None:
|
||||
# Claude Code / codex pipe unified diffs with ANSI colour escapes
|
||||
# around +/- markers; the parser must strip them before matching.
|
||||
# Header counts must match body or the parser treats the block as
|
||||
# still-streaming — 1 before-line, 1 after-line exactly.
|
||||
ansi = "\x1b[31m" # red
|
||||
reset = "\x1b[0m"
|
||||
blob = (
|
||||
f"{ansi}--- a/x.py{reset}\n"
|
||||
f"{ansi}+++ b/x.py{reset}\n"
|
||||
"@@ -1,1 +1,1 @@\n"
|
||||
f"{ansi}-old{reset}\n"
|
||||
f"{ansi}+new{reset}\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
assert len(blocks) == 1
|
||||
assert blocks[0].path_before == "x.py"
|
||||
assert blocks[0].path_after == "x.py"
|
||||
|
||||
|
||||
def test_parser_drops_incomplete_block_at_tail() -> None:
|
||||
# The watcher feeds growing log text; a block whose header landed
|
||||
# but whose hunks haven't fully arrived must NOT be returned
|
||||
# prematurely. The parser either returns the partial block with
|
||||
# zero hunks or drops it entirely — tests pin the latter.
|
||||
blob = (
|
||||
_minimal_block("done.py")
|
||||
+ "--- a/pending.py\n"
|
||||
+ "+++ b/pending.py\n"
|
||||
+ "@@ -1,2 " # truncated header, no trailing \n
|
||||
)
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
assert [b.path_before for b in blocks] == ["done.py"]
|
||||
|
||||
|
||||
def test_parser_handles_multiple_hunks_in_one_block() -> None:
|
||||
# Each hunk's body must exactly match its declared counts or the
|
||||
# parser treats the block as still-streaming and drops it.
|
||||
blob = (
|
||||
"--- a/f.py\n"
|
||||
"+++ b/f.py\n"
|
||||
"@@ -1,1 +1,1 @@\n"
|
||||
"-a\n"
|
||||
"+A\n"
|
||||
"@@ -10,2 +10,2 @@\n"
|
||||
" ctx\n"
|
||||
"-b\n"
|
||||
"+B\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
assert len(blocks) == 1
|
||||
assert len(blocks[0].hunks) == 2
|
||||
|
||||
|
||||
def test_parser_large_diff_stress_ten_thousand_hunks() -> None:
|
||||
# Intentionally large to catch accidental O(n^2) behaviour in the
|
||||
# parser — ten thousand one-line hunks inside one block.
|
||||
lines: List[str] = ["--- a/big.py\n", "+++ b/big.py\n"]
|
||||
for i in range(10_000):
|
||||
lines.append(f"@@ -{i + 1},1 +{i + 1},1 @@\n")
|
||||
lines.append(f"-line_{i}_old\n")
|
||||
lines.append(f"+line_{i}_new\n")
|
||||
blob = "".join(lines)
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
assert len(blocks) == 1
|
||||
assert len(blocks[0].hunks) == 10_000
|
||||
|
||||
|
||||
def test_extract_new_blocks_stress_identity_diff() -> None:
|
||||
# extract_new_blocks compares via dataclass equality, so a thousand
|
||||
# identical blocks must return zero new blocks regardless of list
|
||||
# length (O(n*m) is acceptable for moderate sizes but must return
|
||||
# the right answer).
|
||||
one = parse_unified_diff_stream(_minimal_block("x.py"))
|
||||
assert len(one) == 1
|
||||
many = one * 100
|
||||
assert extract_new_blocks(many, many) == []
|
||||
|
||||
|
||||
def test_extract_new_blocks_detects_only_the_last_addition() -> None:
|
||||
prev = parse_unified_diff_stream(_minimal_block("a.py"))
|
||||
curr = parse_unified_diff_stream(_minimal_block("a.py") + _minimal_block("b.py"))
|
||||
new_blocks = extract_new_blocks(prev, curr)
|
||||
assert [b.path_before for b in new_blocks] == ["b.py"]
|
||||
|
||||
|
||||
def test_parser_ignores_noise_lines_between_blocks() -> None:
|
||||
# Pipe-pane gives us scrollback with agent prose + prompt chrome
|
||||
# mixed into the diff stream. Those must not corrupt the parse.
|
||||
blob = (
|
||||
"> Tool call: edit_file path=x.py\n"
|
||||
"Thinking: applying patch...\n"
|
||||
+ _minimal_block("x.py")
|
||||
+ "\n(3 hunks, 1 file changed)\n\n"
|
||||
+ _minimal_block("y.py")
|
||||
+ "\n> Tool call: confirm?\n"
|
||||
)
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
assert [b.path_before for b in blocks] == ["x.py", "y.py"]
|
||||
|
||||
|
||||
def test_concurrent_parse_returns_identical_results() -> None:
|
||||
# Pure-function concurrency guard: spin up two threads that each
|
||||
# parse the same large blob, confirm both see the same structured
|
||||
# output. This catches any global mutable state the parser might
|
||||
# introduce during future refactors.
|
||||
blob = _minimal_block("x.py") * 50 + _minimal_block("y.py") * 50
|
||||
expected = parse_unified_diff_stream(blob)
|
||||
|
||||
results: List[List[DiffBlock]] = [[], []]
|
||||
|
||||
def worker(idx: int) -> None:
|
||||
results[idx] = parse_unified_diff_stream(blob)
|
||||
|
||||
t1 = threading.Thread(target=worker, args=(0,))
|
||||
t2 = threading.Thread(target=worker, args=(1,))
|
||||
t1.start()
|
||||
t2.start()
|
||||
t1.join(timeout=10)
|
||||
t2.join(timeout=10)
|
||||
|
||||
assert results[0] == expected
|
||||
assert results[1] == expected
|
||||
|
||||
|
||||
def test_parser_tolerates_windows_line_endings() -> None:
|
||||
# Agents running on Windows hosts emit CRLF. Python's splitlines
|
||||
# handles it natively, but guard against a regression where a
|
||||
# trimmed `\r` leaks into a captured path.
|
||||
blob = "--- a/win.py\r\n+++ b/win.py\r\n@@ -1,1 +1,1 @@\r\n-x\r\n+y\r\n"
|
||||
blocks = parse_unified_diff_stream(blob)
|
||||
# Either the parser accepts CRLF transparently (one block with the
|
||||
# stripped path) or the agent-tmux watcher normalises upstream.
|
||||
# Whichever shape, the path_before must NOT carry a literal \r.
|
||||
for block in blocks:
|
||||
assert "\r" not in block.path_before
|
||||
assert "\r" not in block.path_after
|
||||
|
||||
|
||||
def test_dataclass_instances_are_hashable() -> None:
|
||||
# ``DiffBlock`` / ``DiffHunk`` are frozen dataclasses. Assert they
|
||||
# can live in a set so dedup pipelines using them don't regress to
|
||||
# TypeError.
|
||||
hunk = DiffHunk(
|
||||
before_start=1, before_count=1, after_start=1, after_count=1, body="@@ \n"
|
||||
)
|
||||
block = DiffBlock(path_before="a", path_after="a", hunks=(hunk,))
|
||||
assert {block} == {block}
|
||||
Reference in New Issue
Block a user