mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
PR comments fixed
This commit is contained in:
parent
c9c33594f9
commit
4d954b0f9e
9 changed files with 315 additions and 324 deletions
|
|
@ -9,7 +9,7 @@ stdout_logfile_maxbytes=0
|
|||
stderr_logfile_maxbytes=0
|
||||
|
||||
[program:envoy]
|
||||
command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[envoy_logs] ' \"$line\"; done"
|
||||
command=/bin/sh -c "python /app/config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[envoy_logs] ' \"$line\"; done"
|
||||
stdout_logfile=/dev/stdout
|
||||
redirect_stderr=true
|
||||
stdout_logfile_maxbytes=0
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ pub const MODEL_SERVER_NAME: &str = "model_server";
|
|||
pub const ARCH_ROUTING_HEADER: &str = "x-arch-llm-provider";
|
||||
pub const MESSAGES_KEY: &str = "messages";
|
||||
pub const ARCH_PROVIDER_HINT_HEADER: &str = "x-arch-llm-provider-hint";
|
||||
pub const ARCH_IS_STREAMING_HEADER: &str = "x-archgw-streaming-request";
|
||||
pub const ARCH_IS_STREAMING_HEADER: &str = "x-arch-streaming-request";
|
||||
pub const CHAT_COMPLETIONS_PATH: &str = "/v1/chat/completions";
|
||||
pub const MESSAGES_PATH: &str = "/v1/messages";
|
||||
pub const HEALTHZ_PATH: &str = "/healthz";
|
||||
|
|
|
|||
65
crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs
Normal file
65
crates/hermesllm/src/apis/amazon_bedrock_binary_frame.rs
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
use std::collections::HashSet;
|
||||
use bytes::Buf;
|
||||
use aws_smithy_eventstream::frame::DecodedFrame;
|
||||
use aws_smithy_eventstream::frame::MessageFrameDecoder;
|
||||
|
||||
/// AWS Event Stream frame decoder wrapper
|
||||
pub struct BedrockBinaryFrameDecoder<B>
|
||||
where
|
||||
B: Buf,
|
||||
{
|
||||
decoder: MessageFrameDecoder,
|
||||
buffer: B,
|
||||
content_block_start_indices: HashSet<i32>,
|
||||
}
|
||||
|
||||
impl BedrockBinaryFrameDecoder<bytes::BytesMut> {
|
||||
/// This is a convenience constructor that creates a BytesMut buffer internally
|
||||
pub fn from_bytes(bytes: &[u8]) -> Self {
|
||||
let buffer = bytes::BytesMut::from(bytes);
|
||||
Self {
|
||||
decoder: MessageFrameDecoder::new(),
|
||||
buffer,
|
||||
content_block_start_indices: std::collections::HashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> BedrockBinaryFrameDecoder<B>
|
||||
where
|
||||
B: Buf,
|
||||
{
|
||||
pub fn new(buffer: B) -> Self {
|
||||
Self {
|
||||
decoder: MessageFrameDecoder::new(),
|
||||
buffer,
|
||||
content_block_start_indices: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode_frame(&mut self) -> Option<DecodedFrame> {
|
||||
match self.decoder.decode_frame(&mut self.buffer) {
|
||||
Ok(frame) => Some(frame),
|
||||
Err(_e) => None, // Fatal decode error
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer_mut(&mut self) -> &mut B {
|
||||
&mut self.buffer
|
||||
}
|
||||
|
||||
/// Check if there are any bytes remaining in the buffer
|
||||
pub fn has_remaining(&self) -> bool {
|
||||
self.buffer.has_remaining()
|
||||
}
|
||||
|
||||
/// Check if a content_block_start event has been sent for the given index
|
||||
pub fn has_content_block_start_been_sent(&self, index: i32) -> bool {
|
||||
self.content_block_start_indices.contains(&index)
|
||||
}
|
||||
|
||||
/// Mark that a content_block_start event has been sent for the given index
|
||||
pub fn set_content_block_start_sent(&mut self, index: i32) {
|
||||
self.content_block_start_indices.insert(index);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,6 +1,8 @@
|
|||
pub mod anthropic;
|
||||
pub mod openai;
|
||||
pub mod amazon_bedrock;
|
||||
pub mod amazon_bedrock_binary_frame;
|
||||
pub mod sse;
|
||||
|
||||
// Explicit exports to avoid naming conflicts
|
||||
pub use anthropic::{AnthropicApi, MessagesRequest, MessagesResponse, MessagesStreamEvent};
|
||||
|
|
|
|||
194
crates/hermesllm/src/apis/sse.rs
Normal file
194
crates/hermesllm/src/apis/sse.rs
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
use std::str::FromStr;
|
||||
use std::fmt;
|
||||
use std::error::Error;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use crate::providers::response::ProviderStreamResponse;
|
||||
use crate::providers::response::ProviderStreamResponseType;
|
||||
|
||||
// ============================================================================
|
||||
// SSE EVENT CONTAINER
|
||||
// ============================================================================
|
||||
|
||||
/// Represents a single Server-Sent Event with the complete wire format
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SseEvent {
|
||||
#[serde(rename = "data")]
|
||||
pub data: Option<String>, // The JSON payload after "data: "
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub event: Option<String>, // Optional event type (e.g., "message_start", "content_block_delta")
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub raw_line: String, // The complete line as received including "data: " prefix and "\n\n"
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub sse_transform_buffer: String, // The complete line as received including "data: " prefix and "\n\n"
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub provider_stream_response: Option<ProviderStreamResponseType>, // Parsed provider stream response object
|
||||
}
|
||||
|
||||
impl SseEvent {
|
||||
/// Check if this event represents the end of the stream
|
||||
pub fn is_done(&self) -> bool {
|
||||
self.data == Some("[DONE]".into())
|
||||
}
|
||||
|
||||
/// Check if this event should be skipped during processing
|
||||
/// This includes ping messages and other provider-specific events that don't contain content
|
||||
pub fn should_skip(&self) -> bool {
|
||||
// Skip ping messages (commonly used by providers for connection keep-alive)
|
||||
self.data == Some(r#"{"type": "ping"}"#.into())
|
||||
}
|
||||
|
||||
/// Check if this is an event-only SSE event (no data payload)
|
||||
pub fn is_event_only(&self) -> bool {
|
||||
self.event.is_some() && self.data.is_none()
|
||||
}
|
||||
|
||||
/// Get the parsed provider response if available
|
||||
pub fn provider_response(&self) -> Result<&dyn ProviderStreamResponse, std::io::Error> {
|
||||
self.provider_stream_response.as_ref()
|
||||
.map(|resp| resp as &dyn ProviderStreamResponse)
|
||||
.ok_or_else(|| {
|
||||
std::io::Error::new(std::io::ErrorKind::NotFound, "Provider response not found")
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl FromStr for SseEvent {
|
||||
type Err = SseParseError;
|
||||
|
||||
fn from_str(line: &str) -> Result<Self, Self::Err> {
|
||||
if line.starts_with("data: ") {
|
||||
let data: String = line[6..].to_string(); // Remove "data: " prefix
|
||||
if data.is_empty() {
|
||||
return Err(SseParseError {
|
||||
message: "Empty data field is not a valid SSE event".to_string(),
|
||||
});
|
||||
}
|
||||
Ok(SseEvent {
|
||||
data: Some(data),
|
||||
event: None,
|
||||
raw_line: line.to_string(),
|
||||
sse_transform_buffer: line.to_string(),
|
||||
provider_stream_response: None,
|
||||
})
|
||||
} else if line.starts_with("event: ") { //used by Anthropic
|
||||
let event_type = line[7..].to_string();
|
||||
if event_type.is_empty() {
|
||||
return Err(SseParseError {
|
||||
message: "Empty event field is not a valid SSE event".to_string(),
|
||||
});
|
||||
}
|
||||
Ok(SseEvent {
|
||||
data: None,
|
||||
event: Some(event_type),
|
||||
raw_line: line.to_string(),
|
||||
sse_transform_buffer: line.to_string(),
|
||||
provider_stream_response: None,
|
||||
})
|
||||
} else {
|
||||
Err(SseParseError {
|
||||
message: format!("Line does not start with 'data: ' or 'event: ': {}", line),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for SseEvent {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.sse_transform_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
// Into implementation to convert SseEvent to bytes for response buffer
|
||||
impl Into<Vec<u8>> for SseEvent {
|
||||
fn into(self) -> Vec<u8> {
|
||||
format!("{}\n\n", self.sse_transform_buffer).into_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SseParseError {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
impl fmt::Display for SseParseError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "SSE parse error: {}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for SseParseError {}
|
||||
|
||||
|
||||
/// Generic SSE (Server-Sent Events) streaming iterator container
|
||||
/// Parses raw SSE lines into SseEvent objects
|
||||
pub struct SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
pub lines: I,
|
||||
pub done_seen: bool,
|
||||
}
|
||||
|
||||
impl<I> SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
pub fn new(lines: I) -> Self {
|
||||
Self { lines, done_seen: false }
|
||||
}
|
||||
}
|
||||
|
||||
// TryFrom implementation to parse bytes into SseStreamIter
|
||||
// Handles both text-based SSE and binary AWS Event Stream formats
|
||||
impl TryFrom<&[u8]> for SseStreamIter<std::vec::IntoIter<String>> {
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
|
||||
// Parse as text-based SSE format
|
||||
let s = std::str::from_utf8(bytes)?;
|
||||
let lines: Vec<String> = s.lines().map(|line| line.to_string()).collect();
|
||||
Ok(SseStreamIter::new(lines.into_iter()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<I> Iterator for SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
type Item = SseEvent;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// If we already returned [DONE], terminate the stream
|
||||
if self.done_seen {
|
||||
return None;
|
||||
}
|
||||
|
||||
for line in &mut self.lines {
|
||||
let line_str = line.as_ref();
|
||||
|
||||
// Try to parse as either data: or event: line
|
||||
if let Ok(event) = line_str.parse::<SseEvent>() {
|
||||
// For data: lines, check if this is the [DONE] marker
|
||||
if event.data.is_some() && event.is_done() {
|
||||
self.done_seen = true;
|
||||
return Some(event); // Return [DONE] event for transformation
|
||||
}
|
||||
// For data: lines, skip events that should be filtered at the transport layer
|
||||
if event.data.is_some() && event.should_skip() {
|
||||
continue;
|
||||
}
|
||||
return Some(event);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
|
@ -104,12 +104,13 @@ impl SupportedAPIs {
|
|||
}
|
||||
}
|
||||
ProviderId::AmazonBedrock => {
|
||||
if request_path.starts_with("/v1/") && !is_streaming {
|
||||
format!("/model/{}/converse", model_id)
|
||||
} else if request_path.starts_with("/v1/") && is_streaming {
|
||||
format!("/model/{}/converse-stream", model_id)
|
||||
}
|
||||
else {
|
||||
if request_path.starts_with("/v1/") {
|
||||
if !is_streaming {
|
||||
format!("/model/{}/converse", model_id)
|
||||
} else {
|
||||
format!("/model/{}/converse-stream", model_id)
|
||||
}
|
||||
} else {
|
||||
default_endpoint
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,9 @@ pub mod clients;
|
|||
pub mod transforms;
|
||||
// Re-export important types and traits
|
||||
pub use providers::request::{ProviderRequestType, ProviderRequest, ProviderRequestError};
|
||||
pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage, SseEvent, SseStreamIter, BedrockBinaryFrameDecoder};
|
||||
pub use apis::sse::{SseEvent, SseStreamIter};
|
||||
pub use apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder;
|
||||
pub use providers::response::{ProviderResponseType, ProviderStreamResponseType, ProviderResponse, ProviderStreamResponse, ProviderResponseError, TokenUsage};
|
||||
pub use providers::id::ProviderId;
|
||||
pub use aws_smithy_eventstream::frame::DecodedFrame;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
use crate::clients::endpoints::SupportedUpstreamAPIs;
|
||||
use crate::providers::id::ProviderId;
|
||||
use bytes::Buf;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use serde::Serialize;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::convert::TryFrom;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::clients::endpoints::SupportedUpstreamAPIs;
|
||||
use crate::clients::endpoints::SupportedAPIs;
|
||||
use crate::providers::id::ProviderId;
|
||||
use crate::apis::sse::SseEvent;
|
||||
use crate::apis::openai::ChatCompletionsResponse;
|
||||
use crate::apis::openai::ChatCompletionsStreamResponse;
|
||||
use crate::apis::anthropic::MessagesStreamEvent;
|
||||
use crate::clients::endpoints::SupportedAPIs;
|
||||
use crate::apis::anthropic::MessagesResponse;
|
||||
use crate::apis::amazon_bedrock::ConverseResponse;
|
||||
use crate::apis::amazon_bedrock::ConverseStreamEvent;
|
||||
|
|
@ -241,124 +240,6 @@ impl TryFrom<(&[u8], &SupportedAPIs, &SupportedUpstreamAPIs)> for ProviderStream
|
|||
}
|
||||
|
||||
|
||||
// ============================================================================
|
||||
// SSE EVENT CONTAINER
|
||||
// ============================================================================
|
||||
|
||||
/// Represents a single Server-Sent Event with the complete wire format
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SseEvent {
|
||||
#[serde(rename = "data")]
|
||||
pub data: Option<String>, // The JSON payload after "data: "
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub event: Option<String>, // Optional event type (e.g., "message_start", "content_block_delta")
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub raw_line: String, // The complete line as received including "data: " prefix and "\n\n"
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub sse_transform_buffer: String, // The complete line as received including "data: " prefix and "\n\n"
|
||||
|
||||
#[serde(skip_serializing, skip_deserializing)]
|
||||
pub provider_stream_response: Option<ProviderStreamResponseType>, // Parsed provider stream response object
|
||||
}
|
||||
|
||||
impl SseEvent {
|
||||
/// Check if this event represents the end of the stream
|
||||
pub fn is_done(&self) -> bool {
|
||||
self.data == Some("[DONE]".into())
|
||||
}
|
||||
|
||||
/// Check if this event should be skipped during processing
|
||||
/// This includes ping messages and other provider-specific events that don't contain content
|
||||
pub fn should_skip(&self) -> bool {
|
||||
// Skip ping messages (commonly used by providers for connection keep-alive)
|
||||
self.data == Some(r#"{"type": "ping"}"#.into())
|
||||
}
|
||||
|
||||
/// Check if this is an event-only SSE event (no data payload)
|
||||
pub fn is_event_only(&self) -> bool {
|
||||
self.event.is_some() && self.data.is_none()
|
||||
}
|
||||
|
||||
/// Get the parsed provider response if available
|
||||
pub fn provider_response(&self) -> Result<&dyn ProviderStreamResponse, std::io::Error> {
|
||||
self.provider_stream_response.as_ref()
|
||||
.map(|resp| resp as &dyn ProviderStreamResponse)
|
||||
.ok_or_else(|| {
|
||||
std::io::Error::new(std::io::ErrorKind::NotFound, "Provider response not found")
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl FromStr for SseEvent {
|
||||
type Err = SseParseError;
|
||||
|
||||
fn from_str(line: &str) -> Result<Self, Self::Err> {
|
||||
if line.starts_with("data: ") {
|
||||
let data: String = line[6..].to_string(); // Remove "data: " prefix
|
||||
if data.is_empty() {
|
||||
return Err(SseParseError {
|
||||
message: "Empty data field is not a valid SSE event".to_string(),
|
||||
});
|
||||
}
|
||||
Ok(SseEvent {
|
||||
data: Some(data),
|
||||
event: None,
|
||||
raw_line: line.to_string(),
|
||||
sse_transform_buffer: line.to_string(),
|
||||
provider_stream_response: None,
|
||||
})
|
||||
} else if line.starts_with("event: ") { //used by Anthropic
|
||||
let event_type = line[7..].to_string();
|
||||
if event_type.is_empty() {
|
||||
return Err(SseParseError {
|
||||
message: "Empty event field is not a valid SSE event".to_string(),
|
||||
});
|
||||
}
|
||||
Ok(SseEvent {
|
||||
data: None,
|
||||
event: Some(event_type),
|
||||
raw_line: line.to_string(),
|
||||
sse_transform_buffer: line.to_string(),
|
||||
provider_stream_response: None,
|
||||
})
|
||||
} else {
|
||||
Err(SseParseError {
|
||||
message: format!("Line does not start with 'data: ' or 'event: ': {}", line),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for SseEvent {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.sse_transform_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
// Into implementation to convert SseEvent to bytes for response buffer
|
||||
impl Into<Vec<u8>> for SseEvent {
|
||||
fn into(self) -> Vec<u8> {
|
||||
format!("{}\n\n", self.sse_transform_buffer).into_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SseParseError {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
impl fmt::Display for SseParseError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "SSE parse error: {}", self.message)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for SseParseError {}
|
||||
|
||||
// TryFrom implementation to convert raw bytes to SseEvent with parsed provider response
|
||||
impl TryFrom<(SseEvent, &SupportedAPIs, &SupportedUpstreamAPIs)> for SseEvent {
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
|
@ -471,135 +352,6 @@ impl TryFrom<(&aws_smithy_eventstream::frame::DecodedFrame, &SupportedAPIs, &Sup
|
|||
}
|
||||
|
||||
|
||||
/// AWS Event Stream frame decoder wrapper
|
||||
pub struct BedrockBinaryFrameDecoder<B>
|
||||
where
|
||||
B: Buf,
|
||||
{
|
||||
decoder: aws_smithy_eventstream::frame::MessageFrameDecoder,
|
||||
buffer: B,
|
||||
content_block_start_indices: std::collections::HashSet<i32>,
|
||||
}
|
||||
|
||||
impl BedrockBinaryFrameDecoder<bytes::BytesMut> {
|
||||
/// This is a convenience constructor that creates a BytesMut buffer internally
|
||||
pub fn from_bytes(bytes: &[u8]) -> Self {
|
||||
let buffer = bytes::BytesMut::from(bytes);
|
||||
Self {
|
||||
decoder: aws_smithy_eventstream::frame::MessageFrameDecoder::new(),
|
||||
buffer,
|
||||
content_block_start_indices: std::collections::HashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> BedrockBinaryFrameDecoder<B>
|
||||
where
|
||||
B: Buf,
|
||||
{
|
||||
pub fn new(buffer: B) -> Self {
|
||||
Self {
|
||||
decoder: aws_smithy_eventstream::frame::MessageFrameDecoder::new(),
|
||||
buffer,
|
||||
content_block_start_indices: std::collections::HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode_frame(&mut self) -> Option<aws_smithy_eventstream::frame::DecodedFrame> {
|
||||
match self.decoder.decode_frame(&mut self.buffer) {
|
||||
Ok(frame) => Some(frame),
|
||||
Err(_e) => None, // Fatal decode error
|
||||
}
|
||||
}
|
||||
|
||||
pub fn buffer_mut(&mut self) -> &mut B {
|
||||
&mut self.buffer
|
||||
}
|
||||
|
||||
/// Check if there are any bytes remaining in the buffer
|
||||
pub fn has_remaining(&self) -> bool {
|
||||
self.buffer.has_remaining()
|
||||
}
|
||||
|
||||
/// Check if a content_block_start event has been sent for the given index
|
||||
pub fn has_content_block_start_been_sent(&self, index: i32) -> bool {
|
||||
self.content_block_start_indices.contains(&index)
|
||||
}
|
||||
|
||||
/// Mark that a content_block_start event has been sent for the given index
|
||||
pub fn set_content_block_start_sent(&mut self, index: i32) {
|
||||
self.content_block_start_indices.insert(index);
|
||||
}
|
||||
}
|
||||
|
||||
/// Generic SSE (Server-Sent Events) streaming iterator container
|
||||
/// Parses raw SSE lines into SseEvent objects
|
||||
pub struct SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
pub lines: I,
|
||||
pub done_seen: bool,
|
||||
}
|
||||
|
||||
impl<I> SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
pub fn new(lines: I) -> Self {
|
||||
Self { lines, done_seen: false }
|
||||
}
|
||||
}
|
||||
|
||||
// TryFrom implementation to parse bytes into SseStreamIter
|
||||
// Handles both text-based SSE and binary AWS Event Stream formats
|
||||
impl TryFrom<&[u8]> for SseStreamIter<std::vec::IntoIter<String>> {
|
||||
type Error = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
|
||||
// Parse as text-based SSE format
|
||||
let s = std::str::from_utf8(bytes)?;
|
||||
let lines: Vec<String> = s.lines().map(|line| line.to_string()).collect();
|
||||
Ok(SseStreamIter::new(lines.into_iter()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<I> Iterator for SseStreamIter<I>
|
||||
where
|
||||
I: Iterator,
|
||||
I::Item: AsRef<str>,
|
||||
{
|
||||
type Item = SseEvent;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// If we already returned [DONE], terminate the stream
|
||||
if self.done_seen {
|
||||
return None;
|
||||
}
|
||||
|
||||
for line in &mut self.lines {
|
||||
let line_str = line.as_ref();
|
||||
|
||||
// Try to parse as either data: or event: line
|
||||
if let Ok(event) = line_str.parse::<SseEvent>() {
|
||||
// For data: lines, check if this is the [DONE] marker
|
||||
if event.data.is_some() && event.is_done() {
|
||||
self.done_seen = true;
|
||||
return Some(event); // Return [DONE] event for transformation
|
||||
}
|
||||
// For data: lines, skip events that should be filtered at the transport layer
|
||||
if event.data.is_some() && event.should_skip() {
|
||||
continue;
|
||||
}
|
||||
return Some(event);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ProviderResponseError {
|
||||
|
|
@ -623,12 +375,15 @@ impl Error for ProviderResponseError {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::apis::sse::SseStreamIter;
|
||||
use crate::apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder;
|
||||
use crate::clients::endpoints::SupportedAPIs;
|
||||
use crate::providers::id::ProviderId;
|
||||
use crate::apis::openai::OpenAIApi;
|
||||
use crate::apis::anthropic::AnthropicApi;
|
||||
use serde_json::json;
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_openai_response_from_bytes() {
|
||||
let resp = json!({
|
||||
|
|
|
|||
|
|
@ -22,12 +22,14 @@ use common::ratelimit::Header;
|
|||
use common::stats::{IncrementingMetric, RecordingMetric};
|
||||
use common::tracing::{Event, Span, TraceData, Traceparent};
|
||||
use common::{ratelimit, routing, tokenizer};
|
||||
use hermesllm::apis::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder;
|
||||
use hermesllm::apis::anthropic::{MessagesContentBlock, MessagesStreamEvent};
|
||||
use hermesllm::apis::sse::{SseEvent, SseStreamIter};
|
||||
use hermesllm::clients::endpoints::SupportedAPIs;
|
||||
use hermesllm::providers::response::{
|
||||
BedrockBinaryFrameDecoder, ProviderResponse, ProviderStreamResponse, SseEvent, SseStreamIter,
|
||||
};
|
||||
use hermesllm::providers::response::ProviderResponse;
|
||||
use hermesllm::{
|
||||
DecodedFrame, ProviderId, ProviderRequest, ProviderRequestType, ProviderResponseType,
|
||||
ProviderStreamResponseType,
|
||||
};
|
||||
|
||||
pub struct StreamContext {
|
||||
|
|
@ -514,88 +516,58 @@ impl StreamContext {
|
|||
client_api: &SupportedAPIs,
|
||||
upstream_api: &SupportedUpstreamAPIs,
|
||||
) -> Result<Vec<u8>, Action> {
|
||||
use hermesllm::providers::response::ProviderStreamResponseType;
|
||||
|
||||
// Initialize decoder if not present
|
||||
if self.binary_frame_decoder.is_none() {
|
||||
self.binary_frame_decoder = Some(BedrockBinaryFrameDecoder::from_bytes(&[]));
|
||||
}
|
||||
|
||||
// Add incoming bytes to buffer
|
||||
if let Some(decoder) = self.binary_frame_decoder.as_mut() {
|
||||
decoder.buffer_mut().extend_from_slice(body);
|
||||
}
|
||||
let decoder = self.binary_frame_decoder.as_mut().unwrap();
|
||||
decoder.buffer_mut().extend_from_slice(body);
|
||||
|
||||
let mut response_buffer = Vec::new();
|
||||
|
||||
// Decode all available complete frames
|
||||
loop {
|
||||
let decoded_frame = self.binary_frame_decoder.as_mut().unwrap().decode_frame();
|
||||
match decoded_frame {
|
||||
Some(DecodedFrame::Complete(ref frame_ref)) => {
|
||||
// Convert frame to ProviderStreamResponseType
|
||||
let frame = DecodedFrame::Complete(frame_ref.clone());
|
||||
match ProviderStreamResponseType::try_from((&frame, client_api, upstream_api)) {
|
||||
Ok(provider_response) => {
|
||||
self.record_ttft_if_needed();
|
||||
|
||||
// Extract index from the event if available
|
||||
let event_index =
|
||||
if let ProviderStreamResponseType::MessagesStreamEvent(ref evt) =
|
||||
provider_response
|
||||
{
|
||||
use hermesllm::apis::anthropic::MessagesStreamEvent;
|
||||
// Handle ContentBlockStart and ContentBlockDelta events
|
||||
match &provider_response {
|
||||
ProviderStreamResponseType::MessagesStreamEvent(evt) => {
|
||||
match evt {
|
||||
MessagesStreamEvent::ContentBlockStart {
|
||||
index, ..
|
||||
} => Some(*index as i32),
|
||||
MessagesStreamEvent::ContentBlockDelta {
|
||||
index, ..
|
||||
} => Some(*index as i32),
|
||||
MessagesStreamEvent::ContentBlockStop { index, .. } => {
|
||||
Some(*index as i32)
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Check event type to track ContentBlockStart
|
||||
if let Some(event_type) = provider_response.event_type() {
|
||||
match event_type {
|
||||
"content_block_start" => {
|
||||
// Mark that we've seen ContentBlockStart for this index
|
||||
if let (Some(decoder), Some(index)) =
|
||||
(self.binary_frame_decoder.as_mut(), event_index)
|
||||
{
|
||||
decoder.set_content_block_start_sent(index);
|
||||
} => {
|
||||
// Mark that we've seen ContentBlockStart for this index
|
||||
self.binary_frame_decoder
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.set_content_block_start_sent(*index as i32);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_CONTENT_BLOCK_START_TRACKED: index={}",
|
||||
self.request_identifier(),
|
||||
index
|
||||
*index
|
||||
);
|
||||
}
|
||||
}
|
||||
"content_block_delta" => {
|
||||
// Check if ContentBlockStart was sent for this index
|
||||
if let Some(index) = event_index {
|
||||
let needs_start = if let Some(decoder) =
|
||||
self.binary_frame_decoder.as_ref()
|
||||
{
|
||||
!decoder.has_content_block_start_been_sent(index)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
MessagesStreamEvent::ContentBlockDelta {
|
||||
index, ..
|
||||
} => {
|
||||
// Check if ContentBlockStart was sent for this index
|
||||
let needs_start = !self
|
||||
.binary_frame_decoder
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.has_content_block_start_been_sent(*index as i32);
|
||||
|
||||
if needs_start {
|
||||
// Emit empty ContentBlockStart before delta
|
||||
use hermesllm::apis::anthropic::{
|
||||
MessagesContentBlock, MessagesStreamEvent,
|
||||
};
|
||||
let content_block_start =
|
||||
MessagesStreamEvent::ContentBlockStart {
|
||||
index: index as u32,
|
||||
index: *index,
|
||||
content_block: MessagesContentBlock::Text {
|
||||
text: String::new(),
|
||||
cache_control: None,
|
||||
|
|
@ -606,22 +578,22 @@ impl StreamContext {
|
|||
.extend_from_slice(start_sse.as_bytes());
|
||||
|
||||
// Mark that we've now sent it
|
||||
if let Some(decoder) =
|
||||
self.binary_frame_decoder.as_mut()
|
||||
{
|
||||
decoder.set_content_block_start_sent(index);
|
||||
}
|
||||
self.binary_frame_decoder
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.set_content_block_start_sent(*index as i32);
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] BEDROCK_INJECTED_CONTENT_BLOCK_START: index={}",
|
||||
self.request_identifier(),
|
||||
index
|
||||
*index
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let sse_string: String = provider_response.into();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue