feat(session_helper): PR 13b.2 — exec/once cancel polling
PYTHON_THINNING_PLAN §5 PR 13b.2. PR 13b.1 cancel flag map skeleton 위에 *첫 polling handler* — exec/once. 산출물: - ``handle_request_cancellable(request, cancel_flag)`` 신설 — 기존 ``handle_request(request)`` 는 backward-compat thin wrapper로 ``None`` 전달. - ``handle_exec_once(params, cancel_flag)`` — 시그니처에 추가. polling loop가 deadline 체크와 같은 곳에서 ``cancel_flag.load(Relaxed)`` 검사, set 시 child SIGTERM + ``cancelled = true``. - ``cancelled && !timed_out`` 일 때 stderr 끝에 ``"Cancelled by bridge."`` 추가 (timed_out 메시지와 분리된 감지 가능 마커). - session_helper worker thread 가 ``handle_request_cancellable(request, Some(&flag))`` 호출. PR 13b.1 의 cancel_flag map 등록 → 디스패처 cancel envelope 처리 → flag set → exec/once polling 발견 → child kill. cancel propagation 범위 (PR 13b.2 한정): - ✅ exec/once: child process polling. SIGTERM + Cancelled 마커. - ⏭ tree/list, file/read, file/stat, file/write: cancel_flag 받지만 polling 없음 (호출 후 즉시 반환되는 짧은 작업이라 polling 효과 적음). 진짜 필요한 건 *대용량 file/read* chunked polling — PR 13b.3 deadline propagation과 함께. 테스트: - 기존 73 그린. - exec/once cancel 시나리오는 race-y해서 단위 테스트 추가 안 함 (일부러 long-sleep child를 spawn하고 cancel flag flip 후 stderr 확인 가능하나 flaky 위험). PR 13b.3에서 deadline 통합 시 함께. PR 13b.3 후속: - file/read 대용량 chunked polling. - ``RequestEnvelope.timeout_ms`` → handle_request 측 deadline propagation. boundary-claim: removes: [] delete-count: 0 rust-additions: ~30 LOC (handle_request_cancellable + cancel polling) ban-list: 'PR 13b.1 cancel flag map skeleton 위 첫 polling handler' Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -228,10 +228,11 @@ fn run_stdio_session_with_io(
|
||||
let tx = ev_tx_workers.clone();
|
||||
let flags_for_cleanup = std::sync::Arc::clone(&cancel_flags);
|
||||
thread::spawn(move || {
|
||||
// ``flag`` is captured for PR 13b.2 to consult inside
|
||||
// ``handle_request`` once handlers learn to poll it.
|
||||
let _cancel_flag = flag;
|
||||
let reply = match handle_request(request) {
|
||||
// PR 13b.2: pass the registered cancel flag through to
|
||||
// ``handle_request_cancellable`` so handlers with a
|
||||
// polling point (currently ``exec/once``) can abort
|
||||
// when the dispatcher flips the flag.
|
||||
let reply = match handle_request_cancellable(request, Some(&flag)) {
|
||||
Ok(resp) => ProtocolMessage::Response(resp),
|
||||
Err(err) => ProtocolMessage::Error(err),
|
||||
};
|
||||
@@ -329,7 +330,24 @@ fn write_message(
|
||||
}
|
||||
|
||||
/// Handles one request envelope and returns either a success response or error.
|
||||
///
|
||||
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2 callers should
|
||||
/// prefer [`handle_request_cancellable`] so the dispatcher can flip the
|
||||
/// `cancel_flag` for in-flight handlers (currently `exec/once` polls it).
|
||||
pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, ErrorEnvelope> {
|
||||
handle_request_cancellable(request, None)
|
||||
}
|
||||
|
||||
/// PR 13b.2: cancel-flag-aware variant of [`handle_request`].
|
||||
///
|
||||
/// `cancel_flag` is consulted by handlers that have a polling point —
|
||||
/// `exec/once` checks it on every 10 ms wait inside its child-watcher
|
||||
/// loop. Other handlers receive but currently ignore the flag; PR 13b.3
|
||||
/// expands coverage (chunked `file/read`, deadline propagation).
|
||||
pub fn handle_request_cancellable(
|
||||
request: RequestEnvelope,
|
||||
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
|
||||
) -> Result<ResponseEnvelope, ErrorEnvelope> {
|
||||
let result = match request.method.as_str() {
|
||||
METHOD_CHANNEL_DISPATCH => {
|
||||
let params: ChannelDispatchParams = serde_json::from_value(request.params.clone())
|
||||
@@ -379,7 +397,7 @@ pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, Erro
|
||||
METHOD_EXEC_ONCE => {
|
||||
let params: ExecOnceParams = serde_json::from_value(request.params.clone())
|
||||
.map_err(|error| invalid_params_error(Some(request.id.clone()), error))?;
|
||||
serde_json::to_value(handle_exec_once(¶ms).map_err(|error| {
|
||||
serde_json::to_value(handle_exec_once(¶ms, cancel_flag).map_err(|error| {
|
||||
error_envelope(Some(request.id.clone()), error.code, error.message)
|
||||
})?)
|
||||
.map_err(|error| internal_error(Some(request.id.clone()), error))?
|
||||
@@ -902,7 +920,10 @@ fn handle_file_write(params: &FileWriteParams) -> Result<FileWriteResult, Helper
|
||||
const EXEC_STDOUT_MAX: usize = 4 * 1024 * 1024;
|
||||
const EXEC_STDERR_MAX: usize = 4 * 1024 * 1024;
|
||||
|
||||
fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsError> {
|
||||
fn handle_exec_once(
|
||||
params: &ExecOnceParams,
|
||||
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
|
||||
) -> Result<ExecOnceResult, HelperFsError> {
|
||||
if params.argv.is_empty() {
|
||||
return Err(HelperFsError::new(
|
||||
"exec_invalid_argv",
|
||||
@@ -962,10 +983,24 @@ fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsE
|
||||
let stdout_handle = thread::spawn(move || read_child_output(stdout_pipe, stdout_cap));
|
||||
let stderr_handle = thread::spawn(move || read_child_output(stderr_pipe, stderr_cap));
|
||||
|
||||
// PR 13b.2: cancel_flag is checked in the same polling loop that
|
||||
// already enforces the deadline. When the dispatcher flips the flag
|
||||
// (in response to a Cancel envelope), the loop exits early via the
|
||||
// ``cancelled`` branch and the child is SIGTERM'd just like a timeout.
|
||||
let mut cancelled = false;
|
||||
let timed_out = loop {
|
||||
match child.try_wait() {
|
||||
Ok(Some(_)) => break false,
|
||||
Ok(None) => {
|
||||
if cancel_flag
|
||||
.map(|f| f.load(std::sync::atomic::Ordering::Relaxed))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
cancelled = true;
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
break false;
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
@@ -993,7 +1028,12 @@ fn handle_exec_once(params: &ExecOnceParams) -> Result<ExecOnceResult, HelperFsE
|
||||
|
||||
let stdout = stdout_handle.join().unwrap_or_default();
|
||||
let mut stderr = stderr_handle.join().unwrap_or_default();
|
||||
if timed_out {
|
||||
if cancelled && !timed_out {
|
||||
if !stderr.is_empty() {
|
||||
stderr.push('\n');
|
||||
}
|
||||
stderr.push_str("Cancelled by bridge.");
|
||||
} else if timed_out {
|
||||
if !stderr.is_empty() {
|
||||
stderr.push('\n');
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user