mirror of
https://github.com/katanemo/plano.git
synced 2026-06-08 14:55:14 +02:00
fix(llm_gateway): read full non-streaming body with usize::MAX at end_of_stream
Co-authored-by: Musa <musa@spherrrical.dev>
This commit is contained in:
parent
51de194141
commit
0348ca1d11
1 changed file with 32 additions and 33 deletions
|
|
@@ -56,8 +56,6 @@ pub struct StreamContext {
|
|||
http_protocol: Option<String>,
|
||||
sse_buffer: Option<SseStreamBuffer>,
|
||||
sse_chunk_processor: Option<SseChunkProcessor>,
|
||||
/// Largest `body_size` seen for a non-streaming response (cumulative per Envoy).
|
||||
non_streaming_response_body_size: usize,
|
||||
}
|
||||
|
||||
impl StreamContext {
|
||||
|
|
@@ -89,7 +87,6 @@ impl StreamContext {
|
|||
http_protocol: None,
|
||||
sse_buffer: None,
|
||||
sse_chunk_processor: None,
|
||||
non_streaming_response_body_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@@ -437,17 +434,14 @@ impl StreamContext {
|
|||
}
|
||||
Ok(streaming_chunk)
|
||||
} else {
|
||||
if body_size == 0 {
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
debug!(
|
||||
"request_id={}: upstream response complete, streaming=false body_size={}",
|
||||
self.request_identifier(),
|
||||
body_size
|
||||
);
|
||||
match self.get_http_response_body(0, body_size) {
|
||||
Some(body) => Ok(body),
|
||||
None => {
|
||||
match self.get_http_response_body(0, usize::MAX) {
|
||||
Some(body) if !body.is_empty() => Ok(body),
|
||||
_ => {
|
||||
warn!(
|
||||
"request_id={}: non streaming response body empty",
|
||||
self.request_identifier()
|
||||
|
|
@@ -1178,29 +1172,16 @@ impl HttpContext for StreamContext {
|
|||
let current_time = get_current_time().unwrap();
|
||||
|
||||
// Non-streaming upstream responses may arrive in multiple chunks; wait for the
|
||||
// full buffered body before parsing (body_size is cumulative on the final chunk).
|
||||
if !self.streaming_response {
|
||||
if body_size > self.non_streaming_response_body_size {
|
||||
self.non_streaming_response_body_size = body_size;
|
||||
}
|
||||
if !end_of_stream {
|
||||
return Action::Continue;
|
||||
}
|
||||
// full buffered body before parsing.
|
||||
if !self.streaming_response && !end_of_stream {
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
let effective_body_size = if self.streaming_response {
|
||||
body_size
|
||||
} else if body_size > 0 {
|
||||
body_size
|
||||
} else {
|
||||
self.non_streaming_response_body_size
|
||||
};
|
||||
|
||||
if end_of_stream && effective_body_size == 0 {
|
||||
if end_of_stream && body_size == 0 && self.streaming_response {
|
||||
debug!(
|
||||
"request_id={}: response body complete, total_bytes={}",
|
||||
self.request_identifier(),
|
||||
effective_body_size
|
||||
body_size
|
||||
);
|
||||
self.handle_end_of_request_metrics_and_traces(current_time);
|
||||
return Action::Continue;
|
||||
|
|
@@ -1217,15 +1198,20 @@ impl HttpContext for StreamContext {
|
|||
);
|
||||
|
||||
// For error responses, forward the upstream error directly without parsing
|
||||
if effective_body_size > 0 {
|
||||
if let Ok(body) = self.read_raw_response_body(effective_body_size) {
|
||||
if let Ok(body) = self.read_raw_response_body(body_size) {
|
||||
if !body.is_empty() {
|
||||
debug!(
|
||||
"request_id={}: upstream error body: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&body)
|
||||
);
|
||||
// Forward the error response as-is
|
||||
self.set_http_response_body(0, effective_body_size, &body);
|
||||
let replace_size = if self.streaming_response {
|
||||
body_size
|
||||
} else {
|
||||
body.len()
|
||||
};
|
||||
self.set_http_response_body(0, replace_size, &body);
|
||||
}
|
||||
}
|
||||
return Action::Continue;
|
||||
|
|
@@ -1250,11 +1236,24 @@ impl HttpContext for StreamContext {
|
|||
}
|
||||
}
|
||||
|
||||
let body = match self.read_raw_response_body(effective_body_size) {
|
||||
let body = match self.read_raw_response_body(body_size) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(action) => return action,
|
||||
};
|
||||
|
||||
if !self.streaming_response && body.is_empty() {
|
||||
if end_of_stream {
|
||||
self.handle_end_of_request_metrics_and_traces(current_time);
|
||||
}
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
let replace_size = if self.streaming_response {
|
||||
body_size
|
||||
} else {
|
||||
body.len()
|
||||
};
|
||||
|
||||
debug!(
|
||||
"request_id={}: upstream raw response, body_size={} content={}",
|
||||
self.request_identifier(),
|
||||
|
|
@@ -1266,14 +1265,14 @@ impl HttpContext for StreamContext {
|
|||
if self.streaming_response {
|
||||
match self.handle_streaming_response(&body, provider_id) {
|
||||
Ok(serialized_body) => {
|
||||
self.set_http_response_body(0, effective_body_size, &serialized_body);
|
||||
self.set_http_response_body(0, replace_size, &serialized_body);
|
||||
}
|
||||
Err(action) => return action,
|
||||
}
|
||||
} else {
|
||||
match self.handle_non_streaming_response(&body, provider_id) {
|
||||
Ok(serialized_body) => {
|
||||
self.set_http_response_body(0, effective_body_size, &serialized_body);
|
||||
self.set_http_response_body(0, replace_size, &serialized_body);
|
||||
}
|
||||
Err(action) => return action,
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue