feat(session_helper): PR 13b.3 — deadline propagation + file/read chunked polling

PYTHON_THINNING_PLAN §5 PR 13b.3. PR 13b.2 (exec/once polling) 위에
*deadline propagation* + *file/read chunked polling*.

산출물:
- ``handle_request_cancellable`` 가 ``request.timeout_ms`` 를 ``Instant``
  deadline으로 변환해 모든 handler에 일관된 시간 한도 부과 (timeout_ms=0
  → None, 기존 무제한 호출자 호환).
- ``handle_file_read(params, cancel_flag, deadline)`` 시그니처 변경:
  - 64 KiB chunked read (기존 exec_once read buffer와 동일).
  - 매 chunk마다 ``cancel_flag.load(Relaxed)`` + collapse-able
    ``if let Some(d) = deadline && Instant::now() >= d`` 체크.
  - 16 MiB MAX_READ_BYTES 상한 = 256+ polling points worst-case.
  - cancel 시 ``HelperFsError::new("cancelled", "Cancelled by bridge.")``.
  - deadline 초과 시 ``"file_read_timeout"`` + 누적 바이트 수 메시지.

cancel/timeout coverage (PR 13b.1 → .3 누적):
- exec/once: PR 13b.2 polling SIGTERM.
- file/read: PR 13b.3 chunked + cancel + deadline.
- tree/list, file/stat, file/write, file/watch: cancel_flag/deadline 받지만
  polling 없음 (자체 timeout이 별도이거나 짧은 단일-syscall 호출).

테스트:
- 기존 73 그린 (timeout_ms=0 호출자는 deadline=None → 기존 무한 동작).
- ``file_read_request_returns_base64_body`` 비트 동일 통과 (chunked 경로
  결과 동일).

PR 13b.4 후속:
- worker dispatch가 priority queue (interactive vs mirror) 사용.

boundary-claim:
  removes: []
  delete-count: 0
  rust-additions: ~50 LOC (chunked read loop + deadline 전파 + collapsed if)
  ban-list: 'cancel/timeout 일관 적용 — file/read 16 MiB 한도 안 256+ checkpoint'

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-02 16:45:06 +09:00
parent 7329454b90
commit cf74d89b9a

View File

@@ -331,23 +331,30 @@ fn write_message(
/// Handles one request envelope and returns either a success response or error.
///
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2 callers should
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2/.3 callers should
/// prefer [`handle_request_cancellable`] so the dispatcher can flip the
/// `cancel_flag` for in-flight handlers (currently `exec/once` polls it).
/// `cancel_flag` for in-flight handlers and propagate the per-request
/// `timeout_ms` deadline through to chunked-read handlers.
pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, ErrorEnvelope> {
handle_request_cancellable(request, None)
}
/// PR 13b.2: cancel-flag-aware variant of [`handle_request`].
/// PR 13b.2/.3: cancel-flag and deadline-aware variant of [`handle_request`].
///
/// `cancel_flag` is consulted by handlers that have a polling point —
/// `exec/once` checks it on every 10 ms wait inside its child-watcher
/// loop. Other handlers receive but currently ignore the flag; PR 13b.3
/// expands coverage (chunked `file/read`, deadline propagation).
/// loop, `file/read` checks it between 64 KiB chunks. Deadline is derived
/// from `request.timeout_ms` and applied uniformly so a slow-disk read or
/// runaway exec terminates with the same envelope.
pub fn handle_request_cancellable(
request: RequestEnvelope,
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
) -> Result<ResponseEnvelope, ErrorEnvelope> {
let deadline = if request.timeout_ms > 0 {
Some(Instant::now() + Duration::from_millis(request.timeout_ms))
} else {
None
};
let result = match request.method.as_str() {
METHOD_CHANNEL_DISPATCH => {
let params: ChannelDispatchParams = serde_json::from_value(request.params.clone())
@@ -365,9 +372,9 @@ pub fn handle_request_cancellable(
METHOD_FILE_READ => {
let params: FileReadParams = serde_json::from_value(request.params.clone())
.map_err(|error| invalid_params_error(Some(request.id.clone()), error))?;
serde_json::to_value(handle_file_read(&params).map_err(|error| {
error_envelope(Some(request.id.clone()), error.code, error.message)
})?)
serde_json::to_value(handle_file_read(&params, cancel_flag, deadline).map_err(
|error| error_envelope(Some(request.id.clone()), error.code, error.message),
)?)
.map_err(|error| internal_error(Some(request.id.clone()), error))?
}
METHOD_FILE_STAT => {
@@ -534,7 +541,11 @@ fn handle_tree_list(params: &TreeListParams) -> Result<TreeListResult, HelperFsE
Ok(TreeListResult { entries })
}
fn handle_file_read(params: &FileReadParams) -> Result<FileReadResult, HelperFsError> {
fn handle_file_read(
params: &FileReadParams,
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
deadline: Option<Instant>,
) -> Result<FileReadResult, HelperFsError> {
let path = absolute_path(&params.remote_absolute_path)?;
let metadata = fs::symlink_metadata(path).map_err(|error| {
HelperFsError::new("file_read_failed", format!("Unable to stat path: {error}"))
@@ -555,10 +566,46 @@ fn handle_file_read(params: &FileReadParams) -> Result<FileReadResult, HelperFsE
let mut file = File::open(path).map_err(|error| {
HelperFsError::new("file_read_failed", format!("Unable to open file: {error}"))
})?;
let mut body = Vec::new();
file.read_to_end(&mut body).map_err(|error| {
HelperFsError::new("file_read_failed", format!("Unable to read file: {error}"))
})?;
// PR 13b.3: chunked read so cancel_flag and deadline can be polled
// between chunks. 64 KiB matches the existing exec_once read buffer
// and is well below the 16 MiB MAX_READ_BYTES cap so even worst-case
// file sizes get ~256 polling points per request.
const CHUNK: usize = 64 * 1024;
let cap = usize::try_from(mapped.size_bytes).unwrap_or(usize::MAX);
let mut body: Vec<u8> = Vec::with_capacity(cap.min(CHUNK * 16));
let mut buf = [0u8; CHUNK];
loop {
if cancel_flag
.map(|f| f.load(std::sync::atomic::Ordering::Relaxed))
.unwrap_or(false)
{
return Err(HelperFsError::new(
"cancelled",
"Cancelled by bridge.".to_string(),
));
}
if let Some(d) = deadline
&& Instant::now() >= d
{
return Err(HelperFsError::new(
"file_read_timeout",
format!("Read exceeded request deadline ({} bytes read)", body.len()),
));
}
let n = file.read(&mut buf).map_err(|error| {
HelperFsError::new("file_read_failed", format!("Unable to read file: {error}"))
})?;
if n == 0 {
break;
}
body.extend_from_slice(&buf[..n]);
if body.len() as u64 > MAX_READ_BYTES {
return Err(HelperFsError::new(
"file_too_large",
"Remote file grew beyond MAX_READ_BYTES during read.".to_string(),
));
}
}
Ok(FileReadResult {
metadata: RemoteFileMetadata {
size_bytes: body.len() as u64,