mirror of
https://github.com/katanemo/plano.git
synced 2026-06-11 15:05:14 +02:00
fix(llm_gateway): read full non-streaming body when final chunk is empty
Co-authored-by: Musa <musa@spherrrical.dev>
This commit is contained in:
parent
420431b30c
commit
51de194141
1 changed files with 33 additions and 16 deletions
|
|
@ -56,6 +56,8 @@ pub struct StreamContext {
|
|||
http_protocol: Option<String>,
|
||||
sse_buffer: Option<SseStreamBuffer>,
|
||||
sse_chunk_processor: Option<SseChunkProcessor>,
|
||||
/// Largest `body_size` seen for a non-streaming response (cumulative per Envoy).
|
||||
non_streaming_response_body_size: usize,
|
||||
}
|
||||
|
||||
impl StreamContext {
|
||||
|
|
@ -87,6 +89,7 @@ impl StreamContext {
|
|||
http_protocol: None,
|
||||
sse_buffer: None,
|
||||
sse_chunk_processor: None,
|
||||
non_streaming_response_body_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1173,19 +1176,33 @@ impl HttpContext for StreamContext {
|
|||
}
|
||||
|
||||
let current_time = get_current_time().unwrap();
|
||||
if end_of_stream && body_size == 0 {
|
||||
debug!(
|
||||
"request_id={}: response body complete, total_bytes={}",
|
||||
self.request_identifier(),
|
||||
body_size
|
||||
);
|
||||
self.handle_end_of_request_metrics_and_traces(current_time);
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
// Non-streaming upstream responses may arrive in multiple chunks; wait for the
|
||||
// full buffered body before parsing (body_size is cumulative on the final chunk).
|
||||
if !self.streaming_response && !end_of_stream {
|
||||
if !self.streaming_response {
|
||||
if body_size > self.non_streaming_response_body_size {
|
||||
self.non_streaming_response_body_size = body_size;
|
||||
}
|
||||
if !end_of_stream {
|
||||
return Action::Continue;
|
||||
}
|
||||
}
|
||||
|
||||
let effective_body_size = if self.streaming_response {
|
||||
body_size
|
||||
} else if body_size > 0 {
|
||||
body_size
|
||||
} else {
|
||||
self.non_streaming_response_body_size
|
||||
};
|
||||
|
||||
if end_of_stream && effective_body_size == 0 {
|
||||
debug!(
|
||||
"request_id={}: response body complete, total_bytes={}",
|
||||
self.request_identifier(),
|
||||
effective_body_size
|
||||
);
|
||||
self.handle_end_of_request_metrics_and_traces(current_time);
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
|
|
@ -1200,15 +1217,15 @@ impl HttpContext for StreamContext {
|
|||
);
|
||||
|
||||
// For error responses, forward the upstream error directly without parsing
|
||||
if body_size > 0 {
|
||||
if let Ok(body) = self.read_raw_response_body(body_size) {
|
||||
if effective_body_size > 0 {
|
||||
if let Ok(body) = self.read_raw_response_body(effective_body_size) {
|
||||
debug!(
|
||||
"request_id={}: upstream error body: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&body)
|
||||
);
|
||||
// Forward the error response as-is
|
||||
self.set_http_response_body(0, body_size, &body);
|
||||
self.set_http_response_body(0, effective_body_size, &body);
|
||||
}
|
||||
}
|
||||
return Action::Continue;
|
||||
|
|
@ -1233,7 +1250,7 @@ impl HttpContext for StreamContext {
|
|||
}
|
||||
}
|
||||
|
||||
let body = match self.read_raw_response_body(body_size) {
|
||||
let body = match self.read_raw_response_body(effective_body_size) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(action) => return action,
|
||||
};
|
||||
|
|
@ -1249,14 +1266,14 @@ impl HttpContext for StreamContext {
|
|||
if self.streaming_response {
|
||||
match self.handle_streaming_response(&body, provider_id) {
|
||||
Ok(serialized_body) => {
|
||||
self.set_http_response_body(0, body_size, &serialized_body);
|
||||
self.set_http_response_body(0, effective_body_size, &serialized_body);
|
||||
}
|
||||
Err(action) => return action,
|
||||
}
|
||||
} else {
|
||||
match self.handle_non_streaming_response(&body, provider_id) {
|
||||
Ok(serialized_body) => {
|
||||
self.set_http_response_body(0, body_size, &serialized_body);
|
||||
self.set_http_response_body(0, effective_body_size, &serialized_body);
|
||||
}
|
||||
Err(action) => return action,
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue