From 0348ca1d11d4fb08be544aca169d395645d9822a Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 18 May 2026 18:27:16 +0000 Subject: [PATCH] fix(llm_gateway): read full non-streaming body with usize::MAX at end_of_stream Co-authored-by: Musa --- crates/llm_gateway/src/stream_context.rs | 65 ++++++++++++------------ 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 86c09554..969dc266 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -56,8 +56,6 @@ pub struct StreamContext { http_protocol: Option, sse_buffer: Option, sse_chunk_processor: Option, - /// Largest `body_size` seen for a non-streaming response (cumulative per Envoy). - non_streaming_response_body_size: usize, } impl StreamContext { @@ -89,7 +87,6 @@ impl StreamContext { http_protocol: None, sse_buffer: None, sse_chunk_processor: None, - non_streaming_response_body_size: 0, } } @@ -437,17 +434,14 @@ impl StreamContext { } Ok(streaming_chunk) } else { - if body_size == 0 { - return Err(Action::Continue); - } debug!( "request_id={}: upstream response complete, streaming=false body_size={}", self.request_identifier(), body_size ); - match self.get_http_response_body(0, body_size) { - Some(body) => Ok(body), - None => { + match self.get_http_response_body(0, usize::MAX) { + Some(body) if !body.is_empty() => Ok(body), + _ => { warn!( "request_id={}: non streaming response body empty", self.request_identifier() @@ -1178,29 +1172,16 @@ impl HttpContext for StreamContext { let current_time = get_current_time().unwrap(); // Non-streaming upstream responses may arrive in multiple chunks; wait for the - // full buffered body before parsing (body_size is cumulative on the final chunk). - if !self.streaming_response { - if body_size > self.non_streaming_response_body_size { - self.non_streaming_response_body_size = body_size; - } - if !end_of_stream { - return Action::Continue; - } + // full buffered body before parsing. + if !self.streaming_response && !end_of_stream { + return Action::Continue; } - let effective_body_size = if self.streaming_response { - body_size - } else if body_size > 0 { - body_size - } else { - self.non_streaming_response_body_size - }; - - if end_of_stream && effective_body_size == 0 { + if end_of_stream && body_size == 0 && self.streaming_response { debug!( "request_id={}: response body complete, total_bytes={}", self.request_identifier(), - effective_body_size + body_size ); self.handle_end_of_request_metrics_and_traces(current_time); return Action::Continue; @@ -1217,15 +1198,20 @@ impl HttpContext for StreamContext { ); // For error responses, forward the upstream error directly without parsing - if effective_body_size > 0 { - if let Ok(body) = self.read_raw_response_body(effective_body_size) { + if let Ok(body) = self.read_raw_response_body(body_size) { + if !body.is_empty() { debug!( "request_id={}: upstream error body: {}", self.request_identifier(), String::from_utf8_lossy(&body) ); // Forward the error response as-is - self.set_http_response_body(0, effective_body_size, &body); + let replace_size = if self.streaming_response { + body_size + } else { + body.len() + }; + self.set_http_response_body(0, replace_size, &body); } } return Action::Continue; @@ -1250,11 +1236,24 @@ impl HttpContext for StreamContext { } } - let body = match self.read_raw_response_body(effective_body_size) { + let body = match self.read_raw_response_body(body_size) { Ok(bytes) => bytes, Err(action) => return action, }; + if !self.streaming_response && body.is_empty() { + if end_of_stream { + self.handle_end_of_request_metrics_and_traces(current_time); + } + return Action::Continue; + } + + let replace_size = if self.streaming_response { + body_size + } else { + body.len() + }; + debug!( "request_id={}: upstream raw response, body_size={} content={}", self.request_identifier(), @@ -1266,14 +1265,14 @@ impl HttpContext for StreamContext { if self.streaming_response { match self.handle_streaming_response(&body, provider_id) { Ok(serialized_body) => { - self.set_http_response_body(0, effective_body_size, &serialized_body); + self.set_http_response_body(0, replace_size, &serialized_body); } Err(action) => return action, } } else { match self.handle_non_streaming_response(&body, provider_id) { Ok(serialized_body) => { - self.set_http_response_body(0, effective_body_size, &serialized_body); + self.set_http_response_body(0, replace_size, &serialized_body); } Err(action) => return action, }