fix(anthropic-stream): avoid bare/duplicate message_stop on OpenAI upstream (#898)

This commit is contained in:
Adil Hafeez 2026-04-18 15:57:34 -07:00 committed by GitHub
parent 254d2b03bc
commit e7464b817a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,6 +1,9 @@
use crate::apis::anthropic::MessagesStreamEvent;
use crate::apis::anthropic::{
MessagesMessageDelta, MessagesStopReason, MessagesStreamEvent, MessagesUsage,
};
use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamBufferTrait};
use crate::providers::streaming_response::ProviderStreamResponseType;
use log::warn;
use std::collections::HashSet;
/// SSE Stream Buffer for Anthropic Messages API streaming.
@ -11,13 +14,24 @@ use std::collections::HashSet;
///
/// When converting from OpenAI to Anthropic format, this buffer injects the required
/// ContentBlockStart and ContentBlockStop events to maintain proper Anthropic protocol.
///
/// Guarantees (Anthropic Messages API contract):
/// 1. `message_stop` is never emitted unless a matching `message_start` was emitted first.
/// 2. `message_stop` is emitted at most once per stream (no double-close).
/// 3. If upstream terminates with no content (empty/filtered/errored response), a
/// minimal but well-formed envelope is synthesized so the client's state machine
/// stays consistent.
pub struct AnthropicMessagesStreamBuffer {
/// Buffered SSE events ready to be written to wire
buffered_events: Vec<SseEvent>,
/// Track if we've seen a message_start event
/// Track if we've emitted a message_start event
message_started: bool,
/// Track if we've emitted a terminal message_stop event (for idempotency /
/// double-close protection).
message_stopped: bool,
/// Track content block indices that have received ContentBlockStart events
content_block_start_indices: HashSet<i32>,
@ -42,6 +56,7 @@ impl AnthropicMessagesStreamBuffer {
Self {
buffered_events: Vec::new(),
message_started: false,
message_stopped: false,
content_block_start_indices: HashSet::new(),
needs_content_block_stop: false,
seen_message_delta: false,
@ -49,6 +64,66 @@ impl AnthropicMessagesStreamBuffer {
}
}
/// Opens the message on the wire exactly once.
///
/// When `message_started` is still `false`, a `message_start` event is built via
/// `create_message_start_event` (using `self.model`, or `"unknown"` when no model
/// is set), appended to `buffered_events`, and the flag is flipped. When the flag
/// is already `true` the call does nothing, so handlers may invoke it
/// unconditionally before pushing their own event.
fn ensure_message_started(&mut self) {
    if !self.message_started {
        let model_name = self.model.as_deref().unwrap_or("unknown");
        let start_event = AnthropicMessagesStreamBuffer::create_message_start_event(model_name);
        self.buffered_events.push(start_event);
        self.message_started = true;
    }
}
/// Appends a synthesized `message_delta` event to the buffer.
///
/// The event carries `stop_reason = EndTurn`, no stop sequence, and a usage record
/// with zero input/output tokens and no cache counters. It exists for the case
/// where the message has to be closed but upstream never supplied a terminal
/// delta of its own. After pushing, `seen_message_delta` is set to `true`.
fn push_synthetic_message_delta(&mut self) {
    let zero_usage = MessagesUsage {
        input_tokens: 0,
        output_tokens: 0,
        cache_creation_input_tokens: None,
        cache_read_input_tokens: None,
    };
    let end_turn_delta = MessagesMessageDelta {
        stop_reason: MessagesStopReason::EndTurn,
        stop_sequence: None,
    };
    let delta_event = MessagesStreamEvent::MessageDelta {
        delta: end_turn_delta,
        usage: zero_usage,
    };

    // The typed event is stored alongside its serialized form, so serialize a clone.
    let wire_text: String = delta_event.clone().into();
    let synthetic = SseEvent {
        data: None,
        event: Some("message_delta".to_string()),
        raw_line: wire_text.clone(),
        sse_transformed_lines: wire_text,
        provider_stream_response: Some(ProviderStreamResponseType::MessagesStreamEvent(
            delta_event,
        )),
    };
    self.buffered_events.push(synthetic);

    self.seen_message_delta = true;
}
/// Appends a `message_stop` event to the buffer and records the stream as closed.
///
/// The first call serializes `MessagesStreamEvent::MessageStop`, pushes it with no
/// attached `provider_stream_response`, sets `message_stopped`, and clears
/// `seen_message_delta`. Once `message_stopped` is `true`, further calls are no-ops.
fn push_message_stop(&mut self) {
    if !self.message_stopped {
        let wire_text: String = MessagesStreamEvent::MessageStop.into();
        let stop_event = SseEvent {
            data: None,
            event: Some("message_stop".to_string()),
            raw_line: wire_text.clone(),
            sse_transformed_lines: wire_text,
            provider_stream_response: None,
        };
        self.buffered_events.push(stop_event);

        self.message_stopped = true;
        self.seen_message_delta = false;
    }
}
/// Check if a content_block_start event has been sent for the given index
fn has_content_block_start_been_sent(&self, index: i32) -> bool {
self.content_block_start_indices.contains(&index)
@ -149,6 +224,27 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
// We match on a reference first to determine the type, then move the event
match &event.provider_stream_response {
Some(ProviderStreamResponseType::MessagesStreamEvent(evt)) => {
// If the message has already been closed, drop any trailing events
// to avoid emitting data after `message_stop` (protocol violation).
// This typically indicates a duplicate `[DONE]` from upstream or a
// replay of previously-buffered bytes — worth surfacing so we can
// spot misbehaving providers.
if self.message_stopped {
warn!(
"anthropic stream buffer: dropping event after message_stop (variant={})",
match evt {
MessagesStreamEvent::MessageStart { .. } => "message_start",
MessagesStreamEvent::ContentBlockStart { .. } => "content_block_start",
MessagesStreamEvent::ContentBlockDelta { .. } => "content_block_delta",
MessagesStreamEvent::ContentBlockStop { .. } => "content_block_stop",
MessagesStreamEvent::MessageDelta { .. } => "message_delta",
MessagesStreamEvent::MessageStop => "message_stop",
MessagesStreamEvent::Ping => "ping",
}
);
return;
}
match evt {
MessagesStreamEvent::MessageStart { .. } => {
// Add the message_start event
@ -157,14 +253,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
}
MessagesStreamEvent::ContentBlockStart { index, .. } => {
let index = *index as i32;
// Inject message_start if needed
if !self.message_started {
let model = self.model.as_deref().unwrap_or("unknown");
let message_start =
AnthropicMessagesStreamBuffer::create_message_start_event(model);
self.buffered_events.push(message_start);
self.message_started = true;
}
self.ensure_message_started();
// Add the content_block_start event (from tool calls or other sources)
self.buffered_events.push(event);
@ -173,14 +262,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
}
MessagesStreamEvent::ContentBlockDelta { index, .. } => {
let index = *index as i32;
// Inject message_start if needed
if !self.message_started {
let model = self.model.as_deref().unwrap_or("unknown");
let message_start =
AnthropicMessagesStreamBuffer::create_message_start_event(model);
self.buffered_events.push(message_start);
self.message_started = true;
}
self.ensure_message_started();
// Check if ContentBlockStart was sent for this index
if !self.has_content_block_start_been_sent(index) {
@ -196,6 +278,11 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
self.buffered_events.push(event);
}
MessagesStreamEvent::MessageDelta { usage, .. } => {
// `message_delta` is only meaningful inside an open message.
// Upstream can send it with no prior content (empty completion,
// content filter, etc.), so we must open a message first.
self.ensure_message_started();
// Inject ContentBlockStop before message_delta
if self.needs_content_block_stop {
let content_block_stop =
@ -230,15 +317,52 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
}
MessagesStreamEvent::ContentBlockStop { .. } => {
// ContentBlockStop received from upstream (e.g., Bedrock)
self.ensure_message_started();
// Clear the flag so we don't inject another one
self.needs_content_block_stop = false;
self.buffered_events.push(event);
}
MessagesStreamEvent::MessageStop => {
// MessageStop received from upstream (e.g., OpenAI via [DONE])
// Clear the flag so we don't inject another one
self.seen_message_delta = false;
// MessageStop received from upstream (e.g., OpenAI via [DONE]).
//
// The Anthropic protocol requires the full envelope
// message_start → [content blocks] → message_delta → message_stop
// so we must not emit a bare `message_stop`. Synthesize whatever
// is missing to keep the client's state machine consistent.
self.ensure_message_started();
if self.needs_content_block_stop {
let content_block_stop =
AnthropicMessagesStreamBuffer::create_content_block_stop_event();
self.buffered_events.push(content_block_stop);
self.needs_content_block_stop = false;
}
// If no message_delta has been emitted yet (empty/filtered upstream
// response), synthesize a minimal one carrying `end_turn`.
if !self.seen_message_delta {
// If we also never opened a content block, open and close one
// so clients that expect at least one block are happy.
if self.content_block_start_indices.is_empty() {
let content_block_start =
AnthropicMessagesStreamBuffer::create_content_block_start_event(
);
self.buffered_events.push(content_block_start);
self.set_content_block_start_sent(0);
let content_block_stop =
AnthropicMessagesStreamBuffer::create_content_block_stop_event(
);
self.buffered_events.push(content_block_stop);
}
self.push_synthetic_message_delta();
}
// Push the upstream-provided message_stop and mark closed.
// `push_message_stop` is idempotent but we want to reuse the
// original SseEvent so raw passthrough semantics are preserved.
self.buffered_events.push(event);
self.message_stopped = true;
self.seen_message_delta = false;
}
_ => {
// Other Anthropic event types (Ping, etc.), just accumulate
@ -254,24 +378,23 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
}
fn to_bytes(&mut self) -> Vec<u8> {
// Convert all accumulated events to bytes and clear buffer
// Convert all accumulated events to bytes and clear buffer.
//
// NOTE: We do NOT inject ContentBlockStop here because it's injected when we see MessageDelta
// or MessageStop. Injecting it here causes premature ContentBlockStop in the middle of streaming.
// Inject MessageStop after MessageDelta if we've seen one
// This completes the Anthropic Messages API event sequence
if self.seen_message_delta {
let message_stop = MessagesStreamEvent::MessageStop;
let sse_string: String = message_stop.into();
let message_stop_event = SseEvent {
data: None,
event: Some("message_stop".to_string()),
raw_line: sse_string.clone(),
sse_transformed_lines: sse_string,
provider_stream_response: None,
};
self.buffered_events.push(message_stop_event);
self.seen_message_delta = false;
//
// Inject a synthetic `message_stop` only when:
// 1. A `message_delta` has been seen (otherwise we'd violate the Anthropic
// protocol by emitting `message_stop` without a preceding `message_delta`), AND
// 2. We haven't already emitted `message_stop` (either synthetic from a
// previous flush, or real from an upstream `[DONE]`).
//
// Without the `!message_stopped` guard, a stream whose `finish_reason` chunk
// and `[DONE]` marker land in separate HTTP body chunks would receive two
// `message_stop` events, triggering Claude Code's "Received message_stop
// without a current message" error.
if self.seen_message_delta && !self.message_stopped {
self.push_message_stop();
}
let mut buffer = Vec::new();
@ -615,4 +738,133 @@ data: [DONE]"#;
println!("✓ Stop reason: tool_use");
println!("✓ Proper Anthropic tool_use protocol\n");
}
/// Regression guard for the Claude Code CLI error
/// "Received message_stop without a current message" (double-close scenario).
///
/// The OpenAI `finish_reason` chunk and the `[DONE]` marker are delivered as two
/// **separate** HTTP body chunks, with `to_bytes()` flushed after each one. The
/// concatenated output must contain exactly one `message_start` and exactly one
/// `message_stop`, with the start preceding the stop.
#[test]
fn test_openai_to_anthropic_emits_single_message_stop_across_chunk_boundary() {
    let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
    let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
    let mut buffer = AnthropicMessagesStreamBuffer::new();

    // Body chunk 1 carries content plus finish_reason; body chunk 2 is only [DONE].
    let content_and_finish = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#;
    let http_chunks = [content_and_finish, "data: [DONE]"];

    // Flush after every HTTP chunk and stitch the flushed bytes back together.
    let mut combined = String::new();
    for http_chunk in http_chunks {
        for raw in SseStreamIter::try_from(http_chunk.as_bytes()).unwrap() {
            let transformed = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
            buffer.add_transformed_event(transformed);
        }
        combined.push_str(&String::from_utf8(buffer.to_bytes()).unwrap());
    }

    let start_count = combined.matches("event: message_start").count();
    let stop_count = combined.matches("event: message_stop").count();
    assert_eq!(
        start_count, 1,
        "Must emit exactly one message_start across chunks, got {start_count}. Output:\n{combined}"
    );
    assert_eq!(
        stop_count, 1,
        "Must emit exactly one message_stop across chunks (no double-close), got {stop_count}. Output:\n{combined}"
    );

    // The single message_stop has to appear after the single message_start.
    let first_start = combined.find("event: message_start").unwrap();
    let first_stop = combined.find("event: message_stop").unwrap();
    assert!(
        first_start < first_stop,
        "message_start must come before message_stop. Output:\n{combined}"
    );
}
/// Regression guard for "Received message_stop without a current message" on
/// empty upstream responses.
///
/// The only upstream input is `[DONE]`: no content deltas and no `finish_reason`.
/// The buffer must answer with a well-formed envelope: a `message_start`, a
/// synthesized `message_delta`, and exactly one `message_stop`, in that order.
#[test]
fn test_openai_done_only_stream_synthesizes_valid_envelope() {
    let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
    let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
    let mut buffer = AnthropicMessagesStreamBuffer::new();

    let done_only = "data: [DONE]";
    for raw in SseStreamIter::try_from(done_only.as_bytes()).unwrap() {
        let transformed = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
        buffer.add_transformed_event(transformed);
    }
    let out = String::from_utf8(buffer.to_bytes()).unwrap();

    assert!(
        out.contains("event: message_start"),
        "Empty upstream must still produce message_start. Output:\n{out}"
    );
    assert!(
        out.contains("event: message_delta"),
        "Empty upstream must produce a synthesized message_delta. Output:\n{out}"
    );
    let stop_count = out.matches("event: message_stop").count();
    assert_eq!(
        stop_count, 1,
        "Empty upstream must produce exactly one message_stop. Output:\n{out}"
    );

    // Protocol ordering: start < delta < stop.
    let position_of = |needle: &str| out.find(needle).unwrap();
    let (p_start, p_delta, p_stop) = (
        position_of("event: message_start"),
        position_of("event: message_delta"),
        position_of("event: message_stop"),
    );
    assert!(
        p_start < p_delta && p_delta < p_stop,
        "Bad ordering. Output:\n{out}"
    );
}
/// Regression guard: once `message_stop` has gone out, later upstream events
/// (here a duplicate `[DONE]`) must be dropped instead of being written after
/// the terminal frame.
///
/// A complete stream (content + `finish_reason`, then `[DONE]`) is fed and
/// flushed first. A second, late `[DONE]` is then fed and the following flush
/// must yield no bytes at all.
#[test]
fn test_events_after_message_stop_are_dropped() {
    let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
    let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
    let mut buffer = AnthropicMessagesStreamBuffer::new();

    let complete_stream = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"ok"},"finish_reason":"stop"}]}
data: [DONE]"#;
    // The second payload simulates a duplicate / late `[DONE]` after the close.
    let payloads = [complete_stream, "data: [DONE]"];

    // Only the bytes from the final flush matter; earlier flushes are discarded.
    let mut last_flush = Vec::new();
    for payload in payloads {
        for raw in SseStreamIter::try_from(payload.as_bytes()).unwrap() {
            let transformed = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
            buffer.add_transformed_event(transformed);
        }
        last_flush = buffer.to_bytes();
    }

    let tail = String::from_utf8(last_flush).unwrap();
    assert!(
        tail.is_empty(),
        "No bytes should be emitted after message_stop, got: {tail:?}"
    );
}
}