diff --git a/crates/hermesllm/src/apis/anthropic.rs b/crates/hermesllm/src/apis/anthropic.rs index 20f422e2..2e73e1c2 100644 --- a/crates/hermesllm/src/apis/anthropic.rs +++ b/crates/hermesllm/src/apis/anthropic.rs @@ -398,6 +398,8 @@ pub enum MessagesContentDelta { InputJsonDelta { partial_json: String }, #[serde(rename = "thinking_delta")] ThinkingDelta { thinking: String }, + #[serde(rename = "signature_delta")] + SignatureDelta { signature: String }, } #[skip_serializing_none] diff --git a/crates/hermesllm/src/apis/streaming_shapes/mod.rs b/crates/hermesllm/src/apis/streaming_shapes/mod.rs index 1ef7acc7..4db3b094 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/mod.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/mod.rs @@ -1,4 +1,5 @@ pub mod sse; +pub mod sse_chunk_processor; pub mod amazon_bedrock_binary_frame; pub mod anthropic_streaming_buffer; pub mod chat_completions_streaming_buffer; diff --git a/crates/hermesllm/src/apis/streaming_shapes/sse.rs b/crates/hermesllm/src/apis/streaming_shapes/sse.rs index 17c6873a..05f0b296 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/sse.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/sse.rs @@ -198,7 +198,15 @@ impl fmt::Display for SseEvent { // Into implementation to convert SseEvent to bytes for response buffer impl Into> for SseEvent { fn into(self) -> Vec { - format!("{}\n\n", self.sse_transformed_lines).into_bytes() + // For generated events (like ResponsesAPI), sse_transformed_lines already includes trailing \n\n + // For parsed events (like passthrough), we need to add the \n\n separator + if self.sse_transformed_lines.ends_with("\n\n") { + // Already properly formatted with trailing newlines + self.sse_transformed_lines.into_bytes() + } else { + // Add SSE event separator + format!("{}\n\n", self.sse_transformed_lines).into_bytes() + } } } diff --git a/crates/hermesllm/src/apis/streaming_shapes/sse_chunk_processor.rs b/crates/hermesllm/src/apis/streaming_shapes/sse_chunk_processor.rs new file mode 100644 index 00000000..c7d25527 --- /dev/null +++ b/crates/hermesllm/src/apis/streaming_shapes/sse_chunk_processor.rs @@ -0,0 +1,241 @@ +use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamIter}; +use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; + +/// Stateful processor for handling SSE chunks that may contain incomplete events. +/// +/// This processor buffers incomplete SSE event bytes when transformation fails +/// (e.g., due to incomplete JSON) and prepends them to the next chunk for retry. +pub struct SseChunkProcessor { + /// Buffered bytes from incomplete SSE events across chunks + incomplete_event_buffer: Vec, +} + +impl SseChunkProcessor { + pub fn new() -> Self { + Self { + incomplete_event_buffer: Vec::new(), + } + } + + /// Process a chunk of SSE data, handling incomplete events across chunk boundaries. + /// + /// Returns successfully transformed events. Incomplete events are buffered internally + /// and will be retried when more data arrives in the next chunk. + /// + /// # Arguments + /// * `chunk` - Raw bytes from upstream SSE stream + /// * `client_api` - The API format the client expects + /// * `upstream_api` - The API format from the upstream provider + /// + /// # Returns + /// * `Ok(Vec)` - Successfully transformed events ready for client + /// * `Err(String)` - Fatal error that cannot be recovered by buffering + pub fn process_chunk( + &mut self, + chunk: &[u8], + client_api: &SupportedAPIsFromClient, + upstream_api: &SupportedUpstreamAPIs, + ) -> Result, String> { + // Combine buffered incomplete event with new chunk + let mut combined_data = std::mem::take(&mut self.incomplete_event_buffer); + combined_data.extend_from_slice(chunk); + + // Parse using SseStreamIter + let sse_iter = match SseStreamIter::try_from(combined_data.as_slice()) { + Ok(iter) => iter, + Err(e) => return Err(format!("Failed to create SSE iterator: {}", e)), + }; + + let mut transformed_events = Vec::new(); + + // Process each parsed SSE event + for sse_event in sse_iter { + // Try to transform the event (this is where incomplete JSON fails) + match SseEvent::try_from((sse_event.clone(), client_api, upstream_api)) { + Ok(transformed) => { + // Successfully transformed - add to results + transformed_events.push(transformed); + } + Err(e) => { + // Check if this is incomplete JSON (EOF while parsing) vs other errors + let error_str = e.to_string().to_lowercase(); + let is_incomplete_json = error_str.contains("eof while parsing") + || error_str.contains("unexpected end of json") + || error_str.contains("unexpected eof"); + + if is_incomplete_json { + // Incomplete JSON - buffer for retry with next chunk + self.incomplete_event_buffer = sse_event.raw_line.as_bytes().to_vec(); + break; + } else { + // Other error (unsupported event type, validation error, etc.) + // Skip this event and continue processing others + continue; + } + } + } + } + + Ok(transformed_events) + } + + /// Check if there are buffered incomplete bytes + pub fn has_buffered_data(&self) -> bool { + !self.incomplete_event_buffer.is_empty() + } + + /// Get the size of buffered incomplete data (for debugging/logging) + pub fn buffered_size(&self) -> usize { + self.incomplete_event_buffer.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; + use crate::apis::openai::OpenAIApi; + + #[test] + fn test_complete_events_process_immediately() { + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + let chunk1 = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"; + + let events = processor.process_chunk(chunk1, &client_api, &upstream_api).unwrap(); + + assert_eq!(events.len(), 1); + assert!(!processor.has_buffered_data()); + } + + #[test] + fn test_incomplete_json_buffered_and_completed() { + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + // First chunk with incomplete JSON + let chunk1 = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chu"; + + let events1 = processor.process_chunk(chunk1, &client_api, &upstream_api).unwrap(); + + assert_eq!(events1.len(), 0, "Incomplete event should not be processed"); + assert!(processor.has_buffered_data(), "Incomplete data should be buffered"); + + // Second chunk completes the JSON + let chunk2 = b"nk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"; + + let events2 = processor.process_chunk(chunk2, &client_api, &upstream_api).unwrap(); + + assert_eq!(events2.len(), 1, "Complete event should be processed"); + assert!(!processor.has_buffered_data(), "Buffer should be cleared after completion"); + } + + #[test] + fn test_multiple_events_with_one_incomplete() { + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + // Chunk with 2 complete events and 1 incomplete + let chunk = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"A\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"B\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-125\",\"object\":\"chat.completion.chu"; + + let events = processor.process_chunk(chunk, &client_api, &upstream_api).unwrap(); + + assert_eq!(events.len(), 2, "Two complete events should be processed"); + assert!(processor.has_buffered_data(), "Incomplete third event should be buffered"); + } + + #[test] + fn test_anthropic_signature_delta_from_production_logs() { + use crate::apis::anthropic::AnthropicApi; + + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages); + let upstream_api = SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages); + + // Exact chunk from production logs - signature_delta event followed by content_block_stop + let chunk = br#"event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"ErECCkYIChgCKkC7lAf/BOatd0I4NnANYNEDKl5/WSsjNK44AETnLoy3i5FfdYMAb0m4qMLJD6A04QnM4Hf3VpGqq/snA/9vvNxCEgw3CYcHcj0aTdqOisQaDOhlVBtAUKkoh3WopSIwAbJp4jG/41vVWBj63eaR7KFJ37OdY1byjlPkaGDUJRcWc/YfUWIDSAToomq2fB4VKpgBk+swVYxLZ709gQvyTCT+3vO/I+yexZpkx6eBl/+YCgQXTeviZ+hTxSoPVayf5vEQoc19ZA4MEkZ7yBInRgk8vUxAJITSf+vOvDIBsElpgkLfSjARCasjh78wONg39AkAoIbKzU+Q2l1htUwXcqQ2b+b5DrY9+Oxae4pBVGQlWU36XAHsa/KG+ejfdwhWJM7FNL3uphwAf0oYAQ=="}} + +event: content_block_stop +data: {"type":"content_block_stop","index":0} + +"#; + + let result = processor.process_chunk(chunk, &client_api, &upstream_api); + + match result { + Ok(events) => { + println!("Successfully processed {} events", events.len()); + for (i, event) in events.iter().enumerate() { + println!("Event {}: event={:?}, has_data={}", i, event.event, event.data.is_some()); + } + // Should successfully process both events (signature_delta + content_block_stop) + assert!(events.len() >= 2, "Should process at least 2 complete events (signature_delta + stop), got {}", events.len()); + assert!(!processor.has_buffered_data(), "Complete events should not be buffered"); + } + Err(e) => { + panic!("Failed to process signature_delta chunk - this means SignatureDelta is not properly handled: {}", e); + } + } + } + + #[test] + fn test_unsupported_event_does_not_block_subsequent_events() { + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + // Chunk with an unsupported/invalid event followed by a valid event + // First event has invalid JSON structure that will fail validation (not incomplete) + // Second event is valid and should be processed + let chunk = b"data: {\"id\":\"chatcmpl-123\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"unsupported_field_causing_validation_error\":true},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-124\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4o\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"; + + let events = processor.process_chunk(chunk, &client_api, &upstream_api).unwrap(); + + // Should skip the invalid event and process the valid one + // (If we were buffering all errors, we'd get 0 events and have buffered data) + assert!(events.len() >= 1, "Should process at least the valid event, got {} events", events.len()); + assert!(!processor.has_buffered_data(), "Invalid (non-incomplete) events should not be buffered"); + } + + #[test] + fn test_unknown_delta_type_skipped_others_processed() { + use crate::apis::anthropic::AnthropicApi; + + let mut processor = SseChunkProcessor::new(); + let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages); + let upstream_api = SupportedUpstreamAPIs::AnthropicMessagesAPI(AnthropicApi::Messages); + + // Chunk with valid event, unsupported delta type, then another valid event + // This simulates a future API change where Anthropic adds a new delta type we don't support yet + let chunk = br#"event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"future_unsupported_delta","future_field":"some_value"}} + +event: content_block_delta +data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" World"}} + +"#; + + let result = processor.process_chunk(chunk, &client_api, &upstream_api); + + match result { + Ok(events) => { + println!("Processed {} events (unsupported event should be skipped)", events.len()); + // Should process the 2 valid text_delta events and skip the unsupported one + // We expect at least 2 events (the valid ones), unsupported should be skipped + assert!(events.len() >= 2, "Should process at least 2 valid events, got {}", events.len()); + assert!(!processor.has_buffered_data(), "Unsupported events should be skipped, not buffered"); + } + Err(e) => { + panic!("Should not fail on unsupported delta type, should skip it: {}", e); + } + } + } +} diff --git a/crates/hermesllm/src/transforms/response/to_openai.rs b/crates/hermesllm/src/transforms/response/to_openai.rs index d90d9035..ee526c71 100644 --- a/crates/hermesllm/src/transforms/response/to_openai.rs +++ b/crates/hermesllm/src/transforms/response/to_openai.rs @@ -189,7 +189,10 @@ impl TryFrom for ResponsesAPIResponse { top_p: 1.0, metadata: resp.metadata.unwrap_or_default(), truncation: None, - reasoning: None, + reasoning: Some(crate::apis::openai_responses::Reasoning { + effort: None, + summary: None, + }), store: None, text: None, audio: None, diff --git a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs index 30b40956..ca3d049b 100644 --- a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs +++ b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs @@ -364,6 +364,23 @@ fn convert_content_delta( None, None, )), + MessagesContentDelta::SignatureDelta { signature: _ } => { + // Signature delta is cryptographic verification metadata, not content + // Create an empty delta chunk to maintain stream continuity + Ok(create_openai_chunk( + "stream", + "unknown", + MessageDelta { + role: None, + content: None, + refusal: None, + function_call: None, + tool_calls: None, + }, + None, + None, + )) + } } } diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index fbbf6c28..1fa1a418 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -20,9 +20,8 @@ use common::ratelimit::Header; use common::stats::{IncrementingMetric, RecordingMetric}; use common::{ratelimit, routing, tokenizer}; use hermesllm::apis::streaming_shapes::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; -use hermesllm::apis::streaming_shapes::sse::{ - SseEvent, SseStreamBuffer, SseStreamBufferTrait, SseStreamIter, -}; +use hermesllm::apis::streaming_shapes::sse::{SseEvent, SseStreamBuffer, SseStreamBufferTrait}; +use hermesllm::apis::streaming_shapes::sse_chunk_processor::SseChunkProcessor; use hermesllm::clients::endpoints::SupportedAPIsFromClient; use hermesllm::providers::response::ProviderResponse; use hermesllm::providers::streaming_response::ProviderStreamResponse; @@ -55,6 +54,7 @@ pub struct StreamContext { http_method: Option, http_protocol: Option, sse_buffer: Option, + sse_chunk_processor: Option, } impl StreamContext { @@ -85,6 +85,7 @@ impl StreamContext { http_method: None, http_protocol: None, sse_buffer: None, + sse_chunk_processor: None, } } @@ -138,7 +139,7 @@ impl StreamContext { )); info!( - "[ARCHGW_REQ_ID:{}] PROVIDER_SELECTION: Hint='{}' -> Selected='{}'", + "[PLANO_REQ_ID:{}] PROVIDER_SELECTION: Hint='{}' -> Selected='{}'", self.request_identifier(), self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER) .unwrap_or("none".to_string()), @@ -223,7 +224,7 @@ impl StreamContext { let token_count = tokenizer::token_count(model, json_string).unwrap_or(0); debug!( - "[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}", + "[PLANO_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}", self.request_identifier(), model, token_count @@ -237,7 +238,7 @@ impl StreamContext { // Check if rate limiting needs to be applied. if let Some(selector) = self.ratelimit_selector.take() { info!( - "[ARCHGW_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'", + "[PLANO_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'", self.request_identifier(), model, selector.key, @@ -250,7 +251,7 @@ impl StreamContext { )?; } else { debug!( - "[ARCHGW_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)", + "[PLANO_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)", self.request_identifier(), model ); @@ -269,7 +270,7 @@ impl StreamContext { Ok(duration) => { let duration_ms = duration.as_millis(); info!( - "[ARCHGW_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms", + "[PLANO_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms", self.request_identifier(), duration_ms ); @@ -278,7 +279,7 @@ impl StreamContext { } Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}", + "[PLANO_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}", self.request_identifier(), e ); @@ -294,7 +295,7 @@ impl StreamContext { // Convert the duration to milliseconds let duration_ms = duration.as_millis(); info!( - "[ARCHGW_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}", + "[PLANO_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}", self.request_identifier(), duration_ms, self.response_tokens @@ -310,7 +311,7 @@ impl StreamContext { self.metrics.time_per_output_token.record(tpot); info!( - "[ARCHGW_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}", + "[PLANO_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}", self.request_identifier(), tpot, 1000 / tpot @@ -333,7 +334,7 @@ impl StreamContext { if self.streaming_response { let chunk_size = body_size; debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}", self.request_identifier(), chunk_size ); @@ -341,7 +342,7 @@ impl StreamContext { Some(chunk) => chunk, None => { warn!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}", self.request_identifier(), chunk_size ); @@ -351,7 +352,7 @@ impl StreamContext { if streaming_chunk.len() != chunk_size { warn!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}", self.request_identifier(), chunk_size, streaming_chunk.len() @@ -363,7 +364,7 @@ impl StreamContext { return Err(Action::Continue); } debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}", self.request_identifier(), body_size ); @@ -383,7 +384,7 @@ impl StreamContext { provider_id: ProviderId, ) -> Result, Action> { debug!( - "[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: client={:?} provider_id={:?} chunk_size={}", + "[PLANO_REQ_ID:{}] STREAMING_PROCESS: client={:?} provider_id={:?} chunk_size={}", self.request_identifier(), self.client_api, provider_id, @@ -403,15 +404,10 @@ impl StreamContext { return self.handle_bedrock_binary_stream(body, &client_api, &upstream_api); } - // Parse body into SSE iterator using TryFrom - let sse_iter: SseStreamIter> = - match SseStreamIter::try_from(body) { - Ok(iter) => iter, - Err(e) => { - warn!("Failed to parse body into SSE iterator: {}", e); - return Err(Action::Continue); - } - }; + // Initialize SSE chunk processor if not present + if self.sse_chunk_processor.is_none() { + self.sse_chunk_processor = Some(SseChunkProcessor::new()); + } // Initialize SSE buffer if not present if self.sse_buffer.is_none() { @@ -425,18 +421,42 @@ impl StreamContext { }; } - // Process each SSE event - for sse_event in sse_iter { - // Transform event if upstream API != client API - let transformed_event: SseEvent = - match SseEvent::try_from((sse_event, &client_api, &upstream_api)) { - Ok(event) => event, + // Process chunk through SSE processor (handles incomplete events) + let transformed_events = match self.sse_chunk_processor.as_mut() { + Some(processor) => { + let result = processor.process_chunk(body, &client_api, &upstream_api); + let has_buffered = processor.has_buffered_data(); + let buffered_size = processor.buffered_size(); + + match result { + Ok(events) => { + if has_buffered { + debug!( + "[PLANO_REQ_ID:{}] SSE_INCOMPLETE_BUFFERED: {} bytes buffered for next chunk", + self.request_identifier(), + buffered_size + ); + } + events + } Err(e) => { - warn!("Failed to transform SSE event: {}", e); + warn!( + "[PLANO_REQ_ID:{}] SSE_CHUNK_PROCESS_ERROR: {}", + self.request_identifier(), + e + ); return Err(Action::Continue); } - }; + } + } + None => { + warn!("SSE chunk processor unexpectedly missing"); + return Err(Action::Continue); + } + }; + // Process each successfully transformed SSE event + for transformed_event in transformed_events { // Extract ProviderStreamResponse for processing (token counting, etc.) if !transformed_event.is_done() && !transformed_event.is_event_only() { match transformed_event.provider_response() { @@ -445,7 +465,7 @@ impl StreamContext { if provider_response.is_final() { debug!( - "[ARCHGW_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}", + "[PLANO_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}", self.request_identifier(), self.response_tokens ); @@ -455,7 +475,7 @@ impl StreamContext { let estimated_tokens = content.len() / 4; self.response_tokens += estimated_tokens.max(1); debug!( - "[ARCHGW_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}", + "[PLANO_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}", self.request_identifier(), content.len(), estimated_tokens.max(1), @@ -465,7 +485,7 @@ impl StreamContext { } Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}", + "[PLANO_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}", self.request_identifier(), e ); @@ -487,7 +507,7 @@ impl StreamContext { if !bytes.is_empty() { let content = String::from_utf8_lossy(&bytes); debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", + "[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", self.request_identifier(), bytes.len(), content @@ -525,7 +545,7 @@ impl StreamContext { Ok(buffer) => Some(buffer), Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_INIT_ERROR: {}", + "[PLANO_REQ_ID:{}] BEDROCK_BUFFER_INIT_ERROR: {}", self.request_identifier(), e ); @@ -555,7 +575,7 @@ impl StreamContext { let estimated_tokens = content.len() / 4; self.response_tokens += estimated_tokens.max(1); debug!( - "[ARCHGW_REQ_ID:{}] BEDROCK_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}", + "[PLANO_REQ_ID:{}] BEDROCK_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}", self.request_identifier(), content.len(), estimated_tokens.max(1), @@ -573,7 +593,7 @@ impl StreamContext { } Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] BEDROCK_FRAME_CONVERSION_ERROR: {}", + "[PLANO_REQ_ID:{}] BEDROCK_FRAME_CONVERSION_ERROR: {}", self.request_identifier(), e ); @@ -583,7 +603,7 @@ impl StreamContext { Some(DecodedFrame::Incomplete) => { // Incomplete frame - buffer retains partial data, wait for more bytes debug!( - "[ARCHGW_REQ_ID:{}] BEDROCK_INCOMPLETE_FRAME: waiting for more data", + "[PLANO_REQ_ID:{}] BEDROCK_INCOMPLETE_FRAME: waiting for more data", self.request_identifier() ); break; @@ -591,7 +611,7 @@ impl StreamContext { None => { // Decode error warn!( - "[ARCHGW_REQ_ID:{}] BEDROCK_DECODE_ERROR", + "[PLANO_REQ_ID:{}] BEDROCK_DECODE_ERROR", self.request_identifier() ); return Err(Action::Continue); @@ -606,7 +626,7 @@ impl StreamContext { if !bytes.is_empty() { let content = String::from_utf8_lossy(&bytes); debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", + "[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", self.request_identifier(), bytes.len(), content @@ -616,7 +636,7 @@ impl StreamContext { } None => { warn!( - "[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_MISSING", + "[PLANO_REQ_ID:{}] BEDROCK_BUFFER_MISSING", self.request_identifier() ); Err(Action::Continue) @@ -630,7 +650,7 @@ impl StreamContext { provider_id: ProviderId, ) -> Result, Action> { debug!( - "[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}", + "[PLANO_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}", self.request_identifier(), provider_id, body.len() @@ -642,7 +662,7 @@ impl StreamContext { Ok(response) => response, Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}", self.request_identifier(), e, String::from_utf8_lossy(body) @@ -657,7 +677,7 @@ impl StreamContext { } None => { warn!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api", self.request_identifier() ); return Err(Action::Continue); @@ -669,7 +689,7 @@ impl StreamContext { response.extract_usage_counts() { debug!( - "[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}", + "[PLANO_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}", self.request_identifier(), prompt_tokens, completion_tokens, @@ -678,7 +698,7 @@ impl StreamContext { self.response_tokens = completion_tokens; } else { warn!( - "[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: no usage information found", + "[PLANO_REQ_ID:{}] RESPONSE_USAGE: no usage information found", self.request_identifier() ); } @@ -686,7 +706,7 @@ impl StreamContext { match serde_json::to_vec(&response) { Ok(bytes) => { debug!( - "[ARCHGW_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}", + "[PLANO_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}", self.request_identifier(), String::from_utf8_lossy(&bytes) ); @@ -763,7 +783,7 @@ impl HttpContext for StreamContext { Some(provider_id.compatible_api_for_client(api, self.streaming_response)); debug!( - "[ARCHGW_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'", + "[PLANO_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'", self.request_identifier(), provider.to_provider_id(), api, @@ -816,7 +836,7 @@ impl HttpContext for StreamContext { fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { debug!( - "[ARCHGW_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}", + "[PLANO_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}", self.request_identifier(), body_size, end_of_stream @@ -855,14 +875,14 @@ impl HttpContext for StreamContext { let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() { Some(the_client_api) => { info!( - "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}", + "[PLANO_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}", self.request_identifier(), the_client_api, body_bytes.len() ); debug!( - "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}", + "[PLANO_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}", self.request_identifier(), String::from_utf8_lossy(&body_bytes) ); @@ -871,7 +891,7 @@ impl HttpContext for StreamContext { Ok(deserialized) => deserialized, Err(e) => { warn!( - "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}", + "[PLANO_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}", self.request_identifier(), e, String::from_utf8_lossy(&body_bytes) @@ -914,7 +934,7 @@ impl HttpContext for StreamContext { "agent_orchestrator".to_string() } else { warn!( - "[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}", + "[PLANO_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}", self.request_identifier(), model_requested, self.llm_provider().name, @@ -943,7 +963,7 @@ impl HttpContext for StreamContext { self.user_message = deserialized_client_request.get_recent_user_message(); info!( - "[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}", + "[PLANO_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}", self.request_identifier(), model_requested, resolved_model, @@ -974,14 +994,14 @@ impl HttpContext for StreamContext { match self.resolved_api.as_ref() { Some(upstream) => { info!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}", + "[PLANO_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}", self.request_identifier(), self.client_api, upstream ); match ProviderRequestType::try_from((deserialized_client_request, upstream)) { Ok(request) => { debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}", + "[PLANO_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}", self.request_identifier(), String::from_utf8_lossy(&request.to_bytes().unwrap_or_default()) ); @@ -1032,7 +1052,7 @@ impl HttpContext for StreamContext { self.upstream_status_code = StatusCode::from_u16(status_code).ok(); debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}", + "[PLANO_REQ_ID:{}] UPSTREAM_RESPONSE_STATUS: {}", self.request_identifier(), status_code ); @@ -1059,7 +1079,7 @@ impl HttpContext for StreamContext { let current_time = get_current_time().unwrap(); if end_of_stream && body_size == 0 { debug!( - "[ARCHGW_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}", + "[PLANO_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}", self.request_identifier(), body_size ); @@ -1071,7 +1091,7 @@ impl HttpContext for StreamContext { if let Some(status_code) = &self.upstream_status_code { if status_code.is_client_error() || status_code.is_server_error() { info!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_ERROR_RESPONSE: status={} body_size={}", + "[PLANO_REQ_ID:{}] UPSTREAM_ERROR_RESPONSE: status={} body_size={}", self.request_identifier(), status_code.as_u16(), body_size @@ -1081,7 +1101,7 @@ impl HttpContext for StreamContext { if body_size > 0 { if let Ok(body) = self.read_raw_response_body(body_size) { debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_ERROR_BODY: {}", + "[PLANO_REQ_ID:{}] UPSTREAM_ERROR_BODY: {}", self.request_identifier(), String::from_utf8_lossy(&body) ); @@ -1103,7 +1123,7 @@ impl HttpContext for StreamContext { None => "None".to_string(), }; info!( - "[ARCHGW_REQ_ID:{}], UNSUPPORTED API: {}", + "[PLANO_REQ_ID:{}], UNSUPPORTED API: {}", self.request_identifier(), api_info ); @@ -1117,7 +1137,7 @@ impl HttpContext for StreamContext { }; debug!( - "[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}", + "[PLANO_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}", self.request_identifier(), body.len(), String::from_utf8_lossy(&body) diff --git a/tests/e2e/test_openai_responses_api_client.py b/tests/e2e/test_openai_responses_api_client.py index e1fa8da8..282af7c4 100644 --- a/tests/e2e/test_openai_responses_api_client.py +++ b/tests/e2e/test_openai_responses_api_client.py @@ -595,7 +595,7 @@ def test_openai_responses_api_streaming_with_tools_upstream_anthropic(): stream = client.responses.create( model="claude-sonnet-4-20250514", - input="Call the echo tool", + input="Call the echo tool with hello_world", tools=tools, stream=True, )