From 3c581853895fe46318413292844d295e4fa6ac5c Mon Sep 17 00:00:00 2001 From: Spherrrical Date: Mon, 4 May 2026 13:35:48 -0700 Subject: [PATCH] fix(claude-cli): correct streaming SSE for non-MessageStart first events - The synthetic message_start path only fired when the very first observed event was a Result. If the CLI ever emitted (say) a bare ContentBlockStart first, we'd ship malformed Anthropic SSE without a preceding message_start. Trigger the synthesis on any first stream-advancing event that isn't a MessageStart. - Make every send-to-client branch consistent: break out of the loop when the receiver has gone away (mpsc send returned Err), so we don't keep generating events for a vanished client. - Replace serde_json::to_string(...).unwrap() in the streaming error path with the same fallback json_response already uses ("{}" on serialize failure). No more panic surface in the streaming worker. - Drop the dead `_touch_stream_module` placeholder and its unused `use futures::stream` import. --- .../src/handlers/claude_cli/server.rs | 56 ++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/crates/brightstaff/src/handlers/claude_cli/server.rs b/crates/brightstaff/src/handlers/claude_cli/server.rs index 68f3dc57..91cb96fc 100644 --- a/crates/brightstaff/src/handlers/claude_cli/server.rs +++ b/crates/brightstaff/src/handlers/claude_cli/server.rs @@ -8,7 +8,6 @@ use std::net::SocketAddr; use std::sync::Arc; use bytes::Bytes; -use futures::stream; use hermesllm::apis::anthropic::MessagesRequest; use hermesllm::apis::claude_cli::{ cli_error_to_anthropic_error_body, cli_event_to_messages_stream_event, @@ -194,6 +193,8 @@ fn stream_response( let (tx, rx) = mpsc::channel::, Infallible>>(64); tokio::spawn(async move { + use hermesllm::apis::anthropic::MessagesStreamEvent; + // Some short turns skip MessageStart; emit a synthetic one so the // client always sees a complete stream. let mut emitted_message_start = false; @@ -205,28 +206,39 @@ fn stream_response( Err(err) => { warn!(session = %session_id, error = %err, "claude-cli streaming turn failed"); let body = cli_error_to_anthropic_error_body(&err.to_string()); - let frame = - Frame::data(format_sse("error", &serde_json::to_string(&body).unwrap())); + let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string()); + let frame = Frame::data(format_sse("error", &payload)); let _ = tx.send(Ok(frame)).await; break; } }; - if !emitted_message_start { - if let ClaudeCliEvent::StreamEvent { - event: hermesllm::apis::anthropic::MessagesStreamEvent::MessageStart { .. }, - } = &ev - { - emitted_message_start = true; - } else if matches!(&ev, ClaudeCliEvent::Result { .. }) { - // No actual content was streamed; synthesize a - // MessageStart so the SSE stream is well-formed. + // Synthesize a MessageStart frame the first time we see anything + // that advances the stream (StreamEvent or Result) and isn't + // already a MessageStart. Untranslated events (System/Assistant/ + // User/Unknown) don't trigger synthesis — we silently skip them + // and wait for the real or synthetic start later. + let is_message_start = matches!( + &ev, + ClaudeCliEvent::StreamEvent { + event: MessagesStreamEvent::MessageStart { .. } + } + ); + let advances_stream = matches!( + &ev, + ClaudeCliEvent::StreamEvent { .. } | ClaudeCliEvent::Result { .. } + ); + + if !emitted_message_start && advances_stream { + if !is_message_start { let synthetic = synthetic_message_start(&model, Some(&session_id)); if let Some(frame) = sse_frame_for_event(&synthetic) { - let _ = tx.send(Ok(frame)).await; + if tx.send(Ok(frame)).await.is_err() { + break; + } } - emitted_message_start = true; } + emitted_message_start = true; } if let Some(translated) = cli_event_to_messages_stream_event(&ev) { @@ -246,9 +258,11 @@ fn stream_response( .clone() .unwrap_or_else(|| "claude-cli returned an error".to_string()); let body = cli_error_to_anthropic_error_body(&msg); - let frame = - Frame::data(format_sse("error", &serde_json::to_string(&body).unwrap())); - let _ = tx.send(Ok(frame)).await; + let payload = serde_json::to_string(&body).unwrap_or_else(|_| "{}".to_string()); + let frame = Frame::data(format_sse("error", &payload)); + if tx.send(Ok(frame)).await.is_err() { + break; + } } break; } @@ -325,11 +339,3 @@ fn text_response( .insert(header::CONTENT_TYPE, HeaderValue::from_static("text/plain")); resp } - -// Ensure a no-op import so that `stream` (re-exported from futures) is -// considered used in case future expansion needs it. Avoids accidental -// deletion when running `cargo fix`. -#[allow(dead_code)] -fn _touch_stream_module() { - let _: stream::Empty = stream::empty(); -}