From 1607305118aa9a25cf36aa4767ecbb04c016ebb7 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Wed, 3 Dec 2025 11:57:13 -0800 Subject: [PATCH] fixed issues with translation to claude code --- .../anthropic_streaming_buffer.rs | 81 +++++++++++++++---- .../to_anthropic_streaming.rs | 19 ++--- crates/llm_gateway/src/stream_context.rs | 28 ++++++- 3 files changed, 101 insertions(+), 27 deletions(-) diff --git a/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs b/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs index c326a770..818ee37d 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs @@ -24,6 +24,9 @@ pub struct AnthropicMessagesStreamBuffer { /// Track if we need to inject ContentBlockStop before message_delta needs_content_block_stop: bool, + /// Track if we've seen a MessageDelta (so we need to send MessageStop at the end) + seen_message_delta: bool, + /// Model name to use when generating message_start events model: Option, } @@ -35,6 +38,7 @@ impl AnthropicMessagesStreamBuffer { message_started: false, content_block_start_indices: HashSet::new(), needs_content_block_stop: false, + seen_message_delta: false, model: None, } } @@ -182,7 +186,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { // Content deltas are between ContentBlockStart and ContentBlockStop self.buffered_events.push(event); } - MessagesStreamEvent::MessageDelta { .. } => { + MessagesStreamEvent::MessageDelta { usage, .. } => { // Inject ContentBlockStop before message_delta if self.needs_content_block_stop { let content_block_stop = AnthropicMessagesStreamBuffer::create_content_block_stop_event(); @@ -190,11 +194,44 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { self.needs_content_block_stop = false; } - // Add the message_delta event + // Check if the last event was also a MessageDelta - if so, merge them + // This handles Bedrock's split of stop_reason (MessageStop) and usage (Metadata) + if let Some(last_event) = self.buffered_events.last_mut() { + if let Some(ProviderStreamResponseType::MessagesStreamEvent( + MessagesStreamEvent::MessageDelta { + usage: last_usage, + .. + } + )) = &mut last_event.provider_stream_response { + // Merge: take stop_reason from first, usage from second (if non-zero) + if usage.input_tokens > 0 || usage.output_tokens > 0 { + *last_usage = usage.clone(); + } + // Mark that we've seen MessageDelta (need to send MessageStop later) + self.seen_message_delta = true; + // Don't push the new event, we've merged it + return; + } + } + + // No previous MessageDelta to merge with, add this one + self.buffered_events.push(event); + self.seen_message_delta = true; + } + MessagesStreamEvent::ContentBlockStop { .. } => { + // ContentBlockStop received from upstream (e.g., Bedrock) + // Clear the flag so we don't inject another one + self.needs_content_block_stop = false; + self.buffered_events.push(event); + } + MessagesStreamEvent::MessageStop => { + // MessageStop received from upstream (e.g., OpenAI via [DONE]) + // Clear the flag so we don't inject another one + self.seen_message_delta = false; self.buffered_events.push(event); } _ => { - // Other Anthropic event types (ContentBlockStop, MessageStop, etc.), just accumulate + // Other Anthropic event types (Ping, etc.), just accumulate self.buffered_events.push(event); } } @@ -207,14 +244,26 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { } fn into_bytes(&mut self) -> Vec { - // Inject ContentBlockStop if needed before flushing - if self.needs_content_block_stop { - let content_block_stop = AnthropicMessagesStreamBuffer::create_content_block_stop_event(); - self.buffered_events.push(content_block_stop); - self.needs_content_block_stop = false; + // Convert all accumulated events to bytes and clear buffer + // NOTE: We do NOT inject ContentBlockStop here because it's injected when we see MessageDelta + // or MessageStop. Injecting it here causes premature ContentBlockStop in the middle of streaming. + + // Inject MessageStop after MessageDelta if we've seen one + // This completes the Anthropic Messages API event sequence + if self.seen_message_delta { + let message_stop = MessagesStreamEvent::MessageStop; + let sse_string: String = message_stop.into(); + let message_stop_event = SseEvent { + data: None, + event: Some("message_stop".to_string()), + raw_line: sse_string.clone(), + sse_transformed_lines: sse_string, + provider_stream_response: None, + }; + self.buffered_events.push(message_stop_event); + self.seen_message_delta = false; } - // Convert all accumulated events to bytes and clear buffer let mut buffer = Vec::new(); for event in self.buffered_events.drain(..) { let event_bytes: Vec = event.into(); @@ -344,10 +393,10 @@ data: {"id":"chatcmpl-456","object":"chat.completion.chunk","created":1234567890 assert!(output.contains("\"text\":\" in San Francisco\""), "Should have second content delta"); assert!(output.contains("\"text\":\" is\""), "Should have third content delta"); - // For partial streams, the buffer will inject content_block_stop in into_bytes() - // because needs_content_block_stop is true. This is expected behavior to maintain - // proper Anthropic protocol even for incomplete streams. - assert!(output.contains("event: content_block_stop"), "Should have content_block_stop (injected at flush)"); + // For partial streams (no finish_reason, no [DONE]), we do NOT inject content_block_stop + // because the stream may continue. This is correct behavior - only inject lifecycle events + // when we have explicit signals from upstream (finish_reason, [DONE], etc.) + assert!(!output.contains("event: content_block_stop"), "Should NOT have content_block_stop for partial stream"); // Should NOT have completion events assert!(!output.contains("event: message_delta"), "Should NOT have message_delta"); @@ -356,10 +405,10 @@ data: {"id":"chatcmpl-456","object":"chat.completion.chunk","created":1234567890 println!("\nVALIDATION SUMMARY:"); println!("{}", "-".repeat(80)); println!("✓ Partial transformation: OpenAI → Anthropic (stream interrupted)"); - println!("✓ Injected: message_start, content_block_start at beginning, content_block_stop at flush"); + println!("✓ Injected: message_start, content_block_start at beginning"); println!("✓ Incremental deltas: {} events (ALL content preserved!)", delta_count); - println!("✓ NO message completion events (partial stream, no [DONE])"); - println!("✓ Buffer maintains Anthropic protocol even for incomplete streams\n"); + println!("✓ NO completion events (partial stream, no [DONE])"); + println!("✓ Buffer maintains Anthropic protocol for active streams\n"); } #[test] diff --git a/crates/hermesllm/src/transforms/response_streaming/to_anthropic_streaming.rs b/crates/hermesllm/src/transforms/response_streaming/to_anthropic_streaming.rs index 61939dd7..b8cac631 100644 --- a/crates/hermesllm/src/transforms/response_streaming/to_anthropic_streaming.rs +++ b/crates/hermesllm/src/transforms/response_streaming/to_anthropic_streaming.rs @@ -1,5 +1,5 @@ use crate::apis::amazon_bedrock::{ - ContentBlockDelta, ConverseStreamEvent, StopReason, + ContentBlockDelta, ConverseStreamEvent, }; use crate::apis::anthropic::{ MessagesContentBlock, MessagesContentDelta, MessagesMessageDelta, @@ -187,18 +187,19 @@ impl TryFrom for MessagesStreamEvent { }) } - // MessageStop - convert to Anthropic MessageDelta with stop reason + MessageStop + // MessageStop - convert to Anthropic MessageDelta with stop reason + // Note: Bedrock sends Metadata separately with usage info, creating a second MessageDelta + // The client should merge these or use the final one with complete usage ConverseStreamEvent::MessageStop(stop_event) => { let anthropic_stop_reason = match stop_event.stop_reason { - StopReason::EndTurn => MessagesStopReason::EndTurn, - StopReason::ToolUse => MessagesStopReason::ToolUse, - StopReason::MaxTokens => MessagesStopReason::MaxTokens, - StopReason::StopSequence => MessagesStopReason::EndTurn, - StopReason::GuardrailIntervened => MessagesStopReason::Refusal, - StopReason::ContentFiltered => MessagesStopReason::Refusal, + crate::apis::amazon_bedrock::StopReason::EndTurn => MessagesStopReason::EndTurn, + crate::apis::amazon_bedrock::StopReason::ToolUse => MessagesStopReason::ToolUse, + crate::apis::amazon_bedrock::StopReason::MaxTokens => MessagesStopReason::MaxTokens, + crate::apis::amazon_bedrock::StopReason::StopSequence => MessagesStopReason::EndTurn, + crate::apis::amazon_bedrock::StopReason::GuardrailIntervened => MessagesStopReason::Refusal, + crate::apis::amazon_bedrock::StopReason::ContentFiltered => MessagesStopReason::Refusal, }; - // Return MessageDelta (MessageStop will be sent separately by the streaming handler) Ok(MessagesStreamEvent::MessageDelta { delta: MessagesMessageDelta { stop_reason: anthropic_stop_reason, diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index fbab980c..42d7cb31 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -550,7 +550,19 @@ impl StreamContext { // Get accumulated bytes from buffer and return match self.sse_buffer.as_mut() { - Some(buffer) => Ok(buffer.into_bytes()), + Some(buffer) => { + let bytes = buffer.into_bytes(); + if !bytes.is_empty() { + let content = String::from_utf8_lossy(&bytes); + debug!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", + self.request_identifier(), + bytes.len(), + content + ); + } + Ok(bytes) + } None => { warn!("SSE buffer unexpectedly missing after initialization"); Err(Action::Continue) @@ -657,7 +669,19 @@ impl StreamContext { // Get accumulated bytes from buffer and return match self.sse_buffer.as_mut() { - Some(buffer) => Ok(buffer.into_bytes()), + Some(buffer) => { + let bytes = buffer.into_bytes(); + if !bytes.is_empty() { + let content = String::from_utf8_lossy(&bytes); + debug!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}", + self.request_identifier(), + bytes.len(), + content + ); + } + Ok(bytes) + } None => { warn!( "[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_MISSING",