fix(claude-cli): correct streaming SSE for non-MessageStart first events

- The synthetic message_start path only fired when the very first
  observed event was a Result. If the CLI ever emitted (say) a bare
  ContentBlockStart first, we'd ship malformed Anthropic SSE without a
  preceding message_start. Trigger the synthesis on any first
  stream-advancing event that isn't a MessageStart.
- Make every send-to-client branch consistent: break out of the loop
  when the receiver has gone away (mpsc send returned Err), so we don't
  keep generating events for a vanished client.
- Replace serde_json::to_string(...).unwrap() in the streaming error
  path with the same fallback json_response already uses ("{}" on
  serialize failure). No more panic surface in the streaming worker.
- Drop the dead `_touch_stream_module` placeholder and its unused
  `use futures::stream` import.
This commit is contained in:
Spherrrical 2026-05-04 13:35:48 -07:00
parent 53a23ec8f9
commit 3c58185389

View file

@ -8,7 +8,6 @@ use std::net::SocketAddr;
use std::sync::Arc;
use bytes::Bytes;
use futures::stream;
use hermesllm::apis::anthropic::MessagesRequest;
use hermesllm::apis::claude_cli::{
cli_error_to_anthropic_error_body, cli_event_to_messages_stream_event,
@ -194,6 +193,8 @@ fn stream_response(
let (tx, rx) = mpsc::channel::<Result<Frame<Bytes>, Infallible>>(64);
tokio::spawn(async move {
use hermesllm::apis::anthropic::MessagesStreamEvent;
// Some short turns skip MessageStart; emit a synthetic one so the
// client always sees a complete stream.
let mut emitted_message_start = false;
@ -205,28 +206,39 @@ fn stream_response(
Err(err) => {
warn!(session = %session_id, error = %err, "claude-cli streaming turn failed");
let body = cli_error_to_anthropic_error_body(&err.to_string());
let frame =
Frame::data(format_sse("error", &serde_json::to_string(&body).unwrap()));
let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string());
let frame = Frame::data(format_sse("error", &payload));
let _ = tx.send(Ok(frame)).await;
break;
}
};
if !emitted_message_start {
if let ClaudeCliEvent::StreamEvent {
event: hermesllm::apis::anthropic::MessagesStreamEvent::MessageStart { .. },
} = &ev
{
emitted_message_start = true;
} else if matches!(&ev, ClaudeCliEvent::Result { .. }) {
// No actual content was streamed; synthesize a
// MessageStart so the SSE stream is well-formed.
// Synthesize a MessageStart frame the first time we see anything
// that advances the stream (StreamEvent or Result) and isn't
// already a MessageStart. Untranslated events (System/Assistant/
// User/Unknown) don't trigger synthesis — we silently skip them
// and wait for the real or synthetic start later.
let is_message_start = matches!(
&ev,
ClaudeCliEvent::StreamEvent {
event: MessagesStreamEvent::MessageStart { .. }
}
);
let advances_stream = matches!(
&ev,
ClaudeCliEvent::StreamEvent { .. } | ClaudeCliEvent::Result { .. }
);
if !emitted_message_start && advances_stream {
if !is_message_start {
let synthetic = synthetic_message_start(&model, Some(&session_id));
if let Some(frame) = sse_frame_for_event(&synthetic) {
let _ = tx.send(Ok(frame)).await;
if tx.send(Ok(frame)).await.is_err() {
break;
}
}
emitted_message_start = true;
}
emitted_message_start = true;
}
if let Some(translated) = cli_event_to_messages_stream_event(&ev) {
@ -246,9 +258,11 @@ fn stream_response(
.clone()
.unwrap_or_else(|| "claude-cli returned an error".to_string());
let body = cli_error_to_anthropic_error_body(&msg);
let frame =
Frame::data(format_sse("error", &serde_json::to_string(&body).unwrap()));
let _ = tx.send(Ok(frame)).await;
let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string());
let frame = Frame::data(format_sse("error", &payload));
if tx.send(Ok(frame)).await.is_err() {
break;
}
}
break;
}
@ -325,11 +339,3 @@ fn text_response(
.insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain"));
resp
}
// Ensure a no-op import so that `stream` (re-exported from futures) is
// considered used in case future expansion needs it. Avoids accidental
// deletion when running `cargo fix`.
#[allow(dead_code)]
fn _touch_stream_module() {
let _: stream::Empty<u32> = stream::empty();
}