diff --git a/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs b/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs index eb9ec5b1..d3e3bbff 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/anthropic_streaming_buffer.rs @@ -1,6 +1,9 @@ -use crate::apis::anthropic::MessagesStreamEvent; +use crate::apis::anthropic::{ + MessagesMessageDelta, MessagesStopReason, MessagesStreamEvent, MessagesUsage, +}; use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamBufferTrait}; use crate::providers::streaming_response::ProviderStreamResponseType; +use log::warn; use std::collections::HashSet; /// SSE Stream Buffer for Anthropic Messages API streaming. @@ -11,13 +14,24 @@ use std::collections::HashSet; /// /// When converting from OpenAI to Anthropic format, this buffer injects the required /// ContentBlockStart and ContentBlockStop events to maintain proper Anthropic protocol. +/// +/// Guarantees (Anthropic Messages API contract): +/// 1. `message_stop` is never emitted unless a matching `message_start` was emitted first. +/// 2. `message_stop` is emitted at most once per stream (no double-close). +/// 3. If upstream terminates with no content (empty/filtered/errored response), a +/// minimal but well-formed envelope is synthesized so the client's state machine +/// stays consistent. pub struct AnthropicMessagesStreamBuffer { /// Buffered SSE events ready to be written to wire buffered_events: Vec, - /// Track if we've seen a message_start event + /// Track if we've emitted a message_start event message_started: bool, + /// Track if we've emitted a terminal message_stop event (for idempotency / + /// double-close protection). + message_stopped: bool, + /// Track content block indices that have received ContentBlockStart events content_block_start_indices: HashSet, @@ -42,6 +56,7 @@ impl AnthropicMessagesStreamBuffer { Self { buffered_events: Vec::new(), message_started: false, + message_stopped: false, content_block_start_indices: HashSet::new(), needs_content_block_stop: false, seen_message_delta: false, @@ -49,6 +64,66 @@ impl AnthropicMessagesStreamBuffer { } } + /// Inject a `message_start` event into the buffer if one hasn't been emitted yet. + /// This is the single source of truth for opening a message — every handler + /// that can legitimately be the first event on the wire must call this before + /// pushing its own event. + fn ensure_message_started(&mut self) { + if self.message_started { + return; + } + let model = self.model.as_deref().unwrap_or("unknown"); + let message_start = AnthropicMessagesStreamBuffer::create_message_start_event(model); + self.buffered_events.push(message_start); + self.message_started = true; + } + + /// Inject a synthetic `message_delta` with `end_turn` / zero usage. + /// Used when we must close a message but upstream never produced a terminal + /// event (e.g. `[DONE]` arrives with no prior `finish_reason`). + fn push_synthetic_message_delta(&mut self) { + let event = MessagesStreamEvent::MessageDelta { + delta: MessagesMessageDelta { + stop_reason: MessagesStopReason::EndTurn, + stop_sequence: None, + }, + usage: MessagesUsage { + input_tokens: 0, + output_tokens: 0, + cache_creation_input_tokens: None, + cache_read_input_tokens: None, + }, + }; + let sse_string: String = event.clone().into(); + self.buffered_events.push(SseEvent { + data: None, + event: Some("message_delta".to_string()), + raw_line: sse_string.clone(), + sse_transformed_lines: sse_string, + provider_stream_response: Some(ProviderStreamResponseType::MessagesStreamEvent(event)), + }); + self.seen_message_delta = true; + } + + /// Inject a `message_stop` event into the buffer, marking the stream as closed. + /// Idempotent — subsequent calls are no-ops. + fn push_message_stop(&mut self) { + if self.message_stopped { + return; + } + let message_stop = MessagesStreamEvent::MessageStop; + let sse_string: String = message_stop.into(); + self.buffered_events.push(SseEvent { + data: None, + event: Some("message_stop".to_string()), + raw_line: sse_string.clone(), + sse_transformed_lines: sse_string, + provider_stream_response: None, + }); + self.message_stopped = true; + self.seen_message_delta = false; + } + /// Check if a content_block_start event has been sent for the given index fn has_content_block_start_been_sent(&self, index: i32) -> bool { self.content_block_start_indices.contains(&index) @@ -149,6 +224,27 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { // We match on a reference first to determine the type, then move the event match &event.provider_stream_response { Some(ProviderStreamResponseType::MessagesStreamEvent(evt)) => { + // If the message has already been closed, drop any trailing events + // to avoid emitting data after `message_stop` (protocol violation). + // This typically indicates a duplicate `[DONE]` from upstream or a + // replay of previously-buffered bytes — worth surfacing so we can + // spot misbehaving providers. + if self.message_stopped { + warn!( + "anthropic stream buffer: dropping event after message_stop (variant={})", + match evt { + MessagesStreamEvent::MessageStart { .. } => "message_start", + MessagesStreamEvent::ContentBlockStart { .. } => "content_block_start", + MessagesStreamEvent::ContentBlockDelta { .. } => "content_block_delta", + MessagesStreamEvent::ContentBlockStop { .. } => "content_block_stop", + MessagesStreamEvent::MessageDelta { .. } => "message_delta", + MessagesStreamEvent::MessageStop => "message_stop", + MessagesStreamEvent::Ping => "ping", + } + ); + return; + } + match evt { MessagesStreamEvent::MessageStart { .. } => { // Add the message_start event @@ -157,14 +253,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { } MessagesStreamEvent::ContentBlockStart { index, .. } => { let index = *index as i32; - // Inject message_start if needed - if !self.message_started { - let model = self.model.as_deref().unwrap_or("unknown"); - let message_start = - AnthropicMessagesStreamBuffer::create_message_start_event(model); - self.buffered_events.push(message_start); - self.message_started = true; - } + self.ensure_message_started(); // Add the content_block_start event (from tool calls or other sources) self.buffered_events.push(event); @@ -173,14 +262,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { } MessagesStreamEvent::ContentBlockDelta { index, .. } => { let index = *index as i32; - // Inject message_start if needed - if !self.message_started { - let model = self.model.as_deref().unwrap_or("unknown"); - let message_start = - AnthropicMessagesStreamBuffer::create_message_start_event(model); - self.buffered_events.push(message_start); - self.message_started = true; - } + self.ensure_message_started(); // Check if ContentBlockStart was sent for this index if !self.has_content_block_start_been_sent(index) { @@ -196,6 +278,11 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { self.buffered_events.push(event); } MessagesStreamEvent::MessageDelta { usage, .. } => { + // `message_delta` is only meaningful inside an open message. + // Upstream can send it with no prior content (empty completion, + // content filter, etc.), so we must open a message first. + self.ensure_message_started(); + // Inject ContentBlockStop before message_delta if self.needs_content_block_stop { let content_block_stop = @@ -230,15 +317,52 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { } MessagesStreamEvent::ContentBlockStop { .. } => { // ContentBlockStop received from upstream (e.g., Bedrock) + self.ensure_message_started(); // Clear the flag so we don't inject another one self.needs_content_block_stop = false; self.buffered_events.push(event); } MessagesStreamEvent::MessageStop => { - // MessageStop received from upstream (e.g., OpenAI via [DONE]) - // Clear the flag so we don't inject another one - self.seen_message_delta = false; + // MessageStop received from upstream (e.g., OpenAI via [DONE]). + // + // The Anthropic protocol requires the full envelope + // message_start → [content blocks] → message_delta → message_stop + // so we must not emit a bare `message_stop`. Synthesize whatever + // is missing to keep the client's state machine consistent. + self.ensure_message_started(); + + if self.needs_content_block_stop { + let content_block_stop = + AnthropicMessagesStreamBuffer::create_content_block_stop_event(); + self.buffered_events.push(content_block_stop); + self.needs_content_block_stop = false; + } + + // If no message_delta has been emitted yet (empty/filtered upstream + // response), synthesize a minimal one carrying `end_turn`. + if !self.seen_message_delta { + // If we also never opened a content block, open and close one + // so clients that expect at least one block are happy. + if self.content_block_start_indices.is_empty() { + let content_block_start = + AnthropicMessagesStreamBuffer::create_content_block_start_event( + ); + self.buffered_events.push(content_block_start); + self.set_content_block_start_sent(0); + let content_block_stop = + AnthropicMessagesStreamBuffer::create_content_block_stop_event( + ); + self.buffered_events.push(content_block_stop); + } + self.push_synthetic_message_delta(); + } + + // Push the upstream-provided message_stop and mark closed. + // `push_message_stop` is idempotent but we want to reuse the + // original SseEvent so raw passthrough semantics are preserved. self.buffered_events.push(event); + self.message_stopped = true; + self.seen_message_delta = false; } _ => { // Other Anthropic event types (Ping, etc.), just accumulate @@ -254,24 +378,23 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer { } fn to_bytes(&mut self) -> Vec { - // Convert all accumulated events to bytes and clear buffer + // Convert all accumulated events to bytes and clear buffer. + // // NOTE: We do NOT inject ContentBlockStop here because it's injected when we see MessageDelta // or MessageStop. Injecting it here causes premature ContentBlockStop in the middle of streaming. - - // Inject MessageStop after MessageDelta if we've seen one - // This completes the Anthropic Messages API event sequence - if self.seen_message_delta { - let message_stop = MessagesStreamEvent::MessageStop; - let sse_string: String = message_stop.into(); - let message_stop_event = SseEvent { - data: None, - event: Some("message_stop".to_string()), - raw_line: sse_string.clone(), - sse_transformed_lines: sse_string, - provider_stream_response: None, - }; - self.buffered_events.push(message_stop_event); - self.seen_message_delta = false; + // + // Inject a synthetic `message_stop` only when: + // 1. A `message_delta` has been seen (otherwise we'd violate the Anthropic + // protocol by emitting `message_stop` without a preceding `message_delta`), AND + // 2. We haven't already emitted `message_stop` (either synthetic from a + // previous flush, or real from an upstream `[DONE]`). + // + // Without the `!message_stopped` guard, a stream whose `finish_reason` chunk + // and `[DONE]` marker land in separate HTTP body chunks would receive two + // `message_stop` events, triggering Claude Code's "Received message_stop + // without a current message" error. + if self.seen_message_delta && !self.message_stopped { + self.push_message_stop(); } let mut buffer = Vec::new(); @@ -615,4 +738,133 @@ data: [DONE]"#; println!("✓ Stop reason: tool_use"); println!("✓ Proper Anthropic tool_use protocol\n"); } + + /// Regression test for: + /// Claude Code CLI error: "Received message_stop without a current message" + /// + /// Reproduces the *double-close* scenario: OpenAI's final `finish_reason` + /// chunk and the `[DONE]` marker arrive in **separate** HTTP body chunks, so + /// `to_bytes()` is called between them. Before the fix, this produced two + /// `message_stop` events on the wire (one synthetic, one from `[DONE]`). + #[test] + fn test_openai_to_anthropic_emits_single_message_stop_across_chunk_boundary() { + let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let mut buffer = AnthropicMessagesStreamBuffer::new(); + + // --- HTTP chunk 1: content + finish_reason (no [DONE] yet) ----------- + let chunk_1 = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]} + +data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#; + + for raw in SseStreamIter::try_from(chunk_1.as_bytes()).unwrap() { + let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(e); + } + let out_1 = String::from_utf8(buffer.to_bytes()).unwrap(); + + // --- HTTP chunk 2: just the [DONE] marker ---------------------------- + let chunk_2 = "data: [DONE]"; + for raw in SseStreamIter::try_from(chunk_2.as_bytes()).unwrap() { + let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(e); + } + let out_2 = String::from_utf8(buffer.to_bytes()).unwrap(); + + let combined = format!("{}{}", out_1, out_2); + let start_count = combined.matches("event: message_start").count(); + let stop_count = combined.matches("event: message_stop").count(); + + assert_eq!( + start_count, 1, + "Must emit exactly one message_start across chunks, got {start_count}. Output:\n{combined}" + ); + assert_eq!( + stop_count, 1, + "Must emit exactly one message_stop across chunks (no double-close), got {stop_count}. Output:\n{combined}" + ); + // Every message_stop must be preceded by a message_start earlier in the stream. + let start_pos = combined.find("event: message_start").unwrap(); + let stop_pos = combined.find("event: message_stop").unwrap(); + assert!( + start_pos < stop_pos, + "message_start must come before message_stop. Output:\n{combined}" + ); + } + + /// Regression test for: + /// "Received message_stop without a current message" on empty upstream responses. + /// + /// OpenAI returns only `[DONE]` with no content deltas and no `finish_reason` + /// (this happens with content filters, truncated upstream streams, and some + /// 5xx recoveries). Before the fix, the buffer emitted a bare `message_stop` + /// with no preceding `message_start`. After the fix, it synthesizes a + /// minimal but well-formed envelope. + #[test] + fn test_openai_done_only_stream_synthesizes_valid_envelope() { + let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let mut buffer = AnthropicMessagesStreamBuffer::new(); + + let raw_input = "data: [DONE]"; + for raw in SseStreamIter::try_from(raw_input.as_bytes()).unwrap() { + let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(e); + } + let out = String::from_utf8(buffer.to_bytes()).unwrap(); + + assert!( + out.contains("event: message_start"), + "Empty upstream must still produce message_start. Output:\n{out}" + ); + assert!( + out.contains("event: message_delta"), + "Empty upstream must produce a synthesized message_delta. Output:\n{out}" + ); + assert_eq!( + out.matches("event: message_stop").count(), + 1, + "Empty upstream must produce exactly one message_stop. Output:\n{out}" + ); + + // Protocol ordering: start < delta < stop. + let p_start = out.find("event: message_start").unwrap(); + let p_delta = out.find("event: message_delta").unwrap(); + let p_stop = out.find("event: message_stop").unwrap(); + assert!( + p_start < p_delta && p_delta < p_stop, + "Bad ordering. Output:\n{out}" + ); + } + + /// Regression test: events arriving after `message_stop` (e.g. a stray `[DONE]` + /// echo, or late-arriving deltas from a racing upstream) must be dropped + /// rather than written after the terminal frame. + #[test] + fn test_events_after_message_stop_are_dropped() { + let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + let mut buffer = AnthropicMessagesStreamBuffer::new(); + + let first = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"ok"},"finish_reason":"stop"}]} + +data: [DONE]"#; + for raw in SseStreamIter::try_from(first.as_bytes()).unwrap() { + let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(e); + } + let _ = buffer.to_bytes(); + + // Simulate a duplicate / late `[DONE]` after the stream was already closed. + let late = "data: [DONE]"; + for raw in SseStreamIter::try_from(late.as_bytes()).unwrap() { + let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(e); + } + let tail = String::from_utf8(buffer.to_bytes()).unwrap(); + assert!( + tail.is_empty(), + "No bytes should be emitted after message_stop, got: {tail:?}" + ); + } }