mirror of
https://github.com/katanemo/plano.git
synced 2026-04-25 00:36:34 +02:00
fix(anthropic-stream): avoid bare/duplicate message_stop on OpenAI upstream
Made-with: Cursor
This commit is contained in:
parent
254d2b03bc
commit
6e1007249f
1 changed files with 273 additions and 37 deletions
|
|
@ -1,4 +1,6 @@
|
|||
use crate::apis::anthropic::MessagesStreamEvent;
|
||||
use crate::apis::anthropic::{
|
||||
MessagesMessageDelta, MessagesStopReason, MessagesStreamEvent, MessagesUsage,
|
||||
};
|
||||
use crate::apis::streaming_shapes::sse::{SseEvent, SseStreamBufferTrait};
|
||||
use crate::providers::streaming_response::ProviderStreamResponseType;
|
||||
use std::collections::HashSet;
|
||||
|
|
@ -11,13 +13,24 @@ use std::collections::HashSet;
|
|||
///
|
||||
/// When converting from OpenAI to Anthropic format, this buffer injects the required
|
||||
/// ContentBlockStart and ContentBlockStop events to maintain proper Anthropic protocol.
|
||||
///
|
||||
/// Guarantees (Anthropic Messages API contract):
|
||||
/// 1. `message_stop` is never emitted unless a matching `message_start` was emitted first.
|
||||
/// 2. `message_stop` is emitted at most once per stream (no double-close).
|
||||
/// 3. If upstream terminates with no content (empty/filtered/errored response), a
|
||||
/// minimal but well-formed envelope is synthesized so the client's state machine
|
||||
/// stays consistent.
|
||||
pub struct AnthropicMessagesStreamBuffer {
|
||||
/// Buffered SSE events ready to be written to wire
|
||||
buffered_events: Vec<SseEvent>,
|
||||
|
||||
/// Track if we've seen a message_start event
|
||||
/// Track if we've emitted a message_start event
|
||||
message_started: bool,
|
||||
|
||||
/// Track if we've emitted a terminal message_stop event (for idempotency /
|
||||
/// double-close protection).
|
||||
message_stopped: bool,
|
||||
|
||||
/// Track content block indices that have received ContentBlockStart events
|
||||
content_block_start_indices: HashSet<i32>,
|
||||
|
||||
|
|
@ -42,6 +55,7 @@ impl AnthropicMessagesStreamBuffer {
|
|||
Self {
|
||||
buffered_events: Vec::new(),
|
||||
message_started: false,
|
||||
message_stopped: false,
|
||||
content_block_start_indices: HashSet::new(),
|
||||
needs_content_block_stop: false,
|
||||
seen_message_delta: false,
|
||||
|
|
@ -49,6 +63,66 @@ impl AnthropicMessagesStreamBuffer {
|
|||
}
|
||||
}
|
||||
|
||||
/// Inject a `message_start` event into the buffer if one hasn't been emitted yet.
|
||||
/// This is the single source of truth for opening a message — every handler
|
||||
/// that can legitimately be the first event on the wire must call this before
|
||||
/// pushing its own event.
|
||||
fn ensure_message_started(&mut self) {
|
||||
if self.message_started {
|
||||
return;
|
||||
}
|
||||
let model = self.model.as_deref().unwrap_or("unknown");
|
||||
let message_start = AnthropicMessagesStreamBuffer::create_message_start_event(model);
|
||||
self.buffered_events.push(message_start);
|
||||
self.message_started = true;
|
||||
}
|
||||
|
||||
/// Inject a synthetic `message_delta` with `end_turn` / zero usage.
|
||||
/// Used when we must close a message but upstream never produced a terminal
|
||||
/// event (e.g. `[DONE]` arrives with no prior `finish_reason`).
|
||||
fn push_synthetic_message_delta(&mut self) {
|
||||
let event = MessagesStreamEvent::MessageDelta {
|
||||
delta: MessagesMessageDelta {
|
||||
stop_reason: MessagesStopReason::EndTurn,
|
||||
stop_sequence: None,
|
||||
},
|
||||
usage: MessagesUsage {
|
||||
input_tokens: 0,
|
||||
output_tokens: 0,
|
||||
cache_creation_input_tokens: None,
|
||||
cache_read_input_tokens: None,
|
||||
},
|
||||
};
|
||||
let sse_string: String = event.clone().into();
|
||||
self.buffered_events.push(SseEvent {
|
||||
data: None,
|
||||
event: Some("message_delta".to_string()),
|
||||
raw_line: sse_string.clone(),
|
||||
sse_transformed_lines: sse_string,
|
||||
provider_stream_response: Some(ProviderStreamResponseType::MessagesStreamEvent(event)),
|
||||
});
|
||||
self.seen_message_delta = true;
|
||||
}
|
||||
|
||||
/// Inject a `message_stop` event into the buffer, marking the stream as closed.
|
||||
/// Idempotent — subsequent calls are no-ops.
|
||||
fn push_message_stop(&mut self) {
|
||||
if self.message_stopped {
|
||||
return;
|
||||
}
|
||||
let message_stop = MessagesStreamEvent::MessageStop;
|
||||
let sse_string: String = message_stop.into();
|
||||
self.buffered_events.push(SseEvent {
|
||||
data: None,
|
||||
event: Some("message_stop".to_string()),
|
||||
raw_line: sse_string.clone(),
|
||||
sse_transformed_lines: sse_string,
|
||||
provider_stream_response: None,
|
||||
});
|
||||
self.message_stopped = true;
|
||||
self.seen_message_delta = false;
|
||||
}
|
||||
|
||||
/// Check if a content_block_start event has been sent for the given index
|
||||
fn has_content_block_start_been_sent(&self, index: i32) -> bool {
|
||||
self.content_block_start_indices.contains(&index)
|
||||
|
|
@ -149,6 +223,12 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
// We match on a reference first to determine the type, then move the event
|
||||
match &event.provider_stream_response {
|
||||
Some(ProviderStreamResponseType::MessagesStreamEvent(evt)) => {
|
||||
// If the message has already been closed, drop any trailing events
|
||||
// to avoid emitting data after `message_stop` (protocol violation).
|
||||
if self.message_stopped {
|
||||
return;
|
||||
}
|
||||
|
||||
match evt {
|
||||
MessagesStreamEvent::MessageStart { .. } => {
|
||||
// Add the message_start event
|
||||
|
|
@ -157,14 +237,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
}
|
||||
MessagesStreamEvent::ContentBlockStart { index, .. } => {
|
||||
let index = *index as i32;
|
||||
// Inject message_start if needed
|
||||
if !self.message_started {
|
||||
let model = self.model.as_deref().unwrap_or("unknown");
|
||||
let message_start =
|
||||
AnthropicMessagesStreamBuffer::create_message_start_event(model);
|
||||
self.buffered_events.push(message_start);
|
||||
self.message_started = true;
|
||||
}
|
||||
self.ensure_message_started();
|
||||
|
||||
// Add the content_block_start event (from tool calls or other sources)
|
||||
self.buffered_events.push(event);
|
||||
|
|
@ -173,14 +246,7 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
}
|
||||
MessagesStreamEvent::ContentBlockDelta { index, .. } => {
|
||||
let index = *index as i32;
|
||||
// Inject message_start if needed
|
||||
if !self.message_started {
|
||||
let model = self.model.as_deref().unwrap_or("unknown");
|
||||
let message_start =
|
||||
AnthropicMessagesStreamBuffer::create_message_start_event(model);
|
||||
self.buffered_events.push(message_start);
|
||||
self.message_started = true;
|
||||
}
|
||||
self.ensure_message_started();
|
||||
|
||||
// Check if ContentBlockStart was sent for this index
|
||||
if !self.has_content_block_start_been_sent(index) {
|
||||
|
|
@ -196,6 +262,11 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
self.buffered_events.push(event);
|
||||
}
|
||||
MessagesStreamEvent::MessageDelta { usage, .. } => {
|
||||
// `message_delta` is only meaningful inside an open message.
|
||||
// Upstream can send it with no prior content (empty completion,
|
||||
// content filter, etc.), so we must open a message first.
|
||||
self.ensure_message_started();
|
||||
|
||||
// Inject ContentBlockStop before message_delta
|
||||
if self.needs_content_block_stop {
|
||||
let content_block_stop =
|
||||
|
|
@ -230,15 +301,52 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
}
|
||||
MessagesStreamEvent::ContentBlockStop { .. } => {
|
||||
// ContentBlockStop received from upstream (e.g., Bedrock)
|
||||
self.ensure_message_started();
|
||||
// Clear the flag so we don't inject another one
|
||||
self.needs_content_block_stop = false;
|
||||
self.buffered_events.push(event);
|
||||
}
|
||||
MessagesStreamEvent::MessageStop => {
|
||||
// MessageStop received from upstream (e.g., OpenAI via [DONE])
|
||||
// Clear the flag so we don't inject another one
|
||||
self.seen_message_delta = false;
|
||||
// MessageStop received from upstream (e.g., OpenAI via [DONE]).
|
||||
//
|
||||
// The Anthropic protocol requires the full envelope
|
||||
// message_start → [content blocks] → message_delta → message_stop
|
||||
// so we must not emit a bare `message_stop`. Synthesize whatever
|
||||
// is missing to keep the client's state machine consistent.
|
||||
self.ensure_message_started();
|
||||
|
||||
if self.needs_content_block_stop {
|
||||
let content_block_stop =
|
||||
AnthropicMessagesStreamBuffer::create_content_block_stop_event();
|
||||
self.buffered_events.push(content_block_stop);
|
||||
self.needs_content_block_stop = false;
|
||||
}
|
||||
|
||||
// If no message_delta has been emitted yet (empty/filtered upstream
|
||||
// response), synthesize a minimal one carrying `end_turn`.
|
||||
if !self.seen_message_delta {
|
||||
// If we also never opened a content block, open and close one
|
||||
// so clients that expect at least one block are happy.
|
||||
if self.content_block_start_indices.is_empty() {
|
||||
let content_block_start =
|
||||
AnthropicMessagesStreamBuffer::create_content_block_start_event(
|
||||
);
|
||||
self.buffered_events.push(content_block_start);
|
||||
self.set_content_block_start_sent(0);
|
||||
let content_block_stop =
|
||||
AnthropicMessagesStreamBuffer::create_content_block_stop_event(
|
||||
);
|
||||
self.buffered_events.push(content_block_stop);
|
||||
}
|
||||
self.push_synthetic_message_delta();
|
||||
}
|
||||
|
||||
// Push the upstream-provided message_stop and mark closed.
|
||||
// `push_message_stop` is idempotent but we want to reuse the
|
||||
// original SseEvent so raw passthrough semantics are preserved.
|
||||
self.buffered_events.push(event);
|
||||
self.message_stopped = true;
|
||||
self.seen_message_delta = false;
|
||||
}
|
||||
_ => {
|
||||
// Other Anthropic event types (Ping, etc.), just accumulate
|
||||
|
|
@ -254,24 +362,23 @@ impl SseStreamBufferTrait for AnthropicMessagesStreamBuffer {
|
|||
}
|
||||
|
||||
fn to_bytes(&mut self) -> Vec<u8> {
|
||||
// Convert all accumulated events to bytes and clear buffer
|
||||
// Convert all accumulated events to bytes and clear buffer.
|
||||
//
|
||||
// NOTE: We do NOT inject ContentBlockStop here because it's injected when we see MessageDelta
|
||||
// or MessageStop. Injecting it here causes premature ContentBlockStop in the middle of streaming.
|
||||
|
||||
// Inject MessageStop after MessageDelta if we've seen one
|
||||
// This completes the Anthropic Messages API event sequence
|
||||
if self.seen_message_delta {
|
||||
let message_stop = MessagesStreamEvent::MessageStop;
|
||||
let sse_string: String = message_stop.into();
|
||||
let message_stop_event = SseEvent {
|
||||
data: None,
|
||||
event: Some("message_stop".to_string()),
|
||||
raw_line: sse_string.clone(),
|
||||
sse_transformed_lines: sse_string,
|
||||
provider_stream_response: None,
|
||||
};
|
||||
self.buffered_events.push(message_stop_event);
|
||||
self.seen_message_delta = false;
|
||||
//
|
||||
// Inject a synthetic `message_stop` only when:
|
||||
// 1. A `message_delta` has been seen (otherwise we'd violate the Anthropic
|
||||
// protocol by emitting `message_stop` without a preceding `message_delta`), AND
|
||||
// 2. We haven't already emitted `message_stop` (either synthetic from a
|
||||
// previous flush, or real from an upstream `[DONE]`).
|
||||
//
|
||||
// Without the `!message_stopped` guard, a stream whose `finish_reason` chunk
|
||||
// and `[DONE]` marker land in separate HTTP body chunks would receive two
|
||||
// `message_stop` events, triggering Claude Code's "Received message_stop
|
||||
// without a current message" error.
|
||||
if self.seen_message_delta && !self.message_stopped {
|
||||
self.push_message_stop();
|
||||
}
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
|
|
@ -615,4 +722,133 @@ data: [DONE]"#;
|
|||
println!("✓ Stop reason: tool_use");
|
||||
println!("✓ Proper Anthropic tool_use protocol\n");
|
||||
}
|
||||
|
||||
/// Regression test for:
|
||||
/// Claude Code CLI error: "Received message_stop without a current message"
|
||||
///
|
||||
/// Reproduces the *double-close* scenario: OpenAI's final `finish_reason`
|
||||
/// chunk and the `[DONE]` marker arrive in **separate** HTTP body chunks, so
|
||||
/// `to_bytes()` is called between them. Before the fix, this produced two
|
||||
/// `message_stop` events on the wire (one synthetic, one from `[DONE]`).
|
||||
#[test]
|
||||
fn test_openai_to_anthropic_emits_single_message_stop_across_chunk_boundary() {
|
||||
let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let mut buffer = AnthropicMessagesStreamBuffer::new();
|
||||
|
||||
// --- HTTP chunk 1: content + finish_reason (no [DONE] yet) -----------
|
||||
let chunk_1 = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}
|
||||
|
||||
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#;
|
||||
|
||||
for raw in SseStreamIter::try_from(chunk_1.as_bytes()).unwrap() {
|
||||
let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
|
||||
buffer.add_transformed_event(e);
|
||||
}
|
||||
let out_1 = String::from_utf8(buffer.to_bytes()).unwrap();
|
||||
|
||||
// --- HTTP chunk 2: just the [DONE] marker ----------------------------
|
||||
let chunk_2 = "data: [DONE]";
|
||||
for raw in SseStreamIter::try_from(chunk_2.as_bytes()).unwrap() {
|
||||
let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
|
||||
buffer.add_transformed_event(e);
|
||||
}
|
||||
let out_2 = String::from_utf8(buffer.to_bytes()).unwrap();
|
||||
|
||||
let combined = format!("{}{}", out_1, out_2);
|
||||
let start_count = combined.matches("event: message_start").count();
|
||||
let stop_count = combined.matches("event: message_stop").count();
|
||||
|
||||
assert_eq!(
|
||||
start_count, 1,
|
||||
"Must emit exactly one message_start across chunks, got {start_count}. Output:\n{combined}"
|
||||
);
|
||||
assert_eq!(
|
||||
stop_count, 1,
|
||||
"Must emit exactly one message_stop across chunks (no double-close), got {stop_count}. Output:\n{combined}"
|
||||
);
|
||||
// Every message_stop must be preceded by a message_start earlier in the stream.
|
||||
let start_pos = combined.find("event: message_start").unwrap();
|
||||
let stop_pos = combined.find("event: message_stop").unwrap();
|
||||
assert!(
|
||||
start_pos < stop_pos,
|
||||
"message_start must come before message_stop. Output:\n{combined}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression test for:
|
||||
/// "Received message_stop without a current message" on empty upstream responses.
|
||||
///
|
||||
/// OpenAI returns only `[DONE]` with no content deltas and no `finish_reason`
|
||||
/// (this happens with content filters, truncated upstream streams, and some
|
||||
/// 5xx recoveries). Before the fix, the buffer emitted a bare `message_stop`
|
||||
/// with no preceding `message_start`. After the fix, it synthesizes a
|
||||
/// minimal but well-formed envelope.
|
||||
#[test]
|
||||
fn test_openai_done_only_stream_synthesizes_valid_envelope() {
|
||||
let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let mut buffer = AnthropicMessagesStreamBuffer::new();
|
||||
|
||||
let raw_input = "data: [DONE]";
|
||||
for raw in SseStreamIter::try_from(raw_input.as_bytes()).unwrap() {
|
||||
let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
|
||||
buffer.add_transformed_event(e);
|
||||
}
|
||||
let out = String::from_utf8(buffer.to_bytes()).unwrap();
|
||||
|
||||
assert!(
|
||||
out.contains("event: message_start"),
|
||||
"Empty upstream must still produce message_start. Output:\n{out}"
|
||||
);
|
||||
assert!(
|
||||
out.contains("event: message_delta"),
|
||||
"Empty upstream must produce a synthesized message_delta. Output:\n{out}"
|
||||
);
|
||||
assert_eq!(
|
||||
out.matches("event: message_stop").count(),
|
||||
1,
|
||||
"Empty upstream must produce exactly one message_stop. Output:\n{out}"
|
||||
);
|
||||
|
||||
// Protocol ordering: start < delta < stop.
|
||||
let p_start = out.find("event: message_start").unwrap();
|
||||
let p_delta = out.find("event: message_delta").unwrap();
|
||||
let p_stop = out.find("event: message_stop").unwrap();
|
||||
assert!(
|
||||
p_start < p_delta && p_delta < p_stop,
|
||||
"Bad ordering. Output:\n{out}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Regression test: events arriving after `message_stop` (e.g. a stray `[DONE]`
|
||||
/// echo, or late-arriving deltas from a racing upstream) must be dropped
|
||||
/// rather than written after the terminal frame.
|
||||
#[test]
|
||||
fn test_events_after_message_stop_are_dropped() {
|
||||
let client_api = SupportedAPIsFromClient::AnthropicMessagesAPI(AnthropicApi::Messages);
|
||||
let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
let mut buffer = AnthropicMessagesStreamBuffer::new();
|
||||
|
||||
let first = r#"data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"ok"},"finish_reason":"stop"}]}
|
||||
|
||||
data: [DONE]"#;
|
||||
for raw in SseStreamIter::try_from(first.as_bytes()).unwrap() {
|
||||
let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
|
||||
buffer.add_transformed_event(e);
|
||||
}
|
||||
let _ = buffer.to_bytes();
|
||||
|
||||
// Simulate a duplicate / late `[DONE]` after the stream was already closed.
|
||||
let late = "data: [DONE]";
|
||||
for raw in SseStreamIter::try_from(late.as_bytes()).unwrap() {
|
||||
let e = SseEvent::try_from((raw, &client_api, &upstream_api)).unwrap();
|
||||
buffer.add_transformed_event(e);
|
||||
}
|
||||
let tail = String::from_utf8(buffer.to_bytes()).unwrap();
|
||||
assert!(
|
||||
tail.is_empty(),
|
||||
"No bytes should be emitted after message_stop, got: {tail:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue