Fixed issues with translation to Claude Code

This commit is contained in:
Salman Paracha 2025-12-03 11:57:13 -08:00
parent dd16801813
commit 1607305118
3 changed files with 101 additions and 27 deletions

View file

@ -24,6 +24,9 @@ pub struct AnthropicMessagesStreamBuffer {
/// Track if we need to inject ContentBlockStop before message_delta
needs_content_block_stop: bool,
/// Track if we've seen a MessageDelta (so we need to send MessageStop at the end)
seen_message_delta: bool,
/// Model name to use when generating message_start events
model: Option<String>,
}
@ -35,6 +38,7 @@ impl AnthropicMessagesStreamBuffer {
message_started: false,
content_block_start_indices: HashSet::new(),
needs_content_block_stop: false,
seen_message_delta: false,
model: None,
}
}
@ -182,7 +186,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
// Content deltas are between ContentBlockStart and ContentBlockStop
self.buffered_events.push(event);
}
MessagesStreamEvent::MessageDelta { .. } => {
MessagesStreamEvent::MessageDelta { usage, .. } => {
// Inject ContentBlockStop before message_delta
if self.needs_content_block_stop {
let content_block_stop = AnthropicMessagesStreamBuffer::create_content_block_stop_event();
@ -190,11 +194,44 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
self.needs_content_block_stop = false;
}
// Add the message_delta event
// Check if the last event was also a MessageDelta - if so, merge them
// This handles Bedrock's split of stop_reason (MessageStop) and usage (Metadata)
if let Some(last_event) = self.buffered_events.last_mut() {
if let Some(ProviderStreamResponseType::MessagesStreamEvent(
MessagesStreamEvent::MessageDelta {
usage: last_usage,
..
}
)) = &mut last_event.provider_stream_response {
// Merge: take stop_reason from first, usage from second (if non-zero)
if usage.input_tokens > 0 || usage.output_tokens > 0 {
*last_usage = usage.clone();
}
// Mark that we've seen MessageDelta (need to send MessageStop later)
self.seen_message_delta = true;
// Don't push the new event, we've merged it
return;
}
}
// No previous MessageDelta to merge with, add this one
self.buffered_events.push(event);
self.seen_message_delta = true;
}
MessagesStreamEvent::ContentBlockStop { .. } => {
// ContentBlockStop received from upstream (e.g., Bedrock)
// Clear the flag so we don't inject another one
self.needs_content_block_stop = false;
self.buffered_events.push(event);
}
MessagesStreamEvent::MessageStop => {
// MessageStop received from upstream (e.g., OpenAI via [DONE])
// Clear the flag so we don't inject another one
self.seen_message_delta = false;
self.buffered_events.push(event);
}
_ => {
// Other Anthropic event types (ContentBlockStop, MessageStop, etc.), just accumulate
// Other Anthropic event types (Ping, etc.), just accumulate
self.buffered_events.push(event);
}
}
@ -207,14 +244,26 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
}
fn into_bytes(&mut self) -> Vec<u8> {
// Inject ContentBlockStop if needed before flushing
if self.needs_content_block_stop {
let content_block_stop = AnthropicMessagesStreamBuffer::create_content_block_stop_event();
self.buffered_events.push(content_block_stop);
self.needs_content_block_stop = false;
// Convert all accumulated events to bytes and clear buffer
// NOTE: We do NOT inject ContentBlockStop here because it's injected when we see MessageDelta
// or MessageStop. Injecting it here causes premature ContentBlockStop in the middle of streaming.
// Inject MessageStop after MessageDelta if we've seen one
// This completes the Anthropic Messages API event sequence
if self.seen_message_delta {
let message_stop = MessagesStreamEvent::MessageStop;
let sse_string: String = message_stop.into();
let message_stop_event = SseEvent {
data: None,
event: Some("message_stop".to_string()),
raw_line: sse_string.clone(),
sse_transformed_lines: sse_string,
provider_stream_response: None,
};
self.buffered_events.push(message_stop_event);
self.seen_message_delta = false;
}
// Convert all accumulated events to bytes and clear buffer
let mut buffer = Vec::new();
for event in self.buffered_events.drain(..) {
let event_bytes: Vec<u8> = event.into();
@ -344,10 +393,10 @@ data: {"id":"chatcmpl-456","object":"chat.completion.chunk","created":1234567890
assert!(output.contains("\"text\":\" in San Francisco\""), "Should have second content delta");
assert!(output.contains("\"text\":\" is\""), "Should have third content delta");
// For partial streams, the buffer will inject content_block_stop in into_bytes()
// because needs_content_block_stop is true. This is expected behavior to maintain
// proper Anthropic protocol even for incomplete streams.
assert!(output.contains("event: content_block_stop"), "Should have content_block_stop (injected at flush)");
// For partial streams (no finish_reason, no [DONE]), we do NOT inject content_block_stop
// because the stream may continue. This is correct behavior - only inject lifecycle events
// when we have explicit signals from upstream (finish_reason, [DONE], etc.)
assert!(!output.contains("event: content_block_stop"), "Should NOT have content_block_stop for partial stream");
// Should NOT have completion events
assert!(!output.contains("event: message_delta"), "Should NOT have message_delta");
@ -356,10 +405,10 @@ data: {"id":"chatcmpl-456","object":"chat.completion.chunk","created":1234567890
println!("\nVALIDATION SUMMARY:");
println!("{}", "-".repeat(80));
println!("✓ Partial transformation: OpenAI → Anthropic (stream interrupted)");
println!("✓ Injected: message_start, content_block_start at beginning, content_block_stop at flush");
println!("✓ Injected: message_start, content_block_start at beginning");
println!("✓ Incremental deltas: {} events (ALL content preserved!)", delta_count);
println!("✓ NO message completion events (partial stream, no [DONE])");
println!("✓ Buffer maintains Anthropic protocol even for incomplete streams\n");
println!("✓ NO completion events (partial stream, no [DONE])");
println!("✓ Buffer maintains Anthropic protocol for active streams\n");
}
#[test]

View file

@ -1,5 +1,5 @@
use crate::apis::amazon_bedrock::{
ContentBlockDelta, ConverseStreamEvent, StopReason,
ContentBlockDelta, ConverseStreamEvent,
};
use crate::apis::anthropic::{
MessagesContentBlock, MessagesContentDelta, MessagesMessageDelta,
@ -187,18 +187,19 @@ impl TryFrom<ConverseStreamEvent> for MessagesStreamEvent {
})
}
// MessageStop - convert to Anthropic MessageDelta with stop reason + MessageStop
// MessageStop - convert to Anthropic MessageDelta with stop reason
// Note: Bedrock sends Metadata separately with usage info, creating a second MessageDelta
// The client should merge these or use the final one with complete usage
ConverseStreamEvent::MessageStop(stop_event) => {
let anthropic_stop_reason = match stop_event.stop_reason {
StopReason::EndTurn => MessagesStopReason::EndTurn,
StopReason::ToolUse => MessagesStopReason::ToolUse,
StopReason::MaxTokens => MessagesStopReason::MaxTokens,
StopReason::StopSequence => MessagesStopReason::EndTurn,
StopReason::GuardrailIntervened => MessagesStopReason::Refusal,
StopReason::ContentFiltered => MessagesStopReason::Refusal,
crate::apis::amazon_bedrock::StopReason::EndTurn => MessagesStopReason::EndTurn,
crate::apis::amazon_bedrock::StopReason::ToolUse => MessagesStopReason::ToolUse,
crate::apis::amazon_bedrock::StopReason::MaxTokens => MessagesStopReason::MaxTokens,
crate::apis::amazon_bedrock::StopReason::StopSequence => MessagesStopReason::EndTurn,
crate::apis::amazon_bedrock::StopReason::GuardrailIntervened => MessagesStopReason::Refusal,
crate::apis::amazon_bedrock::StopReason::ContentFiltered => MessagesStopReason::Refusal,
};
// Return MessageDelta (MessageStop will be sent separately by the streaming handler)
Ok(MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: anthropic_stop_reason,

View file

@ -550,7 +550,19 @@ impl StreamContext {
// Get accumulated bytes from buffer and return
match self.sse_buffer.as_mut() {
Some(buffer) => Ok(buffer.into_bytes()),
Some(buffer) => {
let bytes = buffer.into_bytes();
if !bytes.is_empty() {
let content = String::from_utf8_lossy(&bytes);
debug!(
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
self.request_identifier(),
bytes.len(),
content
);
}
Ok(bytes)
}
None => {
warn!("SSE buffer unexpectedly missing after initialization");
Err(Action::Continue)
@ -657,7 +669,19 @@ impl StreamContext {
// Get accumulated bytes from buffer and return
match self.sse_buffer.as_mut() {
Some(buffer) => Ok(buffer.into_bytes()),
Some(buffer) => {
let bytes = buffer.into_bytes();
if !bytes.is_empty() {
let content = String::from_utf8_lossy(&bytes);
debug!(
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORMED_CLIENT_RESPONSE: size={} content={}",
self.request_identifier(),
bytes.len(),
content
);
}
Ok(bytes)
}
None => {
warn!(
"[ARCHGW_REQ_ID:{}] BEDROCK_BUFFER_MISSING",