From 51de194141d31f1d8a918896cf41593562eb6e3e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 18 May 2026 18:19:08 +0000 Subject: [PATCH] fix(llm_gateway): read full non-streaming body when final chunk is empty Co-authored-by: Musa --- crates/llm_gateway/src/stream_context.rs | 49 ++++++++++++++++-------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 6920d43d..86c09554 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -56,6 +56,8 @@ pub struct StreamContext { http_protocol: Option, sse_buffer: Option, sse_chunk_processor: Option, + /// Largest `body_size` seen for a non-streaming response (cumulative per Envoy). + non_streaming_response_body_size: usize, } impl StreamContext { @@ -87,6 +89,7 @@ impl StreamContext { http_protocol: None, sse_buffer: None, sse_chunk_processor: None, + non_streaming_response_body_size: 0, } } @@ -1173,19 +1176,33 @@ impl HttpContext for StreamContext { } let current_time = get_current_time().unwrap(); - if end_of_stream && body_size == 0 { - debug!( - "request_id={}: response body complete, total_bytes={}", - self.request_identifier(), - body_size - ); - self.handle_end_of_request_metrics_and_traces(current_time); - return Action::Continue; - } // Non-streaming upstream responses may arrive in multiple chunks; wait for the // full buffered body before parsing (body_size is cumulative on the final chunk). - if !self.streaming_response && !end_of_stream { + if !self.streaming_response { + if body_size > self.non_streaming_response_body_size { + self.non_streaming_response_body_size = body_size; + } + if !end_of_stream { + return Action::Continue; + } + } + + let effective_body_size = if self.streaming_response { + body_size + } else if body_size > 0 { + body_size + } else { + self.non_streaming_response_body_size + }; + + if end_of_stream && effective_body_size == 0 { + debug!( + "request_id={}: response body complete, total_bytes={}", + self.request_identifier(), + effective_body_size + ); + self.handle_end_of_request_metrics_and_traces(current_time); return Action::Continue; } @@ -1200,15 +1217,15 @@ impl HttpContext for StreamContext { ); // For error responses, forward the upstream error directly without parsing - if body_size > 0 { - if let Ok(body) = self.read_raw_response_body(body_size) { + if effective_body_size > 0 { + if let Ok(body) = self.read_raw_response_body(effective_body_size) { debug!( "request_id={}: upstream error body: {}", self.request_identifier(), String::from_utf8_lossy(&body) ); // Forward the error response as-is - self.set_http_response_body(0, body_size, &body); + self.set_http_response_body(0, effective_body_size, &body); } } return Action::Continue; @@ -1233,7 +1250,7 @@ impl HttpContext for StreamContext { } } - let body = match self.read_raw_response_body(body_size) { + let body = match self.read_raw_response_body(effective_body_size) { Ok(bytes) => bytes, Err(action) => return action, }; @@ -1249,14 +1266,14 @@ impl HttpContext for StreamContext { if self.streaming_response { match self.handle_streaming_response(&body, provider_id) { Ok(serialized_body) => { - self.set_http_response_body(0, body_size, &serialized_body); + self.set_http_response_body(0, effective_body_size, &serialized_body); } Err(action) => return action, } } else { match self.handle_non_streaming_response(&body, provider_id) { Ok(serialized_body) => { - self.set_http_response_body(0, body_size, &serialized_body); + self.set_http_response_body(0, effective_body_size, &serialized_body); } Err(action) => return action, }