adding translation from BedrockBinaryFrameDecoder to AnthropicMessagesEvent

This commit is contained in:
Salman Paracha 2025-10-19 20:31:30 -07:00
parent bf67ea126f
commit d826de382a
11 changed files with 1511 additions and 231 deletions

6
crates/Cargo.lock generated
View file

@ -777,6 +777,7 @@ name = "hermesllm"
version = "0.1.0"
dependencies = [
"aws-smithy-eventstream",
"bytes",
"serde",
"serde_json",
"serde_with",
@ -1230,6 +1231,7 @@ name = "llm_gateway"
version = "0.1.0"
dependencies = [
"acap",
"bytes",
"common",
"derivative",
"governor",
@ -2111,9 +2113,9 @@ dependencies = [
[[package]]
name = "security-framework-sys"
version = "2.14.0"
version = "2.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32"
checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0"
dependencies = [
"core-foundation-sys",
"libc",

View file

@ -9,3 +9,4 @@ serde_json = "1.0.140"
serde_with = {version = "3.12.0", features = ["base64"]}
thiserror = "2.0.12"
aws-smithy-eventstream = "0.60"
bytes = "1.10"

View file

@ -7,6 +7,7 @@ use std::collections::HashMap;
use super::ApiDefinition;
use crate::providers::request::{ProviderRequest, ProviderRequestError};
use crate::providers::response::ProviderStreamResponse;
// ============================================================================
// AMAZON BEDROCK CONVERSE API ENUMERATION
@ -685,7 +686,7 @@ pub struct MessageStartEvent {
pub struct ContentBlockStartEvent {
/// Content block index
#[serde(rename = "contentBlockIndex")]
pub content_block_index: u32,
pub content_block_index: i32,
/// Start information
pub start: ContentBlockStart,
}
@ -707,18 +708,16 @@ pub enum ContentBlockStart {
pub struct ContentBlockDeltaEvent {
/// Content block index
#[serde(rename = "contentBlockIndex")]
pub content_block_index: u32,
pub content_block_index: i32,
/// Delta information
pub delta: ContentBlockDelta,
}
/// Content block delta information
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(untagged)]
pub enum ContentBlockDelta {
#[serde(rename = "text")]
Text { text: String },
#[serde(rename = "toolUse")]
ToolUse { input: String },
}
@ -727,7 +726,7 @@ pub enum ContentBlockDelta {
pub struct ContentBlockStopEvent {
/// Content block index
#[serde(rename = "contentBlockIndex")]
pub content_block_index: u32,
pub content_block_index: i32,
}
/// Message stop event
@ -867,6 +866,198 @@ impl crate::providers::response::TokenUsage for BedrockTokenUsage {
}
}
// ============================================================================
// EVENT STREAM PARSING
// ============================================================================
/// Convert from aws-smithy-eventstream DecodedFrame to ConverseStreamEvent
impl TryFrom<&aws_smithy_eventstream::frame::DecodedFrame> for ConverseStreamEvent {
type Error = BedrockError;
fn try_from(frame: &aws_smithy_eventstream::frame::DecodedFrame) -> Result<Self, Self::Error> {
// Only process Complete frames, skip Incomplete
let message = match frame {
aws_smithy_eventstream::frame::DecodedFrame::Complete(msg) => msg,
aws_smithy_eventstream::frame::DecodedFrame::Incomplete => {
return Err(BedrockError::Validation {
message: "Expected Complete frame, got Incomplete".to_string(),
})
}
};
// Extract the :event-type and :message-type headers
let event_type = message
.headers()
.iter()
.find(|h| h.name().as_str() == ":event-type")
.and_then(|h| h.value().as_string().ok())
.ok_or_else(|| BedrockError::Validation {
message: "Missing :event-type header".to_string(),
})?
.as_str();
let message_type = message
.headers()
.iter()
.find(|h| h.name().as_str() == ":message-type")
.and_then(|h| h.value().as_string().ok())
.ok_or_else(|| BedrockError::Validation {
message: "Missing :message-type header".to_string(),
})?
.as_str();
let payload = message.payload();
// Parse the event based on message type and event type
match message_type {
"event" => match event_type {
"messageStart" => {
let event: MessageStartEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::MessageStart(event))
}
"contentBlockStart" => {
let event: ContentBlockStartEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ContentBlockStart(event))
}
"contentBlockDelta" => {
let event: ContentBlockDeltaEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ContentBlockDelta(event))
}
"contentBlockStop" => {
let event: ContentBlockStopEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ContentBlockStop(event))
}
"messageStop" => {
let event: MessageStopEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::MessageStop(event))
}
"metadata" => {
let event: ConverseStreamMetadataEvent =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::Metadata(event))
}
unknown => Err(BedrockError::Validation {
message: format!("Unknown event type: {}", unknown),
}),
},
"exception" => match event_type {
"internalServerException" => {
let exception: BedrockException =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::InternalServerException(exception))
}
"modelStreamErrorException" => {
let exception: BedrockException =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ModelStreamErrorException(exception))
}
"serviceUnavailableException" => {
let exception: BedrockException =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ServiceUnavailableException(exception))
}
"throttlingException" => {
let exception: BedrockException =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ThrottlingException(exception))
}
"validationException" => {
let exception: BedrockException =
serde_json::from_slice(payload).map_err(BedrockError::Serialization)?;
Ok(ConverseStreamEvent::ValidationException(exception))
}
unknown => Err(BedrockError::Validation {
message: format!("Unknown exception type: {}", unknown),
}),
},
unknown => Err(BedrockError::Validation {
message: format!("Unknown message type: {}", unknown),
}),
}
}
}
impl Into<String> for ConverseStreamEvent {
fn into(self) -> String {
let transformed_json = serde_json::to_string(&self).unwrap_or_default();
let event_type = match &self {
ConverseStreamEvent::MessageStart { .. } => "message_start",
ConverseStreamEvent::ContentBlockStart { .. } => "content_block_start",
ConverseStreamEvent::ContentBlockDelta { .. } => "content_block_delta",
ConverseStreamEvent::ContentBlockStop { .. } => "content_block_stop",
ConverseStreamEvent::MessageStop { .. } => "message_stop",
ConverseStreamEvent::Metadata { .. } => "metadata",
ConverseStreamEvent::InternalServerException { .. } => "internal_server_exception",
ConverseStreamEvent::ModelStreamErrorException { .. } => "model_stream_error_exception",
ConverseStreamEvent::ServiceUnavailableException { .. } => "service_unavailable_exception",
ConverseStreamEvent::ThrottlingException { .. } => "throttling_exception",
ConverseStreamEvent::ValidationException { .. } => "validation_exception",
};
let event = format!("event: {}\n", event_type);
let data = format!("data: {}\n\n", transformed_json);
event + &data
}
}
// Implement ProviderStreamResponse for ConverseStreamEvent
impl ProviderStreamResponse for ConverseStreamEvent {
fn content_delta(&self) -> Option<&str> {
match self {
ConverseStreamEvent::ContentBlockDelta(event) => {
match &event.delta {
ContentBlockDelta::Text { text } => Some(text),
ContentBlockDelta::ToolUse { .. } => None,
}
}
_ => None,
}
}
fn is_final(&self) -> bool {
matches!(self, ConverseStreamEvent::MessageStop(_))
}
fn role(&self) -> Option<&str> {
match self {
ConverseStreamEvent::MessageStart(event) => Some(event.role.as_str()),
_ => None,
}
}
fn event_type(&self) -> Option<&str> {
Some(match self {
ConverseStreamEvent::MessageStart(_) => "messageStart",
ConverseStreamEvent::ContentBlockStart(_) => "contentBlockStart",
ConverseStreamEvent::ContentBlockDelta(_) => "contentBlockDelta",
ConverseStreamEvent::ContentBlockStop(_) => "contentBlockStop",
ConverseStreamEvent::MessageStop(_) => "messageStop",
ConverseStreamEvent::Metadata(_) => "metadata",
ConverseStreamEvent::InternalServerException(_) => "internalServerException",
ConverseStreamEvent::ModelStreamErrorException(_) => "modelStreamErrorException",
ConverseStreamEvent::ServiceUnavailableException(_) => "serviceUnavailableException",
ConverseStreamEvent::ThrottlingException(_) => "throttlingException",
ConverseStreamEvent::ValidationException(_) => "validationException",
})
}
}
// Add as_str helper for ConversationRole
impl ConversationRole {
pub fn as_str(&self) -> &'static str {
match self {
ConversationRole::User => "user",
ConversationRole::Assistant => "assistant",
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -7,8 +7,9 @@ pub mod clients;
pub mod transforms;
// Re-export important types and traits
pub use providers::request::{ProviderRequestType, ProviderRequest, ProviderRequestError};
pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage, SseEvent, SseStreamIter};
pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage, SseEvent, SseStreamIter, BedrockBinaryFrameDecoder};
pub use providers::id::ProviderId;
pub use aws_smithy_eventstream::frame::DecodedFrame;
//TODO: Refactor such that commons doesn't depend on Hermes. For now this will clean up strings
@ -77,4 +78,133 @@ mod tests {
let final_event = streaming_iter.next();
assert!(final_event.is_none()); // Should be None because iterator stops at [DONE]
}
/// Test AWS Event Stream decoding for Bedrock ConverseStream responses.
///
/// This test demonstrates how to:
/// 1. Use MessageFrameDecoder to decode AWS Event Stream frames
/// 2. Handle chunked network arrivals with buffering
/// 3. Extract event types from message headers
/// 4. Parse JSON payloads from decoded messages
/// 5. Reconstruct streaming content from contentBlockDelta events
///
/// The decoder handles frame boundaries automatically - you just keep calling
/// decode_frame() until it returns Incomplete, which means you've processed
/// all complete frames in the buffer.
#[test]
fn test_amazon_bedrock_streaming_response() {
use aws_smithy_eventstream::frame::{MessageFrameDecoder, DecodedFrame};
use bytes::{Buf, BytesMut};
use std::fs;
use std::path::PathBuf;
// Read the response.hex file from tests/e2e directory
// Use absolute path to avoid cargo test working directory issues
let test_file = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("../../tests/e2e/response.hex");
let response_data = fs::read(&test_file)
.unwrap_or_else(|e| panic!("Failed to read {:?}: {}", test_file, e));
println!("📊 Response data size: {} bytes\n", response_data.len());
// Create decoder and buffer that implements Buf trait
// BytesMut automatically tracks position as decoder advances it!
let mut decoder = MessageFrameDecoder::new();
let mut simulated_network_buffer = BytesMut::new();
let mut frame_count = 0;
let mut content_chunks = Vec::new();
// Simulate chunked network arrivals - process as data comes in
let chunk_sizes = vec![50, 100, 75, 200, 150, 300, 500, 1000];
let mut offset = 0;
let mut chunk_num = 0;
println!("🔄 Simulating chunked network arrivals...\n");
// Process chunks as they "arrive" from the network
while offset < response_data.len() {
// Receive next chunk from network
let chunk_size = chunk_sizes[chunk_num % chunk_sizes.len()];
let end = (offset + chunk_size).min(response_data.len());
let chunk = &response_data[offset..end];
chunk_num += 1;
simulated_network_buffer.extend_from_slice(chunk);
offset = end;
println!("📦 Chunk {}: Received {} bytes (buffer: {} bytes total, {} bytes remaining)",
chunk_num, chunk.len(), simulated_network_buffer.len(), simulated_network_buffer.remaining());
// Try to decode all complete frames from buffer
// The Buf trait tracks position automatically!
loop {
let bytes_before = simulated_network_buffer.remaining();
match decoder.decode_frame(&mut simulated_network_buffer) {
Ok(DecodedFrame::Complete(message)) => {
frame_count += 1;
let consumed = bytes_before - simulated_network_buffer.remaining();
println!(" ✅ Frame {}: decoded ({} bytes, {} bytes remaining)",
frame_count, consumed, simulated_network_buffer.remaining());
// Get event type from headers
let event_type = message.headers()
.iter()
.find(|h| h.name().as_str() == ":event-type")
.and_then(|h| {
h.value().as_string().ok().map(|s| s.as_str().to_string())
});
if let Some(ref evt) = event_type {
println!(" Event: {}", evt);
}
// Parse payload and extract content
let payload = message.payload();
if !payload.is_empty() {
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(payload) {
if event_type.as_deref() == Some("contentBlockDelta") {
if let Some(delta) = json.get("delta") {
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
println!(" 📝 Content: \"{}\"", text);
content_chunks.push(text.to_string());
}
}
}
}
} // Continue loop to check for more complete frames in buffer
}
Ok(DecodedFrame::Incomplete) => {
// Not enough data for a complete frame - need more chunks
println!(" ⏳ Incomplete frame ({} bytes remaining) - waiting for more data\n", simulated_network_buffer.remaining());
break; // Wait for next chunk
}
Err(e) => {
panic!("❌ Frame decode error: {}", e);
}
}
}
}
println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!("📋 Summary:");
println!(" Total chunks received: {}", chunk_num);
println!(" Total frames decoded: {}", frame_count);
println!(" Total content chunks: {}", content_chunks.len());
println!(" Final buffer remaining: {} bytes", simulated_network_buffer.remaining());
if !content_chunks.is_empty() {
let full_text = content_chunks.join("");
println!("\n📄 Full reconstructed content:");
println!("{}", full_text);
println!("\n Characters: {}", full_text.len());
println!(" Estimated tokens: ~{}", full_text.len() / 4);
}
// Ensure we decoded at least one frame
assert!(frame_count > 0, "Should decode at least one frame");
// Ensure all data was consumed - if buffer has remaining bytes, it's a partial frame
assert_eq!(simulated_network_buffer.remaining(), 0, "All bytes should be consumed, {} bytes remain", simulated_network_buffer.remaining());
}
}

View file

@ -180,8 +180,13 @@ impl TryFrom<(ProviderRequestType, &SupportedUpstreamAPIs)> for ProviderRequestT
Ok(ProviderRequestType::BedrockConverse(bedrock_req))
}
(ProviderRequestType::ChatCompletionsRequest(_), SupportedUpstreamAPIs::AmazonBedrockConverseStream(_)) => {
todo!("ChatCompletionsRequest to Amazon Bedrock Stream conversion not implemented yet")
(ProviderRequestType::ChatCompletionsRequest(chat_req), SupportedUpstreamAPIs::AmazonBedrockConverseStream(_)) => {
let bedrock_req = ConverseStreamRequest::try_from(chat_req)
.map_err(|e| ProviderRequestError {
message: format!("Failed to convert ChatCompletionsRequest to Amazon Bedrock request: {}", e),
source: Some(Box::new(e))
})?;
Ok(ProviderRequestType::BedrockConverse(bedrock_req))
}
(ProviderRequestType::MessagesRequest(messages_req), SupportedUpstreamAPIs::AmazonBedrockConverse(_)) => {
let bedrock_req = ConverseRequest::try_from(messages_req)
@ -191,8 +196,13 @@ impl TryFrom<(ProviderRequestType, &SupportedUpstreamAPIs)> for ProviderRequestT
})?;
Ok(ProviderRequestType::BedrockConverse(bedrock_req))
}
(ProviderRequestType::MessagesRequest(_), SupportedUpstreamAPIs::AmazonBedrockConverseStream(_)) => {
todo!("MessagesRequest to Amazon Bedrock Stream conversion not implemented yet")
(ProviderRequestType::MessagesRequest(messages_req), SupportedUpstreamAPIs::AmazonBedrockConverseStream(_)) => {
let bedrock_req = ConverseStreamRequest::try_from(messages_req)
.map_err(|e| ProviderRequestError {
message: format!("Failed to convert MessagesRequest to Amazon Bedrock request: {}", e),
source: Some(Box::new(e))
})?;
Ok(ProviderRequestType::BedrockConverse(bedrock_req))
}
// Amazon Bedrock to other APIs conversions

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,9 @@ use crate::apis::anthropic::{
MessagesStreamEvent, MessagesStopReason, MessagesMessageDelta, MessagesResponse,
MessagesStreamMessage, MessagesUsage, MessagesContentDelta, MessagesRole, MessagesContentBlock
};
use crate::apis::amazon_bedrock::{ConverseResponse, ConverseOutput, StopReason};
use crate::apis::amazon_bedrock::{
ConverseResponse, ConverseOutput, StopReason, ConverseStreamEvent, ContentBlockDelta
};
// ============================================================================
// STANDARD RUST TRAIT IMPLEMENTATIONS - Using Into/TryFrom for convenience
@ -47,7 +49,6 @@ impl TryFrom<ChatCompletionsResponse> for MessagesResponse {
}
}
impl TryFrom<ConverseResponse> for MessagesResponse {
type Error = TransformError;
@ -202,6 +203,157 @@ impl TryFrom<ChatCompletionsStreamResponse> for MessagesStreamEvent {
}
}
impl Into<String> for MessagesStreamEvent {
fn into(self) -> String {
let transformed_json = serde_json::to_string(&self).unwrap_or_default();
let event_type = match &self {
MessagesStreamEvent::MessageStart { .. } => "message_start",
MessagesStreamEvent::ContentBlockStart { .. } => "content_block_start",
MessagesStreamEvent::ContentBlockDelta { .. } => "content_block_delta",
MessagesStreamEvent::ContentBlockStop { .. } => "content_block_stop",
MessagesStreamEvent::MessageDelta { .. } => "message_delta",
MessagesStreamEvent::MessageStop => "message_stop",
MessagesStreamEvent::Ping => "ping",
};
let event = format!("event: {}\n", event_type);
let data = format!("data: {}\n\n", transformed_json);
event + &data
}
}
impl TryFrom<ConverseStreamEvent> for MessagesStreamEvent {
type Error = TransformError;
fn try_from(event: ConverseStreamEvent) -> Result<Self, Self::Error> {
match event {
// MessageStart - convert to Anthropic MessageStart
ConverseStreamEvent::MessageStart(start_event) => {
let role = match start_event.role {
crate::apis::amazon_bedrock::ConversationRole::User => MessagesRole::User,
crate::apis::amazon_bedrock::ConversationRole::Assistant => MessagesRole::Assistant,
};
Ok(MessagesStreamEvent::MessageStart {
message: MessagesStreamMessage {
id: format!("bedrock-stream-{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()),
obj_type: "message".to_string(),
role,
content: vec![],
model: "bedrock-model".to_string(),
stop_reason: None,
stop_sequence: None,
usage: MessagesUsage {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
},
})
}
// ContentBlockStart - convert to Anthropic ContentBlockStart
ConverseStreamEvent::ContentBlockStart(start_event) => {
// Note: Bedrock sends tool_use_id and name at start, with input coming in subsequent deltas
// Anthropic expects the same pattern, so we initialize with an empty input object
match start_event.start {
crate::apis::amazon_bedrock::ContentBlockStart::ToolUse { tool_use_id, name } => {
Ok(MessagesStreamEvent::ContentBlockStart {
index: start_event.content_block_index as u32,
content_block: MessagesContentBlock::ToolUse {
id: tool_use_id,
name,
input: Value::Object(serde_json::Map::new()), // Empty - will be filled by deltas
cache_control: None,
},
})
}
}
}
// ContentBlockDelta - convert to Anthropic ContentBlockDelta
ConverseStreamEvent::ContentBlockDelta(delta_event) => {
let delta = match delta_event.delta {
ContentBlockDelta::Text { text } => {
MessagesContentDelta::TextDelta { text }
}
ContentBlockDelta::ToolUse { input } => {
MessagesContentDelta::InputJsonDelta { partial_json: input }
}
};
Ok(MessagesStreamEvent::ContentBlockDelta {
index: delta_event.content_block_index as u32,
delta,
})
}
// ContentBlockStop - convert to Anthropic ContentBlockStop
ConverseStreamEvent::ContentBlockStop(stop_event) => {
Ok(MessagesStreamEvent::ContentBlockStop {
index: stop_event.content_block_index as u32,
})
}
// MessageStop - convert to Anthropic MessageDelta with stop reason + MessageStop
ConverseStreamEvent::MessageStop(stop_event) => {
let anthropic_stop_reason = match stop_event.stop_reason {
StopReason::EndTurn => MessagesStopReason::EndTurn,
StopReason::ToolUse => MessagesStopReason::ToolUse,
StopReason::MaxTokens => MessagesStopReason::MaxTokens,
StopReason::StopSequence => MessagesStopReason::EndTurn,
StopReason::GuardrailIntervened => MessagesStopReason::Refusal,
StopReason::ContentFiltered => MessagesStopReason::Refusal,
};
// Return MessageDelta (MessageStop will be sent separately by the streaming handler)
Ok(MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: anthropic_stop_reason,
stop_sequence: None,
},
usage: MessagesUsage {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
})
}
// Metadata - convert usage information to MessageDelta
ConverseStreamEvent::Metadata(metadata_event) => {
Ok(MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: MessagesStopReason::EndTurn,
stop_sequence: None,
},
usage: MessagesUsage {
input_tokens: metadata_event.usage.input_tokens,
output_tokens: metadata_event.usage.output_tokens,
cache_creation_input_tokens: metadata_event.usage.cache_write_input_tokens,
cache_read_input_tokens: metadata_event.usage.cache_read_input_tokens,
},
})
}
// Exception events - convert to Ping (could be enhanced to return error events)
ConverseStreamEvent::InternalServerException(_) |
ConverseStreamEvent::ModelStreamErrorException(_) |
ConverseStreamEvent::ServiceUnavailableException(_) |
ConverseStreamEvent::ThrottlingException(_) |
ConverseStreamEvent::ValidationException(_) => {
// TODO: Consider adding proper error handling/events
Ok(MessagesStreamEvent::Ping)
}
}
}
}
/// Convert tool call deltas to Anthropic stream events
fn convert_tool_call_deltas(tool_calls: Vec<ToolCallDelta>) -> Result<MessagesStreamEvent, TransformError> {
for tool_call in tool_calls {

View file

@ -1,6 +1,6 @@
use crate::apis::openai::{ChatCompletionsResponse, ChatCompletionsStreamResponse, Choice, FinishReason, ResponseMessage, Role, ToolCallDelta, FunctionCallDelta, Usage, StreamChoice, MessageDelta, MessageContent};
use crate::apis::anthropic::{MessagesResponse, MessagesStreamEvent, MessagesContentBlock, MessagesContentDelta, MessagesStopReason, MessagesUsage};
use crate::apis::amazon_bedrock::{ConverseResponse, ConverseOutput, StopReason};
use crate::apis::amazon_bedrock::{ConverseOutput, ConverseResponse, ConverseStreamEvent, StopReason};
use crate::clients::TransformError;
use crate::transforms::lib::*;
@ -250,6 +250,172 @@ impl TryFrom<MessagesStreamEvent> for ChatCompletionsStreamResponse {
}
impl TryFrom<ConverseStreamEvent> for ChatCompletionsStreamResponse {
type Error = TransformError;
fn try_from(event: ConverseStreamEvent) -> Result<Self, Self::Error> {
match event {
ConverseStreamEvent::MessageStart(start_event) => {
let role = match start_event.role {
crate::apis::amazon_bedrock::ConversationRole::User => Role::User,
crate::apis::amazon_bedrock::ConversationRole::Assistant => Role::Assistant,
};
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: Some(role),
content: None,
refusal: None,
function_call: None,
tool_calls: None,
},
None,
None,
))
}
ConverseStreamEvent::ContentBlockStart(start_event) => {
use crate::apis::amazon_bedrock::ContentBlockStart;
match start_event.start {
ContentBlockStart::ToolUse { tool_use_id, name } => {
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: None,
content: None,
refusal: None,
function_call: None,
tool_calls: Some(vec![ToolCallDelta {
index: start_event.content_block_index as u32,
id: Some(tool_use_id),
call_type: Some("function".to_string()),
function: Some(FunctionCallDelta {
name: Some(name),
arguments: Some("".to_string()),
}),
}]),
},
None,
None,
))
}
}
}
ConverseStreamEvent::ContentBlockDelta(delta_event) => {
use crate::apis::amazon_bedrock::ContentBlockDelta;
match delta_event.delta {
ContentBlockDelta::Text { text } => {
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: None,
content: Some(text),
refusal: None,
function_call: None,
tool_calls: None,
},
None,
None,
))
}
ContentBlockDelta::ToolUse { input } => {
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: None,
content: None,
refusal: None,
function_call: None,
tool_calls: Some(vec![ToolCallDelta {
index: delta_event.content_block_index as u32,
id: None,
call_type: None,
function: Some(FunctionCallDelta {
name: None,
arguments: Some(input),
}),
}]),
},
None,
None,
))
}
}
}
ConverseStreamEvent::ContentBlockStop(_) => {
Ok(create_empty_openai_chunk())
}
ConverseStreamEvent::MessageStop(stop_event) => {
let finish_reason = match stop_event.stop_reason {
StopReason::EndTurn => FinishReason::Stop,
StopReason::ToolUse => FinishReason::ToolCalls,
StopReason::MaxTokens => FinishReason::Length,
StopReason::StopSequence => FinishReason::Stop,
StopReason::GuardrailIntervened => FinishReason::ContentFilter,
StopReason::ContentFiltered => FinishReason::ContentFilter,
};
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: None,
content: None,
refusal: None,
function_call: None,
tool_calls: None,
},
Some(finish_reason),
None,
))
}
ConverseStreamEvent::Metadata(metadata_event) => {
let usage = Usage {
prompt_tokens: metadata_event.usage.input_tokens,
completion_tokens: metadata_event.usage.output_tokens,
total_tokens: metadata_event.usage.total_tokens,
prompt_tokens_details: None,
completion_tokens_details: None,
};
Ok(create_openai_chunk(
"stream",
"unknown",
MessageDelta {
role: None,
content: None,
refusal: None,
function_call: None,
tool_calls: None,
},
None,
Some(usage),
))
}
// Error events - convert to empty chunks (errors should be handled elsewhere)
ConverseStreamEvent::InternalServerException(_) |
ConverseStreamEvent::ModelStreamErrorException(_) |
ConverseStreamEvent::ServiceUnavailableException(_) |
ConverseStreamEvent::ThrottlingException(_) |
ConverseStreamEvent::ValidationException(_) => {
Ok(create_empty_openai_chunk())
}
}
}
}
/// Convert content block start to OpenAI chunk
fn convert_content_block_start(content_block: MessagesContentBlock) -> Result<ChatCompletionsStreamResponse, TransformError> {
match content_block {

View file

@ -23,6 +23,7 @@ thiserror = "1.0.64"
derivative = "2.2.0"
sha2 = "0.10.8"
hermesllm = { version = "0.1.0", path = "../hermesllm" }
bytes = "1.10"
[dev-dependencies]
serial_test = "3.1.1"

View file

@ -1,3 +1,4 @@
use bytes::Buf;
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use http::StatusCode;
use log::{debug, info, warn};
@ -23,7 +24,9 @@ use common::stats::{IncrementingMetric, RecordingMetric};
use common::tracing::{Event, Span, TraceData, Traceparent};
use common::{ratelimit, routing, tokenizer};
use hermesllm::clients::endpoints::SupportedAPIs;
use hermesllm::providers::response::{ProviderResponse, SseEvent, SseStreamIter};
use hermesllm::providers::response::{
BedrockBinaryFrameDecoder, ProviderResponse, SseEvent, SseStreamIter,
};
use hermesllm::{ProviderId, ProviderRequest, ProviderRequestType, ProviderResponseType};
pub struct StreamContext {
@ -46,8 +49,8 @@ pub struct StreamContext {
traces_queue: Arc<Mutex<VecDeque<TraceData>>>,
overrides: Rc<Option<Overrides>>,
user_message: Option<String>,
/// Store upstream response status code to handle error responses gracefully
upstream_status_code: Option<StatusCode>,
binary_frame_decoder: Option<BedrockBinaryFrameDecoder<bytes::BytesMut>>,
}
impl StreamContext {
@ -76,6 +79,7 @@ impl StreamContext {
request_body_sent_time: None,
user_message: None,
upstream_status_code: None,
binary_frame_decoder: None,
}
}

BIN
tests/e2e/response.hex Normal file

Binary file not shown.