feat(session_helper): PR 13b.3 — deadline propagation + file/read chunked polling
PYTHON_THINNING_PLAN §5 PR 13b.3. PR 13b.2 (exec/once polling) 위에
*deadline propagation* + *file/read chunked polling*.
산출물:
- ``handle_request_cancellable`` 가 ``request.timeout_ms`` 를 ``Instant``
deadline으로 변환해 모든 handler에 일관된 시간 한도 부과 (timeout_ms=0
→ None, 기존 무제한 호출자 호환).
- ``handle_file_read(params, cancel_flag, deadline)`` 시그니처 변경:
- 64 KiB chunked read (기존 exec_once read buffer와 동일).
- 매 chunk마다 ``cancel_flag.load(Relaxed)`` + collapse-able
``if let Some(d) = deadline && Instant::now() >= d`` 체크.
- 16 MiB MAX_READ_BYTES 상한 = 256+ polling points worst-case.
- cancel 시 ``HelperFsError::new("cancelled", "Cancelled by bridge.")``.
- deadline 초과 시 ``"file_read_timeout"`` + 누적 바이트 수 메시지.
cancel/timeout coverage (PR 13b.1 → .3 누적):
- exec/once: PR 13b.2 polling SIGTERM.
- file/read: PR 13b.3 chunked + cancel + deadline.
- tree/list, file/stat, file/write, file/watch: cancel_flag/deadline 받지만
polling 없음 (자체 timeout이 별도이거나 짧은 단일-syscall 호출).
테스트:
- 기존 73 그린 (timeout_ms=0 호출자는 deadline=None → 기존 무한 동작).
- ``file_read_request_returns_base64_body`` 비트 동일 통과 (chunked 경로
결과 동일).
PR 13b.4 후속:
- worker dispatch가 priority queue (interactive vs mirror) 사용.
boundary-claim:
removes: []
delete-count: 0
rust-additions: ~50 LOC (chunked read loop + deadline 전파 + collapsed if)
ban-list: 'cancel/timeout 일관 적용 — file/read 16 MiB 한도 안 256+ checkpoint'
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -331,23 +331,30 @@ fn write_message(
|
||||
|
||||
/// Handles one request envelope and returns either a success response or error.
|
||||
///
|
||||
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2 callers should
|
||||
/// Backward-compatible no-cancel entrypoint — Wave 2 PR 13b.2/.3 callers should
|
||||
/// prefer [`handle_request_cancellable`] so the dispatcher can flip the
|
||||
/// `cancel_flag` for in-flight handlers (currently `exec/once` polls it).
|
||||
/// `cancel_flag` for in-flight handlers and propagate the per-request
|
||||
/// `timeout_ms` deadline through to chunked-read handlers.
|
||||
pub fn handle_request(request: RequestEnvelope) -> Result<ResponseEnvelope, ErrorEnvelope> {
|
||||
handle_request_cancellable(request, None)
|
||||
}
|
||||
|
||||
/// PR 13b.2: cancel-flag-aware variant of [`handle_request`].
|
||||
/// PR 13b.2/.3: cancel-flag and deadline-aware variant of [`handle_request`].
|
||||
///
|
||||
/// `cancel_flag` is consulted by handlers that have a polling point —
|
||||
/// `exec/once` checks it on every 10 ms wait inside its child-watcher
|
||||
/// loop. Other handlers receive but currently ignore the flag; PR 13b.3
|
||||
/// expands coverage (chunked `file/read`, deadline propagation).
|
||||
/// loop, `file/read` checks it between 64 KiB chunks. Deadline is derived
|
||||
/// from `request.timeout_ms` and applied uniformly so a slow-disk read or
|
||||
/// runaway exec terminates with the same envelope.
|
||||
pub fn handle_request_cancellable(
|
||||
request: RequestEnvelope,
|
||||
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
|
||||
) -> Result<ResponseEnvelope, ErrorEnvelope> {
|
||||
let deadline = if request.timeout_ms > 0 {
|
||||
Some(Instant::now() + Duration::from_millis(request.timeout_ms))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let result = match request.method.as_str() {
|
||||
METHOD_CHANNEL_DISPATCH => {
|
||||
let params: ChannelDispatchParams = serde_json::from_value(request.params.clone())
|
||||
@@ -365,9 +372,9 @@ pub fn handle_request_cancellable(
|
||||
METHOD_FILE_READ => {
|
||||
let params: FileReadParams = serde_json::from_value(request.params.clone())
|
||||
.map_err(|error| invalid_params_error(Some(request.id.clone()), error))?;
|
||||
serde_json::to_value(handle_file_read(¶ms).map_err(|error| {
|
||||
error_envelope(Some(request.id.clone()), error.code, error.message)
|
||||
})?)
|
||||
serde_json::to_value(handle_file_read(¶ms, cancel_flag, deadline).map_err(
|
||||
|error| error_envelope(Some(request.id.clone()), error.code, error.message),
|
||||
)?)
|
||||
.map_err(|error| internal_error(Some(request.id.clone()), error))?
|
||||
}
|
||||
METHOD_FILE_STAT => {
|
||||
@@ -534,7 +541,11 @@ fn handle_tree_list(params: &TreeListParams) -> Result<TreeListResult, HelperFsE
|
||||
Ok(TreeListResult { entries })
|
||||
}
|
||||
|
||||
fn handle_file_read(params: &FileReadParams) -> Result<FileReadResult, HelperFsError> {
|
||||
fn handle_file_read(
|
||||
params: &FileReadParams,
|
||||
cancel_flag: Option<&std::sync::atomic::AtomicBool>,
|
||||
deadline: Option<Instant>,
|
||||
) -> Result<FileReadResult, HelperFsError> {
|
||||
let path = absolute_path(¶ms.remote_absolute_path)?;
|
||||
let metadata = fs::symlink_metadata(path).map_err(|error| {
|
||||
HelperFsError::new("file_read_failed", format!("Unable to stat path: {error}"))
|
||||
@@ -555,10 +566,46 @@ fn handle_file_read(params: &FileReadParams) -> Result<FileReadResult, HelperFsE
|
||||
let mut file = File::open(path).map_err(|error| {
|
||||
HelperFsError::new("file_read_failed", format!("Unable to open file: {error}"))
|
||||
})?;
|
||||
let mut body = Vec::new();
|
||||
file.read_to_end(&mut body).map_err(|error| {
|
||||
HelperFsError::new("file_read_failed", format!("Unable to read file: {error}"))
|
||||
})?;
|
||||
// PR 13b.3: chunked read so cancel_flag and deadline can be polled
|
||||
// between chunks. 64 KiB matches the existing exec_once read buffer
|
||||
// and is well below the 16 MiB MAX_READ_BYTES cap so even worst-case
|
||||
// file sizes get ~256 polling points per request.
|
||||
const CHUNK: usize = 64 * 1024;
|
||||
let cap = usize::try_from(mapped.size_bytes).unwrap_or(usize::MAX);
|
||||
let mut body: Vec<u8> = Vec::with_capacity(cap.min(CHUNK * 16));
|
||||
let mut buf = [0u8; CHUNK];
|
||||
loop {
|
||||
if cancel_flag
|
||||
.map(|f| f.load(std::sync::atomic::Ordering::Relaxed))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Err(HelperFsError::new(
|
||||
"cancelled",
|
||||
"Cancelled by bridge.".to_string(),
|
||||
));
|
||||
}
|
||||
if let Some(d) = deadline
|
||||
&& Instant::now() >= d
|
||||
{
|
||||
return Err(HelperFsError::new(
|
||||
"file_read_timeout",
|
||||
format!("Read exceeded request deadline ({} bytes read)", body.len()),
|
||||
));
|
||||
}
|
||||
let n = file.read(&mut buf).map_err(|error| {
|
||||
HelperFsError::new("file_read_failed", format!("Unable to read file: {error}"))
|
||||
})?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
body.extend_from_slice(&buf[..n]);
|
||||
if body.len() as u64 > MAX_READ_BYTES {
|
||||
return Err(HelperFsError::new(
|
||||
"file_too_large",
|
||||
"Remote file grew beyond MAX_READ_BYTES during read.".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(FileReadResult {
|
||||
metadata: RemoteFileMetadata {
|
||||
size_bytes: body.len() as u64,
|
||||
|
||||
Reference in New Issue
Block a user