mirror of
https://github.com/katanemo/plano.git
synced 2026-04-25 00:36:34 +02:00
fixed reasoning failures (#634)
* fixed reasoning failures * adding debugging * made several fixes for transmission isses for SSeEvents, incomplete handling of json types by anthropic, and wrote a bunch of tests * removed debugging from supervisord.conf --------- Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-342.local>
This commit is contained in:
parent
2f9121407b
commit
48bbc7cce7
8 changed files with 360 additions and 68 deletions
|
|
@ -398,6 +398,8 @@ pub enum MessagesContentDelta {
|
|||
InputJsonDelta { partial_json: String },
|
||||
#[serde(rename = "thinking_delta")]
|
||||
ThinkingDelta { thinking: String },
|
||||
#[serde(rename = "signature_delta")]
|
||||
SignatureDelta { signature: String },
|
||||
}
|
||||
|
||||
#[skip_serializing_none]
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
pub mod sse;
|
||||
pub mod sse_chunk_processor;
|
||||
pub mod amazon_bedrock_binary_frame;
|
||||
pub mod anthropic_streaming_buffer;
|
||||
pub mod chat_completions_streaming_buffer;
|
||||
|
|
|
|||
|
|
@ -198,7 +198,15 @@ impl fmt::Display for SseEvent {
|
|||
// Into implementation to convert SseEvent to bytes for response buffer
|
||||
impl Into<Vec<u8>> for SseEvent {
|
||||
fn into(self) -> Vec<u8> {
|
||||
format!("{}\n\n", self.sse_transformed_lines).into_bytes()
|
||||
// For generated events (like ResponsesAPI), sse_transformed_lines already includes trailing \n\n
|
||||
// For parsed events (like passthrough), we need to add the \n\n separator
|
||||
if self.sse_transformed_lines.ends_with("\n\n") {
|
||||
// Already properly formatted with trailing newlines
|
||||
self.sse_transformed_lines.into_bytes()
|
||||
} else {
|
||||
// Add SSE event separator
|
||||
format!("{}\n\n", self.sse_transformed_lines).into_bytes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,241 @@
|
|||
use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamIter};
|
||||
use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
|
||||
/// Stateful processor for handling SSE chunks that may contain incomplete events.
|
||||
///
|
||||
/// This processor buffers incomplete SSE event bytes when transformation fails
|
||||
/// (e.g., due to incomplete JSON) and prepends them to the next chunk for retry.
|
||||
pub struct SseChunkProcessor {
|
||||
/// Buffered bytes from incomplete SSE events across chunks
|
||||
incomplete_event_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl SseChunkProcessor {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
incomplete_event_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Process a chunk of SSE data, handling incomplete events across chunk boundaries.
|
||||
///
|
||||
/// Returns successfully transformed events. Incomplete events are buffered internally
|
||||
/// and will be retried when more data arrives in the next chunk.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `chunk` - Raw bytes from upstream SSE stream
|
||||
/// * `client_api` - The API format the client expects
|
||||
/// * `upstream_api` - The API format from the upstream provider
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Ok(Vec<SseEvent>)` - Successfully transformed events ready for client
|
||||
/// * `Err(String)` - Fatal error that cannot be recovered by buffering
|
||||
pub fn process_chunk(
|
||||
&mut self,
|
||||
chunk: &[u8],
|
||||
client_api: &SupportedAPIsFromClient,
|
||||
upstream_api: &SupportedUpstreamAPIs,
|
||||
) -> Result<Vec<SseEvent>, String> {
|
||||
// Combine buffered incomplete event with new chunk
|
||||
let mut combined_data = std::mem::take(&mut self.incomplete_event_buffer);
|
||||
combined_data.extend_from_slice(chunk);
|
||||
|
||||
// Parse using SseStreamIter
|
||||
let sse_iter = match SseStreamIter::try_from(combined_data.as_slice()) {
|
||||
Ok(iter) => iter,
|
||||
Err(e) => return Err(format!("Failed to create SSE iterator: {}", e)),
|
||||
};
|
||||
|
||||
let mut transformed_events = Vec::new();
|
||||
|
||||
// Process each parsed SSE event
|
||||
for sse_event in sse_iter {
|
||||
// Try to transform the event (this is where incomplete JSON fails)
|
||||
match SseEvent::try_from((sse_event.clone(), client_api, upstream_api)) {
|
||||
Ok(transformed) => {
|
||||
// Successfully transformed - add to results
|
||||
transformed_events.push(transformed);
|
||||
}
|
||||
Err(e) => {
|
||||
// Check if this is incomplete JSON (EOF while parsing) vs other errors
|
||||
let error_str = e.to_string().to_lowercase();
|
||||
let is_incomplete_json = error_str.contains("eof while parsing")
|
||||
|| error_str.contains("unexpected end of json")
|
||||
|| error_str.contains("unexpected eof");
|
||||
|
||||
if is_incomplete_json {
|
||||
// Incomplete JSON - buffer for retry with next chunk
|
||||
self.incomplete_event_buffer = sse_event.raw_line.as_bytes().to_vec();
|
||||
break;
|
||||
} else {
|
||||
// Other error (unsupported event type, validation error, etc.)
|
||||
// Skip this event and continue processing others
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(transformed_events)
|
||||
}
|
||||
|
||||
/// Check if there are buffered incomplete bytes
|
||||
pub fn has_buffered_data(&self) -> bool {
|
||||
!self.incomplete_event_buffer.is_empty()
|
||||
}
|
||||
|
||||
/// Get the size of buffered incomplete data (for debugging/logging)
|
||||
pub fn buffered_size(&self) -> usize {
|
||||
self.incomplete_event_buffer.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use crate::apis::openai::OpenAIApi;
|
||||
|
||||
#[test]
|
||||
fn test_complete_events_process_immediately() {
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
|
||||
let chunk1 = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";
|
||||
|
||||
let events = processor.process_chunk(chunk1, &client_api, &upstream_api).unwrap();
|
||||
|
||||
assert_eq!(events.len(), 1);
|
||||
assert!(!processor.has_buffered_data());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_incomplete_json_buffered_and_completed() {
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
|
||||
// First chunk with incomplete JSON
|
||||
let chunk1 = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chu";
|
||||
|
||||
let events1 = processor.process_chunk(chunk1, &client_api, &upstream_api).unwrap();
|
||||
|
||||
assert_eq!(events1.len(), 0, "Incomplete event should not be processed");
|
||||
assert!(processor.has_buffered_data(), "Incomplete data should be buffered");
|
||||
|
||||
// Second chunk completes the JSON
|
||||
let chunk2 = b"nk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";
|
||||
|
||||
let events2 = processor.process_chunk(chunk2, &client_api, &upstream_api).unwrap();
|
||||
|
||||
assert_eq!(events2.len(), 1, "Complete event should be processed");
|
||||
assert!(!processor.has_buffered_data(), "Buffer should be cleared after completion");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_events_with_one_incomplete() {
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
|
||||
// Chunk with 2 complete events and 1 incomplete
|
||||
let chunk = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"A\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"B\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-125\",\"object\":\"chat.completion.chu";
|
||||
|
||||
let events = processor.process_chunk(chunk, &client_api, &upstream_api).unwrap();
|
||||
|
||||
assert_eq!(events.len(), 2, "Two complete events should be processed");
|
||||
assert!(processor.has_buffered_data(), "Incomplete third event should be buffered");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_anthropic_signature_delta_from_production_logs() {
|
||||
use crate::apis::anthropic::AnthropicApi;
|
||||
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
let upstream_api = SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
|
||||
// Exact chunk from production logs - signature_delta event followed by content_block_stop
|
||||
let chunk = br#"event: content_block_delta
|
||||
data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"ErECCkYIChgCKkC7lAf/BOatd0I4NnANYNEDKl5/WSsjNK44AETnLoy3i5FfdYMAb0m4qMLJD6A04QnM4Hf3VpGqq/snA/9vvNxCEgw3CYcHcj0aTdqOisQaDOhlVBtAUKkoh3WopSIwAbJp4jG/41vVWBj63eaR7KFJ37OdY1byjlPkaGDUJRcWc/YfUWIDSAToomq2fB4VKpgBk+swVYxLZ709gQvyTCT+3vO/I+yexZpkx6eBl/+YCgQXTeviZ+hTxSoPVayf5vEQoc19ZA4MEkZ7yBInRgk8vUxAJITSf+vOvDIBsElpgkLfSjARCasjh78wONg39AkAoIbKzU+Q2l1htUwXcqQ2b+b5DrY9+Oxae4pBVGQlWU36XAHsa/KG+ejfdwhWJM7FNL3uphwAf0oYAQ=="}}
|
||||
|
||||
event: content_block_stop
|
||||
data: {"type":"content_block_stop","index":0}
|
||||
|
||||
"#;
|
||||
|
||||
let result = processor.process_chunk(chunk, &client_api, &upstream_api);
|
||||
|
||||
match result {
|
||||
Ok(events) => {
|
||||
println!("Successfully processed {} events", events.len());
|
||||
for (i, event) in events.iter().enumerate() {
|
||||
println!("Event {}: event={:?}, has_data={}", i, event.event, event.data.is_some());
|
||||
}
|
||||
// Should successfully process both events (signature_delta + content_block_stop)
|
||||
assert!(events.len() >= 2, "Should process at least 2 complete events (signature_delta + stop), got {}", events.len());
|
||||
assert!(!processor.has_buffered_data(), "Complete events should not be buffered");
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Failed to process signature_delta chunk - this means SignatureDelta is not properly handled: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unsupported_event_does_not_block_subsequent_events() {
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
|
||||
// Chunk with an unsupported/invalid event followed by a valid event
|
||||
// First event has invalid JSON structure that will fail validation (not incomplete)
|
||||
// Second event is valid and should be processed
|
||||
let chunk = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"unsupported_field_causing_validation_error\":true},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n";
|
||||
|
||||
let events = processor.process_chunk(chunk, &client_api, &upstream_api).unwrap();
|
||||
|
||||
// Should skip the invalid event and process the valid one
|
||||
// (If we were buffering all errors, we'd get 0 events and have buffered data)
|
||||
assert!(events.len() >= 1, "Should process at least the valid event, got {} events", events.len());
|
||||
assert!(!processor.has_buffered_data(), "Invalid (non-incomplete) events should not be buffered");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unknown_delta_type_skipped_others_processed() {
|
||||
use crate::apis::anthropic::AnthropicApi;
|
||||
|
||||
let mut processor = SseChunkProcessor::new();
|
||||
let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
let upstream_api = SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
|
||||
// Chunk with valid event, unsupported delta type, then another valid event
|
||||
// This simulates a future API change where Anthropic adds a new delta type we don't support yet
|
||||
let chunk = br#"event: content_block_delta
|
||||
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
|
||||
|
||||
event: content_block_delta
|
||||
data: {"type":"content_block_delta","index":0,"delta":{"type":"future_unsupported_delta","future_field":"some_value"}}
|
||||
|
||||
event: content_block_delta
|
||||
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}}
|
||||
|
||||
"#;
|
||||
|
||||
let result = processor.process_chunk(chunk, &client_api, &upstream_api);
|
||||
|
||||
match result {
|
||||
Ok(events) => {
|
||||
println!("Processed {} events (unsupported event should be skipped)", events.len());
|
||||
// Should process the 2 valid text_delta events and skip the unsupported one
|
||||
// We expect at least 2 events (the valid ones), unsupported should be skipped
|
||||
assert!(events.len() >= 2, "Should process at least 2 valid events, got {}", events.len());
|
||||
assert!(!processor.has_buffered_data(), "Unsupported events should be skipped, not buffered");
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("Should not fail on unsupported delta type, should skip it: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -189,7 +189,10 @@ impl TryFrom<ChatCompletionsResponse> for ResponsesAPIResponse {
|
|||
top_p: 1.0,
|
||||
metadata: resp.metadata.unwrap_or_default(),
|
||||
truncation: None,
|
||||
reasoning: None,
|
||||
reasoning: Some(crate::apis::openai_responses::Reasoning {
|
||||
effort: None,
|
||||
summary: None,
|
||||
}),
|
||||
store: None,
|
||||
text: None,
|
||||
audio: None,
|
||||
|
|
|
|||
|
|
@ -364,6 +364,23 @@ fn convert_content_delta(
|
|||
None,
|
||||
None,
|
||||
)),
|
||||
MessagesContentDelta::SignatureDelta { signature: _ } => {
|
||||
// Signature delta is cryptographic verification metadata, not content
|
||||
// Create an empty delta chunk to maintain stream continuity
|
||||
Ok(create_openai_chunk(
|
||||
"stream",
|
||||
"unknown",
|
||||
MessageDelta {
|
||||
role: None,
|
||||
content: None,
|
||||
refusal: None,
|
||||
function_call: None,
|
||||
tool_calls: None,
|
||||
},
|
||||
None,
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,9 +20,8 @@ use common::ratelimit::Header;
|
|||
use common::stats::{IncrementingMetric, RecordingMetric};
|
||||
use common::{ratelimit, routing, tokenizer};
|
||||
use hermesllm::apis::streaming_shapes::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder;
|
||||
use hermesllm::apis::streaming_shapes::sse::{
|
||||
SseEvent, SseStreamBuffer, SseStreamBufferTrait, SseStreamIter,
|
||||
};
|
||||
use hermesllm::apis::streaming_shapes::sse::{SseEvent, SseStreamBuffer, SseStreamBufferTrait};
|
||||
use hermesllm::apis::streaming_shapes::sse_chunk_processor::SseChunkProcessor;
|
||||
use hermesllm::clients::endpoints::SupportedAPIsFromClient;
|
||||
use hermesllm::providers::response::ProviderResponse;
|
||||
use hermesllm::providers::streaming_response::ProviderStreamResponse;
|
||||
|
|
@ -55,6 +54,7 @@ pub struct StreamContext {
|
|||
http_method: Option<String>,
|
||||
http_protocol: Option<String>,
|
||||
sse_buffer: Option<SseStreamBuffer>,
|
||||
sse_chunk_processor: Option<SseChunkProcessor>,
|
||||
}
|
||||
|
||||
impl StreamContext {
|
||||
|
|
@ -85,6 +85,7 @@ impl StreamContext {
|
|||
http_method: None,
|
||||
http_protocol: None,
|
||||
sse_buffer: None,
|
||||
sse_chunk_processor: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -138,7 +139,7 @@ impl StreamContext {
|
|||
));
|
||||
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] PROVIDER_SELECTION: Hint='{}' -> Selected='{}'",
|
||||
"[PLANO_REQ_ID:{}] PROVIDER_SELECTION: Hint='{}' -> Selected='{}'",
|
||||
self.request_identifier(),
|
||||
self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER)
|
||||
.unwrap_or("none".to_string()),
|
||||
|
|
@ -223,7 +224,7 @@ impl StreamContext {
|
|||
let token_count = tokenizer::token_count(model, json_string).unwrap_or(0);
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}",
|
||||
"[PLANO_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}",
|
||||
self.request_identifier(),
|
||||
model,
|
||||
token_count
|
||||
|
|
@ -237,7 +238,7 @@ impl StreamContext {
|
|||
// Check if rate limiting needs to be applied.
|
||||
if let Some(selector) = self.ratelimit_selector.take() {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'",
|
||||
"[PLANO_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'",
|
||||
self.request_identifier(),
|
||||
model,
|
||||
selector.key,
|
||||
|
|
@ -250,7 +251,7 @@ impl StreamContext {
|
|||
)?;
|
||||
} else {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)",
|
||||
"[PLANO_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)",
|
||||
self.request_identifier(),
|
||||
model
|
||||
);
|
||||
|
|
@ -269,7 +270,7 @@ impl StreamContext {
|
|||
Ok(duration) => {
|
||||
let duration_ms = duration.as_millis();
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms",
|
||||
"[PLANO_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms",
|
||||
self.request_identifier(),
|
||||
duration_ms
|
||||
);
|
||||
|
|
@ -278,7 +279,7 @@ impl StreamContext {
|
|||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}",
|
||||
"[PLANO_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}",
|
||||
self.request_identifier(),
|
||||
e
|
||||
);
|
||||
|
|
@ -294,7 +295,7 @@ impl StreamContext {
|
|||
// Convert the duration to milliseconds
|
||||
let duration_ms = duration.as_millis();
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}",
|
||||
"[PLANO_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}",
|
||||
self.request_identifier(),
|
||||
duration_ms,
|
||||
self.response_tokens
|
||||
|
|
@ -310,7 +311,7 @@ impl StreamContext {
|
|||
self.metrics.time_per_output_token.record(tpot);
|
||||
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}",
|
||||
"[PLANO_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}",
|
||||
self.request_identifier(),
|
||||
tpot,
|
||||
1000 / tpot
|
||||
|
|
@ -333,7 +334,7 @@ impl StreamContext {
|
|||
if self.streaming_response {
|
||||
let chunk_size = body_size;
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}",
|
||||
self.request_identifier(),
|
||||
chunk_size
|
||||
);
|
||||
|
|
@ -341,7 +342,7 @@ impl StreamContext {
|
|||
Some(chunk) => chunk,
|
||||
None => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}",
|
||||
self.request_identifier(),
|
||||
chunk_size
|
||||
);
|
||||
|
|
@ -351,7 +352,7 @@ impl StreamContext {
|
|||
|
||||
if streaming_chunk.len() != chunk_size {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}",
|
||||
self.request_identifier(),
|
||||
chunk_size,
|
||||
streaming_chunk.len()
|
||||
|
|
@ -363,7 +364,7 @@ impl StreamContext {
|
|||
return Err(Action::Continue);
|
||||
}
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}",
|
||||
self.request_identifier(),
|
||||
body_size
|
||||
);
|
||||
|
|
@ -383,7 +384,7 @@ impl StreamContext {
|
|||
provider_id: ProviderId,
|
||||
) -> Result<Vec<u8>, Action> {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: client={:?} provider_id={:?} chunk_size={}",
|
||||
"[PLANO_REQ_ID:{}] STREAMING_PROCESS: client={:?} provider_id={:?} chunk_size={}",
|
||||
self.request_identifier(),
|
||||
self.client_api,
|
||||
provider_id,
|
||||
|
|
@ -403,15 +404,10 @@ impl StreamContext {
|
|||
return self.handle_bedrock_binary_stream(body, &client_api, &upstream_api);
|
||||
}
|
||||
|
||||
// Parse body into SSE iterator using TryFrom
|
||||
let sse_iter: SseStreamIter<std::vec::IntoIter<String>> =
|
||||
match SseStreamIter::try_from(body) {
|
||||
Ok(iter) => iter,
|
||||
Err(e) => {
|
||||
warn!("Failed to parse body into SSE iterator: {}", e);
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
};
|
||||
// Initialize SSE chunk processor if not present
|
||||
if self.sse_chunk_processor.is_none() {
|
||||
self.sse_chunk_processor = Some(SseChunkProcessor::new());
|
||||
}
|
||||
|
||||
// Initialize SSE buffer if not present
|
||||
if self.sse_buffer.is_none() {
|
||||
|
|
@ -425,18 +421,42 @@ impl StreamContext {
|
|||
};
|
||||
}
|
||||
|
||||
// Process each SSE event
|
||||
for sse_event in sse_iter {
|
||||
// Transform event if upstream API != client API
|
||||
let transformed_event: SseEvent =
|
||||
match SseEvent::try_from((sse_event, &client_api, &upstream_api)) {
|
||||
Ok(event) => event,
|
||||
// Process chunk through SSE processor (handles incomplete events)
|
||||
let transformed_events = match self.sse_chunk_processor.as_mut() {
|
||||
Some(processor) => {
|
||||
let result = processor.process_chunk(body, &client_api, &upstream_api);
|
||||
let has_buffered = processor.has_buffered_data();
|
||||
let buffered_size = processor.buffered_size();
|
||||
|
||||
match result {
|
||||
Ok(events) => {
|
||||
if has_buffered {
|
||||
debug!(
|
||||
"[PLANO_REQ_ID:{}] SSE_INCOMPLETE_BUFFERED: {} bytes buffered for next chunk",
|
||||
self.request_identifier(),
|
||||
buffered_size
|
||||
);
|
||||
}
|
||||
events
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to transform SSE event: {}", e);
|
||||
warn!(
|
||||
"[PLANO_REQ_ID:{}] SSE_CHUNK_PROCESS_ERROR: {}",
|
||||
self.request_identifier(),
|
||||
e
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("SSE chunk processor unexpectedly missing");
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
};
|
||||
|
||||
// Process each successfully transformed SSE event
|
||||
for transformed_event in transformed_events {
|
||||
// Extract ProviderStreamResponse for processing (token counting, etc.)
|
||||
if !transformed_event.is_done() && !transformed_event.is_event_only() {
|
||||
match transformed_event.provider_response() {
|
||||
|
|
@ -445,7 +465,7 @@ impl StreamContext {
|
|||
|
||||
if provider_response.is_final() {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}",
|
||||
"[PLANO_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}",
|
||||
self.request_identifier(),
|
||||
self.response_tokens
|
||||
);
|
||||
|
|
@ -455,7 +475,7 @@ impl StreamContext {
|
|||
let estimated_tokens = content.len() / 4;
|
||||
self.response_tokens += estimated_tokens.max(1);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}",
|
||||
"[PLANO_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}",
|
||||
self.request_identifier(),
|
||||
content.len(),
|
||||
estimated_tokens.max(1),
|
||||
|
|
@ -465,7 +485,7 @@ impl StreamContext {
|
|||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}",
|
||||
"[PLANO_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}",
|
||||
self.request_identifier(),
|
||||
e
|
||||
);
|
||||
|
|
@ -487,7 +507,7 @@ impl StreamContext {
|
|||
if !bytes.is_empty() {
|
||||
let content = String::from_utf8_lossy(&bytes);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
|
||||
self.request_identifier(),
|
||||
bytes.len(),
|
||||
content
|
||||
|
|
@ -525,7 +545,7 @@ impl StreamContext {
|
|||
Ok(buffer) => Some(buffer),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_INIT_ERROR: {}",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_BUFFER_INIT_ERROR: {}",
|
||||
self.request_identifier(),
|
||||
e
|
||||
);
|
||||
|
|
@ -555,7 +575,7 @@ impl StreamContext {
|
|||
let estimated_tokens = content.len() / 4;
|
||||
self.response_tokens += estimated_tokens.max(1);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}",
|
||||
self.request_identifier(),
|
||||
content.len(),
|
||||
estimated_tokens.max(1),
|
||||
|
|
@ -573,7 +593,7 @@ impl StreamContext {
|
|||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_FRAME_CONVERSION_ERROR: {}",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_FRAME_CONVERSION_ERROR: {}",
|
||||
self.request_identifier(),
|
||||
e
|
||||
);
|
||||
|
|
@ -583,7 +603,7 @@ impl StreamContext {
|
|||
Some(DecodedFrame::Incomplete) => {
|
||||
// Incomplete frame - buffer retains partial data, wait for more bytes
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_INCOMPLETE_FRAME: waiting for more data",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_INCOMPLETE_FRAME: waiting for more data",
|
||||
self.request_identifier()
|
||||
);
|
||||
break;
|
||||
|
|
@ -591,7 +611,7 @@ impl StreamContext {
|
|||
None => {
|
||||
// Decode error
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_DECODE_ERROR",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_DECODE_ERROR",
|
||||
self.request_identifier()
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
|
|
@ -606,7 +626,7 @@ impl StreamContext {
|
|||
if !bytes.is_empty() {
|
||||
let content = String::from_utf8_lossy(&bytes);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
|
||||
self.request_identifier(),
|
||||
bytes.len(),
|
||||
content
|
||||
|
|
@ -616,7 +636,7 @@ impl StreamContext {
|
|||
}
|
||||
None => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_MISSING",
|
||||
"[PLANO_REQ_ID:{}] BEDROCK_BUFFER_MISSING",
|
||||
self.request_identifier()
|
||||
);
|
||||
Err(Action::Continue)
|
||||
|
|
@ -630,7 +650,7 @@ impl StreamContext {
|
|||
provider_id: ProviderId,
|
||||
) -> Result<Vec<u8>, Action> {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}",
|
||||
"[PLANO_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}",
|
||||
self.request_identifier(),
|
||||
provider_id,
|
||||
body.len()
|
||||
|
|
@ -642,7 +662,7 @@ impl StreamContext {
|
|||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}",
|
||||
self.request_identifier(),
|
||||
e,
|
||||
String::from_utf8_lossy(body)
|
||||
|
|
@ -657,7 +677,7 @@ impl StreamContext {
|
|||
}
|
||||
None => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api",
|
||||
self.request_identifier()
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
|
|
@ -669,7 +689,7 @@ impl StreamContext {
|
|||
response.extract_usage_counts()
|
||||
{
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}",
|
||||
"[PLANO_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}",
|
||||
self.request_identifier(),
|
||||
prompt_tokens,
|
||||
completion_tokens,
|
||||
|
|
@ -678,7 +698,7 @@ impl StreamContext {
|
|||
self.response_tokens = completion_tokens;
|
||||
} else {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: no usage information found",
|
||||
"[PLANO_REQ_ID:{}] RESPONSE_USAGE: no usage information found",
|
||||
self.request_identifier()
|
||||
);
|
||||
}
|
||||
|
|
@ -686,7 +706,7 @@ impl StreamContext {
|
|||
match serde_json::to_vec(&response) {
|
||||
Ok(bytes) => {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}",
|
||||
"[PLANO_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&bytes)
|
||||
);
|
||||
|
|
@ -763,7 +783,7 @@ impl HttpContext for StreamContext {
|
|||
Some(provider_id.compatible_api_for_client(api, self.streaming_response));
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'",
|
||||
"[PLANO_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'",
|
||||
self.request_identifier(),
|
||||
provider.to_provider_id(),
|
||||
api,
|
||||
|
|
@ -816,7 +836,7 @@ impl HttpContext for StreamContext {
|
|||
|
||||
fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}",
|
||||
"[PLANO_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}",
|
||||
self.request_identifier(),
|
||||
body_size,
|
||||
end_of_stream
|
||||
|
|
@ -855,14 +875,14 @@ impl HttpContext for StreamContext {
|
|||
let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() {
|
||||
Some(the_client_api) => {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
|
||||
"[PLANO_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
|
||||
self.request_identifier(),
|
||||
the_client_api,
|
||||
body_bytes.len()
|
||||
);
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}",
|
||||
"[PLANO_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
);
|
||||
|
|
@ -871,7 +891,7 @@ impl HttpContext for StreamContext {
|
|||
Ok(deserialized) => deserialized,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}",
|
||||
"[PLANO_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}",
|
||||
self.request_identifier(),
|
||||
e,
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
|
|
@ -914,7 +934,7 @@ impl HttpContext for StreamContext {
|
|||
"agent_orchestrator".to_string()
|
||||
} else {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}",
|
||||
"[PLANO_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}",
|
||||
self.request_identifier(),
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
|
|
@ -943,7 +963,7 @@ impl HttpContext for StreamContext {
|
|||
self.user_message = deserialized_client_request.get_recent_user_message();
|
||||
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}",
|
||||
"[PLANO_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}",
|
||||
self.request_identifier(),
|
||||
model_requested,
|
||||
resolved_model,
|
||||
|
|
@ -974,14 +994,14 @@ impl HttpContext for StreamContext {
|
|||
match self.resolved_api.as_ref() {
|
||||
Some(upstream) => {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}",
|
||||
self.request_identifier(), self.client_api, upstream
|
||||
);
|
||||
|
||||
match ProviderRequestType::try_from((deserialized_client_request, upstream)) {
|
||||
Ok(request) => {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&request.to_bytes().unwrap_or_default())
|
||||
);
|
||||
|
|
@ -1032,7 +1052,7 @@ impl HttpContext for StreamContext {
|
|||
self.upstream_status_code = StatusCode::from_u16(status_code).ok();
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}",
|
||||
self.request_identifier(),
|
||||
status_code
|
||||
);
|
||||
|
|
@ -1059,7 +1079,7 @@ impl HttpContext for StreamContext {
|
|||
let current_time = get_current_time().unwrap();
|
||||
if end_of_stream && body_size == 0 {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}",
|
||||
"[PLANO_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}",
|
||||
self.request_identifier(),
|
||||
body_size
|
||||
);
|
||||
|
|
@ -1071,7 +1091,7 @@ impl HttpContext for StreamContext {
|
|||
if let Some(status_code) = &self.upstream_status_code {
|
||||
if status_code.is_client_error() || status_code.is_server_error() {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_ERROR_RESPONSE: status={} body_size={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_ERROR_RESPONSE: status={} body_size={}",
|
||||
self.request_identifier(),
|
||||
status_code.as_u16(),
|
||||
body_size
|
||||
|
|
@ -1081,7 +1101,7 @@ impl HttpContext for StreamContext {
|
|||
if body_size > 0 {
|
||||
if let Ok(body) = self.read_raw_response_body(body_size) {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_ERROR_BODY: {}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_ERROR_BODY: {}",
|
||||
self.request_identifier(),
|
||||
String::from_utf8_lossy(&body)
|
||||
);
|
||||
|
|
@ -1103,7 +1123,7 @@ impl HttpContext for StreamContext {
|
|||
None => "None".to_string(),
|
||||
};
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}], UNSUPPORTED API: {}",
|
||||
"[PLANO_REQ_ID:{}], UNSUPPORTED API: {}",
|
||||
self.request_identifier(),
|
||||
api_info
|
||||
);
|
||||
|
|
@ -1117,7 +1137,7 @@ impl HttpContext for StreamContext {
|
|||
};
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}",
|
||||
"[PLANO_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}",
|
||||
self.request_identifier(),
|
||||
body.len(),
|
||||
String::from_utf8_lossy(&body)
|
||||
|
|
|
|||
|
|
@ -595,7 +595,7 @@ def test_openai_responses_api_streaming_with_tools_upstream_anthropic():
|
|||
|
||||
stream = client.responses.create(
|
||||
model="claude-sonnet-4-20250514",
|
||||
input="Call the echo tool",
|
||||
input="Call the echo tool with hello_world",
|
||||
tools=tools,
|
||||
stream=True,
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue