From 4d954b0f9e4a89389cda9ebc588eeed53d2f98e5 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Tue, 21 Oct 2025 15:09:14 -0700 Subject: [PATCH] PR comments fixed --- arch/supervisord.conf | 2 +- crates/common/src/consts.rs | 2 +- .../src/apis/amazon_bedrock_binary_frame.rs | 65 +++++ crates/hermesllm/src/apis/mod.rs | 2 + crates/hermesllm/src/apis/sse.rs | 194 +++++++++++++ crates/hermesllm/src/clients/endpoints.rs | 13 +- crates/hermesllm/src/lib.rs | 4 +- crates/hermesllm/src/providers/response.rs | 261 +----------------- crates/llm_gateway/src/stream_context.rs | 96 +++---- 9 files changed, 315 insertions(+), 324 deletions(-) create mode 100644 crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs create mode 100644 crates/hermesllm/src/apis/sse.rs diff --git a/arch/supervisord.conf b/arch/supervisord.conf index 3d1db8e2..d4d99494 100644 --- a/arch/supervisord.conf +++ b/arch/supervisord.conf @@ -9,7 +9,7 @@ stdout_logfile_maxbytes=0 stderr_logfile_maxbytes=0 [program:envoy] -command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[envoy_logs] ' \"$line\"; done" +command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[envoy_logs] ' \"$line\"; done" stdout_logfile=/dev/stdout redirect_stderr=true stdout_logfile_maxbytes=0 diff --git a/crates/common/src/consts.rs b/crates/common/src/consts.rs index 6f5a5441..0a75cfe2 100644 --- a/crates/common/src/consts.rs +++ b/crates/common/src/consts.rs @@ -11,7 +11,7 @@ pub const MODEL_SERVER_NAME: &str = "model_server"; pub const ARCH_ROUTING_HEADER: &str = "x-arch-llm-provider"; pub const MESSAGES_KEY: &str = "messages"; pub const ARCH_PROVIDER_HINT_HEADER: &str = "x-arch-llm-provider-hint"; -pub const ARCH_IS_STREAMING_HEADER: &str = "x-archgw-streaming-request"; +pub const ARCH_IS_STREAMING_HEADER: &str = "x-arch-streaming-request"; pub const CHAT_COMPLETIONS_PATH: &str = "/v1/chat/completions"; pub const MESSAGES_PATH: &str = "/v1/messages"; pub const HEALTHZ_PATH: &str = "/healthz"; diff --git a/crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs b/crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs new file mode 100644 index 00000000..03224f8e --- /dev/null +++ b/crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs @@ -0,0 +1,65 @@ +use std::collections::HashSet; +use bytes::Buf; +use aws_smithy_eventstream::frame::DecodedFrame; +use aws_smithy_eventstream::frame::MessageFrameDecoder; + +/// AWS Event Stream frame decoder wrapper +pub struct BedrockBinaryFrameDecoder +where + B: Buf, +{ + decoder: MessageFrameDecoder, + buffer: B, + content_block_start_indices: HashSet, +} + +impl BedrockBinaryFrameDecoder { + /// This is a convenience constructor that creates a BytesMut buffer internally + pub fn from_bytes(bytes: &[u8]) -> Self { + let buffer = bytes::BytesMut::from(bytes); + Self { + decoder: MessageFrameDecoder::new(), + buffer, + content_block_start_indices: std::collections::HashSet::new(), + } + } +} + +impl BedrockBinaryFrameDecoder +where + B: Buf, +{ + pub fn new(buffer: B) -> Self { + Self { + decoder: MessageFrameDecoder::new(), + buffer, + content_block_start_indices: HashSet::new(), + } + } + + pub fn decode_frame(&mut self) -> Option { + match self.decoder.decode_frame(&mut self.buffer) { + Ok(frame) => Some(frame), + Err(_e) => None, // Fatal decode error + } + } + + pub fn buffer_mut(&mut self) -> &mut B { + &mut self.buffer + } + + /// Check if there are any bytes remaining in the buffer + pub fn has_remaining(&self) -> bool { + self.buffer.has_remaining() + } + + /// Check if a content_block_start event has been sent for the given index + pub fn has_content_block_start_been_sent(&self, index: i32) -> bool { + self.content_block_start_indices.contains(&index) + } + + /// Mark that a content_block_start event has been sent for the given index + pub fn set_content_block_start_sent(&mut self, index: i32) { + self.content_block_start_indices.insert(index); + } +} diff --git a/crates/hermesllm/src/apis/mod.rs b/crates/hermesllm/src/apis/mod.rs index f570ac6e..094347a5 100644 --- a/crates/hermesllm/src/apis/mod.rs +++ b/crates/hermesllm/src/apis/mod.rs @@ -1,6 +1,8 @@ pub mod anthropic; pub mod openai; pub mod amazon_bedrock; +pub mod amazon_bedrock_binary_frame; +pub mod sse; // Explicit exports to avoid naming conflicts pub use anthropic::{AnthropicApi, MessagesRequest, MessagesResponse, MessagesStreamEvent}; diff --git a/crates/hermesllm/src/apis/sse.rs b/crates/hermesllm/src/apis/sse.rs new file mode 100644 index 00000000..cb03e2dd --- /dev/null +++ b/crates/hermesllm/src/apis/sse.rs @@ -0,0 +1,194 @@ +use std::str::FromStr; +use std::fmt; +use std::error::Error; +use serde::{Serialize, Deserialize}; +use crate::providers::response::ProviderStreamResponse; +use crate::providers::response::ProviderStreamResponseType; + +// ============================================================================ +// SSE EVENT CONTAINER +// ============================================================================ + +/// Represents a single Server-Sent Event with the complete wire format +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SseEvent { + #[serde(rename = "data")] + pub data: Option, // The JSON payload after "data: " + + #[serde(skip_serializing_if = "Option::is_none")] + pub event: Option, // Optional event type (e.g., "message_start", "content_block_delta") + + #[serde(skip_serializing, skip_deserializing)] + pub raw_line: String, // The complete line as received including "data: " prefix and "\n\n" + + #[serde(skip_serializing, skip_deserializing)] + pub sse_transform_buffer: String, // The complete line as received including "data: " prefix and "\n\n" + + #[serde(skip_serializing, skip_deserializing)] + pub provider_stream_response: Option, // Parsed provider stream response object +} + +impl SseEvent { + /// Check if this event represents the end of the stream + pub fn is_done(&self) -> bool { + self.data == Some("[DONE]".into()) + } + + /// Check if this event should be skipped during processing + /// This includes ping messages and other provider-specific events that don't contain content + pub fn should_skip(&self) -> bool { + // Skip ping messages (commonly used by providers for connection keep-alive) + self.data == Some(r#"{"type": "ping"}"#.into()) + } + + /// Check if this is an event-only SSE event (no data payload) + pub fn is_event_only(&self) -> bool { + self.event.is_some() && self.data.is_none() + } + + /// Get the parsed provider response if available + pub fn provider_response(&self) -> Result<&dyn ProviderStreamResponse, std::io::Error> { + self.provider_stream_response.as_ref() + .map(|resp| resp as &dyn ProviderStreamResponse) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::NotFound, "Provider response not found") + }) + } + +} + +impl FromStr for SseEvent { + type Err = SseParseError; + + fn from_str(line: &str) -> Result { + if line.starts_with("data: ") { + let data: String = line[6..].to_string(); // Remove "data: " prefix + if data.is_empty() { + return Err(SseParseError { + message: "Empty data field is not a valid SSE event".to_string(), + }); + } + Ok(SseEvent { + data: Some(data), + event: None, + raw_line: line.to_string(), + sse_transform_buffer: line.to_string(), + provider_stream_response: None, + }) + } else if line.starts_with("event: ") { //used by Anthropic + let event_type = line[7..].to_string(); + if event_type.is_empty() { + return Err(SseParseError { + message: "Empty event field is not a valid SSE event".to_string(), + }); + } + Ok(SseEvent { + data: None, + event: Some(event_type), + raw_line: line.to_string(), + sse_transform_buffer: line.to_string(), + provider_stream_response: None, + }) + } else { + Err(SseParseError { + message: format!("Line does not start with 'data: ' or 'event: ': {}", line), + }) + } + } +} + +impl fmt::Display for SseEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.sse_transform_buffer) + } +} + +// Into implementation to convert SseEvent to bytes for response buffer +impl Into> for SseEvent { + fn into(self) -> Vec { + format!("{}\n\n", self.sse_transform_buffer).into_bytes() + } +} + +#[derive(Debug)] +pub struct SseParseError { + pub message: String, +} + +impl fmt::Display for SseParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SSE parse error: {}", self.message) + } +} + +impl Error for SseParseError {} + + +/// Generic SSE (Server-Sent Events) streaming iterator container +/// Parses raw SSE lines into SseEvent objects +pub struct SseStreamIter +where + I: Iterator, + I::Item: AsRef, +{ + pub lines: I, + pub done_seen: bool, +} + +impl SseStreamIter +where + I: Iterator, + I::Item: AsRef, +{ + pub fn new(lines: I) -> Self { + Self { lines, done_seen: false } + } +} + +// TryFrom implementation to parse bytes into SseStreamIter +// Handles both text-based SSE and binary AWS Event Stream formats +impl TryFrom<&[u8]> for SseStreamIter> { + type Error = Box; + + fn try_from(bytes: &[u8]) -> Result { + // Parse as text-based SSE format + let s = std::str::from_utf8(bytes)?; + let lines: Vec = s.lines().map(|line| line.to_string()).collect(); + Ok(SseStreamIter::new(lines.into_iter())) + } +} + + +impl Iterator for SseStreamIter +where + I: Iterator, + I::Item: AsRef, +{ + type Item = SseEvent; + + fn next(&mut self) -> Option { + // If we already returned [DONE], terminate the stream + if self.done_seen { + return None; + } + + for line in &mut self.lines { + let line_str = line.as_ref(); + + // Try to parse as either data: or event: line + if let Ok(event) = line_str.parse::() { + // For data: lines, check if this is the [DONE] marker + if event.data.is_some() && event.is_done() { + self.done_seen = true; + return Some(event); // Return [DONE] event for transformation + } + // For data: lines, skip events that should be filtered at the transport layer + if event.data.is_some() && event.should_skip() { + continue; + } + return Some(event); + } + } + None + } +} diff --git a/crates/hermesllm/src/clients/endpoints.rs b/crates/hermesllm/src/clients/endpoints.rs index 264f2668..a5338e90 100644 --- a/crates/hermesllm/src/clients/endpoints.rs +++ b/crates/hermesllm/src/clients/endpoints.rs @@ -104,12 +104,13 @@ impl SupportedAPIs { } } ProviderId::AmazonBedrock => { - if request_path.starts_with("/v1/") && !is_streaming { - format!("/model/{}/converse", model_id) - } else if request_path.starts_with("/v1/") && is_streaming { - format!("/model/{}/converse-stream", model_id) - } - else { + if request_path.starts_with("/v1/") { + if !is_streaming { + format!("/model/{}/converse", model_id) + } else { + format!("/model/{}/converse-stream", model_id) + } + } else { default_endpoint } } diff --git a/crates/hermesllm/src/lib.rs b/crates/hermesllm/src/lib.rs index 2ae58b67..a1dd3d5c 100644 --- a/crates/hermesllm/src/lib.rs +++ b/crates/hermesllm/src/lib.rs @@ -7,7 +7,9 @@ pub mod clients; pub mod transforms; // Re-export important types and traits pub use providers::request::{ProviderRequestType, ProviderRequest, ProviderRequestError}; -pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage, SseEvent, SseStreamIter, BedrockBinaryFrameDecoder}; +pub use apis::sse::{SseEvent, SseStreamIter}; +pub use apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; +pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage}; pub use providers::id::ProviderId; pub use aws_smithy_eventstream::frame::DecodedFrame; diff --git a/crates/hermesllm/src/providers/response.rs b/crates/hermesllm/src/providers/response.rs index 61b38f4b..fa3bf13c 100644 --- a/crates/hermesllm/src/providers/response.rs +++ b/crates/hermesllm/src/providers/response.rs @@ -1,16 +1,15 @@ -use crate::clients::endpoints::SupportedUpstreamAPIs; -use crate::providers::id::ProviderId; -use bytes::Buf; -use serde::{Serialize, Deserialize}; +use serde::Serialize; use std::error::Error; use std::fmt; use std::convert::TryFrom; -use std::str::FromStr; +use crate::clients::endpoints::SupportedUpstreamAPIs; +use crate::clients::endpoints::SupportedAPIs; +use crate::providers::id::ProviderId; +use crate::apis::sse::SseEvent; use crate::apis::openai::ChatCompletionsResponse; use crate::apis::openai::ChatCompletionsStreamResponse; use crate::apis::anthropic::MessagesStreamEvent; -use crate::clients::endpoints::SupportedAPIs; use crate::apis::anthropic::MessagesResponse; use crate::apis::amazon_bedrock::ConverseResponse; use crate::apis::amazon_bedrock::ConverseStreamEvent; @@ -241,124 +240,6 @@ impl TryFrom<(&[u8], &SupportedAPIs, &SupportedUpstreamAPIs)> for ProviderStream } -// ============================================================================ -// SSE EVENT CONTAINER -// ============================================================================ - -/// Represents a single Server-Sent Event with the complete wire format -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SseEvent { - #[serde(rename = "data")] - pub data: Option, // The JSON payload after "data: " - - #[serde(skip_serializing_if = "Option::is_none")] - pub event: Option, // Optional event type (e.g., "message_start", "content_block_delta") - - #[serde(skip_serializing, skip_deserializing)] - pub raw_line: String, // The complete line as received including "data: " prefix and "\n\n" - - #[serde(skip_serializing, skip_deserializing)] - pub sse_transform_buffer: String, // The complete line as received including "data: " prefix and "\n\n" - - #[serde(skip_serializing, skip_deserializing)] - pub provider_stream_response: Option, // Parsed provider stream response object -} - -impl SseEvent { - /// Check if this event represents the end of the stream - pub fn is_done(&self) -> bool { - self.data == Some("[DONE]".into()) - } - - /// Check if this event should be skipped during processing - /// This includes ping messages and other provider-specific events that don't contain content - pub fn should_skip(&self) -> bool { - // Skip ping messages (commonly used by providers for connection keep-alive) - self.data == Some(r#"{"type": "ping"}"#.into()) - } - - /// Check if this is an event-only SSE event (no data payload) - pub fn is_event_only(&self) -> bool { - self.event.is_some() && self.data.is_none() - } - - /// Get the parsed provider response if available - pub fn provider_response(&self) -> Result<&dyn ProviderStreamResponse, std::io::Error> { - self.provider_stream_response.as_ref() - .map(|resp| resp as &dyn ProviderStreamResponse) - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotFound, "Provider response not found") - }) - } - -} - -impl FromStr for SseEvent { - type Err = SseParseError; - - fn from_str(line: &str) -> Result { - if line.starts_with("data: ") { - let data: String = line[6..].to_string(); // Remove "data: " prefix - if data.is_empty() { - return Err(SseParseError { - message: "Empty data field is not a valid SSE event".to_string(), - }); - } - Ok(SseEvent { - data: Some(data), - event: None, - raw_line: line.to_string(), - sse_transform_buffer: line.to_string(), - provider_stream_response: None, - }) - } else if line.starts_with("event: ") { //used by Anthropic - let event_type = line[7..].to_string(); - if event_type.is_empty() { - return Err(SseParseError { - message: "Empty event field is not a valid SSE event".to_string(), - }); - } - Ok(SseEvent { - data: None, - event: Some(event_type), - raw_line: line.to_string(), - sse_transform_buffer: line.to_string(), - provider_stream_response: None, - }) - } else { - Err(SseParseError { - message: format!("Line does not start with 'data: ' or 'event: ': {}", line), - }) - } - } -} - -impl fmt::Display for SseEvent { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.sse_transform_buffer) - } -} - -// Into implementation to convert SseEvent to bytes for response buffer -impl Into> for SseEvent { - fn into(self) -> Vec { - format!("{}\n\n", self.sse_transform_buffer).into_bytes() - } -} - -#[derive(Debug)] -pub struct SseParseError { - pub message: String, -} - -impl fmt::Display for SseParseError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SSE parse error: {}", self.message) - } -} - -impl Error for SseParseError {} - // TryFrom implementation to convert raw bytes to SseEvent with parsed provider response impl TryFrom<(SseEvent, &SupportedAPIs, &SupportedUpstreamAPIs)> for SseEvent { type Error = Box; @@ -471,135 +352,6 @@ impl TryFrom<(&aws_smithy_eventstream::frame::DecodedFrame, &SupportedAPIs, &Sup } -/// AWS Event Stream frame decoder wrapper -pub struct BedrockBinaryFrameDecoder -where - B: Buf, -{ - decoder: aws_smithy_eventstream::frame::MessageFrameDecoder, - buffer: B, - content_block_start_indices: std::collections::HashSet, -} - -impl BedrockBinaryFrameDecoder { - /// This is a convenience constructor that creates a BytesMut buffer internally - pub fn from_bytes(bytes: &[u8]) -> Self { - let buffer = bytes::BytesMut::from(bytes); - Self { - decoder: aws_smithy_eventstream::frame::MessageFrameDecoder::new(), - buffer, - content_block_start_indices: std::collections::HashSet::new(), - } - } -} - -impl BedrockBinaryFrameDecoder -where - B: Buf, -{ - pub fn new(buffer: B) -> Self { - Self { - decoder: aws_smithy_eventstream::frame::MessageFrameDecoder::new(), - buffer, - content_block_start_indices: std::collections::HashSet::new(), - } - } - - pub fn decode_frame(&mut self) -> Option { - match self.decoder.decode_frame(&mut self.buffer) { - Ok(frame) => Some(frame), - Err(_e) => None, // Fatal decode error - } - } - - pub fn buffer_mut(&mut self) -> &mut B { - &mut self.buffer - } - - /// Check if there are any bytes remaining in the buffer - pub fn has_remaining(&self) -> bool { - self.buffer.has_remaining() - } - - /// Check if a content_block_start event has been sent for the given index - pub fn has_content_block_start_been_sent(&self, index: i32) -> bool { - self.content_block_start_indices.contains(&index) - } - - /// Mark that a content_block_start event has been sent for the given index - pub fn set_content_block_start_sent(&mut self, index: i32) { - self.content_block_start_indices.insert(index); - } -} - -/// Generic SSE (Server-Sent Events) streaming iterator container -/// Parses raw SSE lines into SseEvent objects -pub struct SseStreamIter -where - I: Iterator, - I::Item: AsRef, -{ - pub lines: I, - pub done_seen: bool, -} - -impl SseStreamIter -where - I: Iterator, - I::Item: AsRef, -{ - pub fn new(lines: I) -> Self { - Self { lines, done_seen: false } - } -} - -// TryFrom implementation to parse bytes into SseStreamIter -// Handles both text-based SSE and binary AWS Event Stream formats -impl TryFrom<&[u8]> for SseStreamIter> { - type Error = Box; - - fn try_from(bytes: &[u8]) -> Result { - // Parse as text-based SSE format - let s = std::str::from_utf8(bytes)?; - let lines: Vec = s.lines().map(|line| line.to_string()).collect(); - Ok(SseStreamIter::new(lines.into_iter())) - } -} - - -impl Iterator for SseStreamIter -where - I: Iterator, - I::Item: AsRef, -{ - type Item = SseEvent; - - fn next(&mut self) -> Option { - // If we already returned [DONE], terminate the stream - if self.done_seen { - return None; - } - - for line in &mut self.lines { - let line_str = line.as_ref(); - - // Try to parse as either data: or event: line - if let Ok(event) = line_str.parse::() { - // For data: lines, check if this is the [DONE] marker - if event.data.is_some() && event.is_done() { - self.done_seen = true; - return Some(event); // Return [DONE] event for transformation - } - // For data: lines, skip events that should be filtered at the transport layer - if event.data.is_some() && event.should_skip() { - continue; - } - return Some(event); - } - } - None - } -} #[derive(Debug)] pub struct ProviderResponseError { @@ -623,12 +375,15 @@ impl Error for ProviderResponseError { #[cfg(test)] mod tests { use super::*; + use crate::apis::sse::SseStreamIter; + use crate::apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; use crate::clients::endpoints::SupportedAPIs; use crate::providers::id::ProviderId; use crate::apis::openai::OpenAIApi; use crate::apis::anthropic::AnthropicApi; use serde_json::json; + #[test] fn test_openai_response_from_bytes() { let resp = json!({ diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 43151a7f..785b5b72 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -22,12 +22,14 @@ use common::ratelimit::Header; use common::stats::{IncrementingMetric, RecordingMetric}; use common::tracing::{Event, Span, TraceData, Traceparent}; use common::{ratelimit, routing, tokenizer}; +use hermesllm::apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; +use hermesllm::apis::anthropic::{MessagesContentBlock, MessagesStreamEvent}; +use hermesllm::apis::sse::{SseEvent, SseStreamIter}; use hermesllm::clients::endpoints::SupportedAPIs; -use hermesllm::providers::response::{ - BedrockBinaryFrameDecoder, ProviderResponse, ProviderStreamResponse, SseEvent, SseStreamIter, -}; +use hermesllm::providers::response::ProviderResponse; use hermesllm::{ DecodedFrame, ProviderId, ProviderRequest, ProviderRequestType, ProviderResponseType, + ProviderStreamResponseType, }; pub struct StreamContext { @@ -514,88 +516,58 @@ impl StreamContext { client_api: &SupportedAPIs, upstream_api: &SupportedUpstreamAPIs, ) -> Result, Action> { - use hermesllm::providers::response::ProviderStreamResponseType; - // Initialize decoder if not present if self.binary_frame_decoder.is_none() { self.binary_frame_decoder = Some(BedrockBinaryFrameDecoder::from_bytes(&[])); } // Add incoming bytes to buffer - if let Some(decoder) = self.binary_frame_decoder.as_mut() { - decoder.buffer_mut().extend_from_slice(body); - } + let decoder = self.binary_frame_decoder.as_mut().unwrap(); + decoder.buffer_mut().extend_from_slice(body); let mut response_buffer = Vec::new(); - - // Decode all available complete frames loop { let decoded_frame = self.binary_frame_decoder.as_mut().unwrap().decode_frame(); match decoded_frame { Some(DecodedFrame::Complete(ref frame_ref)) => { - // Convert frame to ProviderStreamResponseType let frame = DecodedFrame::Complete(frame_ref.clone()); match ProviderStreamResponseType::try_from((&frame, client_api, upstream_api)) { Ok(provider_response) => { self.record_ttft_if_needed(); - // Extract index from the event if available - let event_index = - if let ProviderStreamResponseType::MessagesStreamEvent(ref evt) = - provider_response - { - use hermesllm::apis::anthropic::MessagesStreamEvent; + // Handle ContentBlockStart and ContentBlockDelta events + match &provider_response { + ProviderStreamResponseType::MessagesStreamEvent(evt) => { match evt { MessagesStreamEvent::ContentBlockStart { index, .. - } => Some(*index as i32), - MessagesStreamEvent::ContentBlockDelta { - index, .. - } => Some(*index as i32), - MessagesStreamEvent::ContentBlockStop { index, .. } => { - Some(*index as i32) - } - _ => None, - } - } else { - None - }; - - // Check event type to track ContentBlockStart - if let Some(event_type) = provider_response.event_type() { - match event_type { - "content_block_start" => { - // Mark that we've seen ContentBlockStart for this index - if let (Some(decoder), Some(index)) = - (self.binary_frame_decoder.as_mut(), event_index) - { - decoder.set_content_block_start_sent(index); + } => { + // Mark that we've seen ContentBlockStart for this index + self.binary_frame_decoder + .as_mut() + .unwrap() + .set_content_block_start_sent(*index as i32); debug!( "[ARCHGW_REQ_ID:{}] BEDROCK_CONTENT_BLOCK_START_TRACKED: index={}", self.request_identifier(), - index + *index ); } - } - "content_block_delta" => { - // Check if ContentBlockStart was sent for this index - if let Some(index) = event_index { - let needs_start = if let Some(decoder) = - self.binary_frame_decoder.as_ref() - { - !decoder.has_content_block_start_been_sent(index) - } else { - false - }; + MessagesStreamEvent::ContentBlockDelta { + index, .. + } => { + // Check if ContentBlockStart was sent for this index + let needs_start = !self + .binary_frame_decoder + .as_ref() + .unwrap() + .has_content_block_start_been_sent(*index as i32); if needs_start { // Emit empty ContentBlockStart before delta - use hermesllm::apis::anthropic::{ - MessagesContentBlock, MessagesStreamEvent, - }; let content_block_start = MessagesStreamEvent::ContentBlockStart { - index: index as u32, + index: *index, content_block: MessagesContentBlock::Text { text: String::new(), cache_control: None, @@ -606,22 +578,22 @@ impl StreamContext { .extend_from_slice(start_sse.as_bytes()); // Mark that we've now sent it - if let Some(decoder) = - self.binary_frame_decoder.as_mut() - { - decoder.set_content_block_start_sent(index); - } + self.binary_frame_decoder + .as_mut() + .unwrap() + .set_content_block_start_sent(*index as i32); debug!( "[ARCHGW_REQ_ID:{}] BEDROCK_INJECTED_CONTENT_BLOCK_START: index={}", self.request_identifier(), - index + *index ); } } + _ => {} } - _ => {} } + _ => {} } let sse_string: String = provider_response.into();