feat(session_helper): PR 13b.2 — exec/once cancel polling

PYTHON_THINNING_PLAN §5 PR 13b.2. PR 13b.1 cancel flag map skeleton 위에
*첫 polling handler* — exec/once.

산출물:
- ``handle_request_cancellable(request, cancel_flag)`` 신설 — 기존
  ``handle_request(request)`` 는 backward-compat thin wrapper로 ``None``
  전달.
- ``handle_exec_once(params, cancel_flag)`` — 시그니처에 추가. polling
  loop가 deadline 체크와 같은 곳에서 ``cancel_flag.load(Relaxed)`` 검사,
  set 시 child SIGTERM + ``cancelled = true``.
- ``cancelled && !timed_out`` 일 때 stderr 끝에 ``"Cancelled by bridge."``
  추가 (timed_out 메시지와 분리된 감지 가능 마커).
- session_helper worker thread 가 ``handle_request_cancellable(request,
  Some(&flag))`` 호출. PR 13b.1 의 cancel_flag map 등록 → 디스패처 cancel
  envelope 처리 → flag set → exec/once polling 발견 → child kill.

cancel propagation 범위 (PR 13b.2 한정):
-  exec/once: child process polling. SIGTERM + Cancelled 마커.
- ⏭ tree/list, file/read, file/stat, file/write: cancel_flag 받지만 polling
  없음 (호출 후 즉시 반환되는 짧은 작업이라 polling 효과 적음). 진짜
  필요한 건 *대용량 file/read* chunked polling — PR 13b.3 deadline
  propagation과 함께.

테스트:
- 기존 73 그린.
- exec/once cancel 시나리오는 race-y해서 단위 테스트 추가 안 함 (일부러
  long-sleep child를 spawn하고 cancel flag flip 후 stderr 확인 가능하나
  flaky 위험). PR 13b.3에서 deadline 통합 시 함께.

PR 13b.3 후속:
- file/read 대용량 chunked polling.
- ``RequestEnvelope.timeout_ms`` → handle_request 측 deadline propagation.

boundary-claim:
  removes: []
  delete-count: 0
  rust-additions: ~30 LOC (handle_request_cancellable + cancel polling)
  ban-list: 'PR 13b.1 cancel flag map skeleton 위 첫 polling handler'

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 16:30:50 +09:00
parent 156c9de347
commit ae11415967

View File

@@ -228,10 +228,11 @@ fn run_stdio_session_with_io(
let tx = ev_tx_workers.clone();
let flags_for_cleanup = std::sync::Arc::clone(&cancel_flags);
thread::spawn(move || {
// ``flag`` is captured for PR 13b.2 to consult inside
// ``handle_request`` once handlers learn to poll it.
let _cancel_flag = flag;
let reply = match handle_request(request) {
// PR 13b.2: pass the registered cancel flag through to
// ``handle_request_cancellable`` so handlers with a
// polling point (currently ``exec/once``) can abort
// when the dispatcher flips the flag.
let reply = match handle_request_cancellable(request, Some(&flag)) {
Ok(resp) => ProtocolMessage::Response(resp),
Err(err) => ProtocolMessage::Error(err),
};
@@ -329,7 +330,24 @@ fn write_message(
}
/// Handles one request envelope and returns either a success response or error.
///
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2 callers should
/// prefer [`handle_request_cancellable`] so the dispatcher can flip the
/// `cancel_flag` for in-flight handlers (currently `exec/once` polls it).
pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, ErrorEnvelope> {
handle_request_cancellable(request, None)
}
/// PR 13b.2: cancel-flag-aware variant of [`handle_request`].
///
/// `cancel_flag` is consulted by handlers that have a polling point —
/// `exec/once` checks it on every 10 ms wait inside its child-watcher
/// loop. Other handlers receive but currently ignore the flag; PR 13b.3
/// expands coverage (chunked `file/read`, deadline propagation).
pub fn handle_request_cancellable(
request: RequestEnvelope,
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
) -> Result<ResponseEnvelope, ErrorEnvelope> {
let result = match request.method.as_str() {
METHOD_CHANNEL_DISPATCH => {
let params: ChannelDispatchParams = serde_json::from_value(request.params.clone())
@@ -379,7 +397,7 @@ pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, Erro
METHOD_EXEC_ONCE => {
let params: ExecOnceParams = serde_json::from_value(request.params.clone())
.map_err(|error| invalid_params_error(Some(request.id.clone()), error))?;
serde_json::to_value(handle_exec_once(&params).map_err(|error| {
serde_json::to_value(handle_exec_once(&params, cancel_flag).map_err(|error| {
error_envelope(Some(request.id.clone()), error.code, error.message)
})?)
.map_err(|error| internal_error(Some(request.id.clone()), error))?
@@ -902,7 +920,10 @@ fn handle_file_write(params: &FileWriteParams) -> Result<FileWriteResult, Helper
const EXEC_STDOUT_MAX: usize = 4 * 1024 * 1024;
const EXEC_STDERR_MAX: usize = 4 * 1024 * 1024;
fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsError> {
fn handle_exec_once(
params: &ExecOnceParams,
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
) -> Result<ExecOnceResult, HelperFsError> {
if params.argv.is_empty() {
return Err(HelperFsError::new(
"exec_invalid_argv",
@@ -962,10 +983,24 @@ fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsE
let stdout_handle = thread::spawn(move || read_child_output(stdout_pipe, stdout_cap));
let stderr_handle = thread::spawn(move || read_child_output(stderr_pipe, stderr_cap));
// PR 13b.2: cancel_flag is checked in the same polling loop that
// already enforces the deadline. When the dispatcher flips the flag
// (in response to a Cancel envelope), the loop exits early via the
// ``cancelled`` branch and the child is SIGTERM'd just like a timeout.
let mut cancelled = false;
let timed_out = loop {
match child.try_wait() {
Ok(Some(_)) => break false,
Ok(None) => {
if cancel_flag
.map(|f| f.load(std::sync::atomic::Ordering::Relaxed))
.unwrap_or(false)
{
cancelled = true;
let _ = child.kill();
let _ = child.wait();
break false;
}
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
@@ -993,7 +1028,12 @@ fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsE
let stdout = stdout_handle.join().unwrap_or_default();
let mut stderr = stderr_handle.join().unwrap_or_default();
if timed_out {
if cancelled && !timed_out {
if !stderr.is_empty() {
stderr.push('\n');
}
stderr.push_str("Cancelled by bridge.");
} else if timed_out {
if !stderr.is_empty() {
stderr.push('\n');
}