mirror of
https://github.com/katanemo/plano.git
synced 2026-05-10 16:22:42 +02:00
refactor brightstaff (#736)
This commit is contained in:
parent
1f23c573bf
commit
1ad3e0f64e
30 changed files with 1802 additions and 1700 deletions
41
crates/brightstaff/src/handlers/agents/errors.rs
Normal file
41
crates/brightstaff/src/handlers/agents/errors.rs
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
use bytes::Bytes;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use hyper::Response;
|
||||
use serde_json::json;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::handlers::response::ResponseHandler;
|
||||
|
||||
/// Build a JSON error response from an `AgentFilterChainError`, logging the
|
||||
/// full error chain along the way.
|
||||
///
|
||||
/// Returns `Ok(Response)` so it can be used directly as a handler return value.
|
||||
pub fn build_error_chain_response<E: std::error::Error>(
|
||||
err: &E,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let mut error_chain = Vec::new();
|
||||
let mut current: &dyn std::error::Error = err;
|
||||
loop {
|
||||
error_chain.push(current.to_string());
|
||||
match current.source() {
|
||||
Some(source) => current = source,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
warn!(error_chain = ?error_chain, "agent chat error chain");
|
||||
warn!(root_error = ?err, "root error");
|
||||
|
||||
let error_json = json!({
|
||||
"error": {
|
||||
"type": "AgentFilterChainError",
|
||||
"message": err.to_string(),
|
||||
"error_chain": error_chain,
|
||||
"debug_info": format!("{:?}", err)
|
||||
}
|
||||
});
|
||||
|
||||
info!(error = %error_json, "structured error info");
|
||||
|
||||
Ok(ResponseHandler::create_json_error_response(&error_json))
|
||||
}
|
||||
5
crates/brightstaff/src/handlers/agents/mod.rs
Normal file
5
crates/brightstaff/src/handlers/agents/mod.rs
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
//! Submodules for agent chat request handling.

/// Helpers for logging error chains and building JSON error responses.
pub mod errors;
/// JSON-RPC request/response/notification types and MCP method constants.
pub mod jsonrpc;
/// Orchestrator integration for agent selection.
// NOTE(review): contents of `orchestrator` are not visible here — confirm description.
pub mod orchestrator;
/// Filter/agent pipeline processing (`PipelineProcessor`, `PipelineError`).
pub mod pipeline;
/// Agent and listener selection (`AgentSelector`, `AgentSelectionError`).
pub mod selector;
|
||||
|
|
@ -2,63 +2,56 @@ use std::sync::Arc;
|
|||
use std::time::Instant;
|
||||
|
||||
use bytes::Bytes;
|
||||
use common::configuration::SpanAttributes;
|
||||
use common::errors::BrightStaffError;
|
||||
use common::llm_providers::LlmProviders;
|
||||
use hermesllm::apis::OpenAIMessage;
|
||||
use hermesllm::clients::SupportedAPIsFromClient;
|
||||
use hermesllm::providers::request::ProviderRequest;
|
||||
use hermesllm::ProviderRequestType;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use hyper::{Request, Response};
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use serde::ser::Error as SerError;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use super::agent_selector::{AgentSelectionError, AgentSelector};
|
||||
use super::pipeline_processor::{PipelineError, PipelineProcessor};
|
||||
use super::response_handler::ResponseHandler;
|
||||
use crate::router::plano_orchestrator::OrchestratorService;
|
||||
use super::errors::build_error_chain_response;
|
||||
use super::pipeline::{PipelineError, PipelineProcessor};
|
||||
use super::selector::{AgentSelectionError, AgentSelector};
|
||||
use crate::app_state::AppState;
|
||||
use crate::handlers::extract_request_id;
|
||||
use crate::handlers::response::ResponseHandler;
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||
|
||||
/// Main errors for agent chat completions
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AgentFilterChainError {
|
||||
#[error("Forwarded error: {0}")]
|
||||
Brightstaff(#[from] BrightStaffError),
|
||||
#[error("Agent selection error: {0}")]
|
||||
Selection(#[from] AgentSelectionError),
|
||||
#[error("Pipeline processing error: {0}")]
|
||||
Pipeline(#[from] PipelineError),
|
||||
#[error("Response handling error: {0}")]
|
||||
Response(#[from] common::errors::BrightStaffError),
|
||||
#[error("Request parsing error: {0}")]
|
||||
RequestParsing(#[from] serde_json::Error),
|
||||
RequestParsing(String),
|
||||
#[error("HTTP error: {0}")]
|
||||
Http(#[from] hyper::Error),
|
||||
#[error("Unsupported endpoint: {0}")]
|
||||
UnsupportedEndpoint(String),
|
||||
#[error("No agents configured")]
|
||||
NoAgentsConfigured,
|
||||
#[error("Agent '{0}' not found in configuration")]
|
||||
AgentNotFound(String),
|
||||
#[error("No messages in conversation history")]
|
||||
EmptyHistory,
|
||||
#[error("Agent chain completed without producing a response")]
|
||||
IncompleteChain,
|
||||
}
|
||||
|
||||
pub async fn agent_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
_: String,
|
||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
state: Arc<AppState>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_id = extract_request_id(&request);
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(request.headers(), span_attributes.as_ref().as_ref());
|
||||
// Extract request_id from headers or generate a new one
|
||||
let request_id: String = match request
|
||||
.headers()
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(id) => id,
|
||||
None => uuid::Uuid::new_v4().to_string(),
|
||||
};
|
||||
collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref());
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
|
|
@ -74,17 +67,7 @@ pub async fn agent_chat(
|
|||
// Set service name for orchestrator operations
|
||||
set_service_name(operation_component::ORCHESTRATOR);
|
||||
|
||||
match handle_agent_chat_inner(
|
||||
request,
|
||||
orchestrator_service,
|
||||
agents_list,
|
||||
listeners,
|
||||
llm_providers,
|
||||
request_id,
|
||||
custom_attrs,
|
||||
)
|
||||
.await
|
||||
{
|
||||
match handle_agent_chat_inner(request, state, request_id, custom_attrs).await {
|
||||
Ok(response) => Ok(response),
|
||||
Err(err) => {
|
||||
// Check if this is a client error from the pipeline that should be cascaded
|
||||
|
|
@ -101,7 +84,6 @@ pub async fn agent_chat(
|
|||
"client error from agent"
|
||||
);
|
||||
|
||||
// Create error response with the original status code and body
|
||||
let error_json = serde_json::json!({
|
||||
"error": "ClientError",
|
||||
"agent": agent,
|
||||
|
|
@ -109,52 +91,19 @@ pub async fn agent_chat(
|
|||
"agent_response": body
|
||||
});
|
||||
|
||||
let status_code = hyper::StatusCode::from_u16(*status)
|
||||
.unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
let json_string = error_json.to_string();
|
||||
return Ok(BrightStaffError::ForwardedError {
|
||||
status_code,
|
||||
message: json_string,
|
||||
}
|
||||
.into_response());
|
||||
let mut response =
|
||||
Response::new(ResponseHandler::create_full_body(json_string));
|
||||
*response.status_mut() = hyper::StatusCode::from_u16(*status)
|
||||
.unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR);
|
||||
response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Print detailed error information with full error chain for other errors
|
||||
let mut error_chain = Vec::new();
|
||||
let mut current_error: &dyn std::error::Error = &err;
|
||||
|
||||
// Collect the full error chain
|
||||
loop {
|
||||
error_chain.push(current_error.to_string());
|
||||
match current_error.source() {
|
||||
Some(source) => current_error = source,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Log the complete error chain
|
||||
warn!(error_chain = ?error_chain, "agent chat error chain");
|
||||
warn!(root_error = ?err, "root error");
|
||||
|
||||
// Create structured error response as JSON
|
||||
let error_json = serde_json::json!({
|
||||
"error": {
|
||||
"type": "AgentFilterChainError",
|
||||
"message": err.to_string(),
|
||||
"error_chain": error_chain,
|
||||
"debug_info": format!("{:?}", err)
|
||||
}
|
||||
});
|
||||
|
||||
// Log the error for debugging
|
||||
info!(error = %error_json, "structured error info");
|
||||
|
||||
Ok(BrightStaffError::ForwardedError {
|
||||
status_code: StatusCode::BAD_REQUEST,
|
||||
message: error_json.to_string(),
|
||||
}
|
||||
.into_response())
|
||||
build_error_chain_response(&err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -162,19 +111,22 @@ pub async fn agent_chat(
|
|||
.await
|
||||
}
|
||||
|
||||
async fn handle_agent_chat_inner(
|
||||
/// Parsed and validated agent request data.
|
||||
struct AgentRequest {
|
||||
client_request: ProviderRequestType,
|
||||
messages: Vec<OpenAIMessage>,
|
||||
request_headers: hyper::HeaderMap,
|
||||
request_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Parse the incoming HTTP request, resolve the listener, and extract messages.
|
||||
async fn parse_agent_request(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
request_id: String,
|
||||
custom_attrs: std::collections::HashMap<String, String>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||
// Initialize services
|
||||
let agent_selector = AgentSelector::new(orchestrator_service);
|
||||
let mut pipeline_processor = PipelineProcessor::default();
|
||||
let response_handler = ResponseHandler::new();
|
||||
state: &AppState,
|
||||
request_id: &str,
|
||||
custom_attrs: &std::collections::HashMap<String, String>,
|
||||
) -> Result<(AgentRequest, common::configuration::Listener, AgentSelector), AgentFilterChainError> {
|
||||
let agent_selector = AgentSelector::new(Arc::clone(&state.orchestrator_service));
|
||||
|
||||
// Extract listener name from headers
|
||||
let listener_name = request
|
||||
|
|
@ -183,16 +135,11 @@ async fn handle_agent_chat_inner(
|
|||
.and_then(|name| name.to_str().ok());
|
||||
|
||||
// Find the appropriate listener
|
||||
let listener: common::configuration::Listener = {
|
||||
let listeners = listeners.read().await;
|
||||
agent_selector
|
||||
.find_listener(listener_name, &listeners)
|
||||
.await?
|
||||
};
|
||||
let listener = agent_selector.find_listener(listener_name, &state.listeners)?;
|
||||
|
||||
get_active_span(|span| {
|
||||
span.update_name(listener.name.to_string());
|
||||
for (key, value) in &custom_attrs {
|
||||
for (key, value) in custom_attrs {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||
}
|
||||
});
|
||||
|
|
@ -200,24 +147,20 @@ async fn handle_agent_chat_inner(
|
|||
info!(listener = %listener.name, "handling request");
|
||||
|
||||
// Parse request body
|
||||
let request_path = request
|
||||
.uri()
|
||||
.path()
|
||||
.to_string()
|
||||
let full_path = request.uri().path().to_string();
|
||||
let request_path = full_path
|
||||
.strip_prefix("/agents")
|
||||
.unwrap()
|
||||
.unwrap_or(&full_path)
|
||||
.to_string();
|
||||
|
||||
let request_headers = {
|
||||
let mut headers = request.headers().clone();
|
||||
headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER);
|
||||
|
||||
// Set the request_id in headers if not already present
|
||||
if !headers.contains_key(common::consts::REQUEST_ID_HEADER) {
|
||||
headers.insert(
|
||||
common::consts::REQUEST_ID_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&request_id).unwrap(),
|
||||
);
|
||||
if let Ok(val) = hyper::header::HeaderValue::from_str(request_id) {
|
||||
headers.insert(common::consts::REQUEST_ID_HEADER, val);
|
||||
}
|
||||
}
|
||||
|
||||
headers
|
||||
|
|
@ -230,63 +173,62 @@ async fn handle_agent_chat_inner(
|
|||
"received request body"
|
||||
);
|
||||
|
||||
// Determine the API type from the endpoint
|
||||
let api_type =
|
||||
SupportedAPIsFromClient::from_endpoint(request_path.as_str()).ok_or_else(|| {
|
||||
let err_msg = format!("Unsupported endpoint: {}", request_path);
|
||||
warn!("{}", err_msg);
|
||||
AgentFilterChainError::RequestParsing(serde_json::Error::custom(err_msg))
|
||||
warn!(path = %request_path, "unsupported endpoint");
|
||||
AgentFilterChainError::UnsupportedEndpoint(request_path.clone())
|
||||
})?;
|
||||
|
||||
let mut client_request =
|
||||
match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
|
||||
Ok(request) => request,
|
||||
Err(err) => {
|
||||
warn!("failed to parse request as ProviderRequestType: {}", err);
|
||||
let err_msg = format!("Failed to parse request: {}", err);
|
||||
return Err(AgentFilterChainError::RequestParsing(
|
||||
serde_json::Error::custom(err_msg),
|
||||
));
|
||||
}
|
||||
};
|
||||
let client_request = ProviderRequestType::try_from((&chat_request_bytes[..], &api_type))
|
||||
.map_err(|err| {
|
||||
warn!(error = %err, "failed to parse request as ProviderRequestType");
|
||||
AgentFilterChainError::RequestParsing(format!("Failed to parse request: {}", err))
|
||||
})?;
|
||||
|
||||
// If model is not specified in the request, resolve from default provider
|
||||
if client_request.model().is_empty() {
|
||||
match llm_providers.read().await.default() {
|
||||
Some(default_provider) => {
|
||||
let default_model = default_provider.name.clone();
|
||||
info!(default_model = %default_model, "no model specified in request, using default provider");
|
||||
client_request.set_model(default_model);
|
||||
}
|
||||
None => {
|
||||
let err_msg = "No model specified in request and no default provider configured";
|
||||
warn!("{}", err_msg);
|
||||
return Ok(BrightStaffError::NoModelSpecified.into_response());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let message: Vec<OpenAIMessage> = client_request.get_messages();
|
||||
let messages: Vec<OpenAIMessage> = client_request.get_messages();
|
||||
|
||||
let request_id = request_headers
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|val| val.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
// Create agent map for pipeline processing and agent selection
|
||||
let agent_map = {
|
||||
let agents = agents_list.read().await;
|
||||
let agents = agents.as_ref().unwrap();
|
||||
agent_selector.create_agent_map(agents)
|
||||
};
|
||||
Ok((
|
||||
AgentRequest {
|
||||
client_request,
|
||||
messages,
|
||||
request_headers,
|
||||
request_id,
|
||||
},
|
||||
listener,
|
||||
agent_selector,
|
||||
))
|
||||
}
|
||||
|
||||
/// Select agents via the orchestrator model and record selection metrics.
|
||||
async fn select_and_build_agent_map(
|
||||
agent_selector: &AgentSelector,
|
||||
state: &AppState,
|
||||
messages: &[OpenAIMessage],
|
||||
listener: &common::configuration::Listener,
|
||||
request_id: Option<String>,
|
||||
) -> Result<
|
||||
(
|
||||
Vec<common::configuration::AgentFilterChain>,
|
||||
std::collections::HashMap<String, common::configuration::Agent>,
|
||||
),
|
||||
AgentFilterChainError,
|
||||
> {
|
||||
let agents = state
|
||||
.agents_list
|
||||
.as_ref()
|
||||
.ok_or(AgentFilterChainError::NoAgentsConfigured)?;
|
||||
let agent_map = agent_selector.create_agent_map(agents);
|
||||
|
||||
// Select appropriate agents using arch orchestrator llm model
|
||||
let selection_start = Instant::now();
|
||||
let selected_agents = agent_selector
|
||||
.select_agents(&message, &listener, request_id.clone())
|
||||
.select_agents(messages, listener, request_id)
|
||||
.await?;
|
||||
|
||||
// Record selection attributes on the current orchestrator span
|
||||
let selection_elapsed_ms = selection_start.elapsed().as_secs_f64() * 1000.0;
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
|
|
@ -316,12 +258,25 @@ async fn handle_agent_chat_inner(
|
|||
"selected agents for execution"
|
||||
);
|
||||
|
||||
// Execute agents sequentially, passing output from one to the next
|
||||
let mut current_messages = message.clone();
|
||||
Ok((selected_agents, agent_map))
|
||||
}
|
||||
|
||||
/// Execute the agent chain: run each selected agent sequentially, streaming
|
||||
/// the final agent's response back to the client.
|
||||
async fn execute_agent_chain(
|
||||
selected_agents: &[common::configuration::AgentFilterChain],
|
||||
agent_map: &std::collections::HashMap<String, common::configuration::Agent>,
|
||||
client_request: ProviderRequestType,
|
||||
messages: Vec<OpenAIMessage>,
|
||||
request_headers: &hyper::HeaderMap,
|
||||
custom_attrs: &std::collections::HashMap<String, String>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||
let mut pipeline_processor = PipelineProcessor::default();
|
||||
let response_handler = ResponseHandler::new();
|
||||
let mut current_messages = messages;
|
||||
let agent_count = selected_agents.len();
|
||||
|
||||
for (agent_index, selected_agent) in selected_agents.iter().enumerate() {
|
||||
// Get agent name
|
||||
let agent_name = selected_agent.id.clone();
|
||||
let is_last_agent = agent_index == agent_count - 1;
|
||||
|
||||
|
|
@ -332,8 +287,6 @@ async fn handle_agent_chat_inner(
|
|||
"processing agent"
|
||||
);
|
||||
|
||||
// Process input filters — serialize current request as OpenAI chat completions body,
|
||||
// pass raw bytes through each filter, then extract updated messages from the result.
|
||||
let chat_history = if selected_agent
|
||||
.input_filters
|
||||
.as_ref()
|
||||
|
|
@ -351,8 +304,8 @@ async fn handle_agent_chat_inner(
|
|||
.process_raw_filter_chain(
|
||||
&filter_bytes,
|
||||
selected_agent,
|
||||
&agent_map,
|
||||
&request_headers,
|
||||
agent_map,
|
||||
request_headers,
|
||||
"/v1/chat/completions",
|
||||
)
|
||||
.await?;
|
||||
|
|
@ -365,8 +318,9 @@ async fn handle_agent_chat_inner(
|
|||
current_messages.clone()
|
||||
};
|
||||
|
||||
// Get agent details and invoke
|
||||
let agent = agent_map.get(&agent_name).unwrap();
|
||||
let agent = agent_map
|
||||
.get(&agent_name)
|
||||
.ok_or_else(|| AgentFilterChainError::AgentNotFound(agent_name.clone()))?;
|
||||
|
||||
debug!(agent = %agent_name, "invoking agent");
|
||||
|
||||
|
|
@ -380,7 +334,7 @@ async fn handle_agent_chat_inner(
|
|||
set_service_name(operation_component::AGENT);
|
||||
get_active_span(|span| {
|
||||
span.update_name(format!("{} /v1/chat/completions", agent_name));
|
||||
for (key, value) in &custom_attrs {
|
||||
for (key, value) in custom_attrs {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||
}
|
||||
});
|
||||
|
|
@ -390,28 +344,25 @@ async fn handle_agent_chat_inner(
|
|||
&chat_history,
|
||||
client_request.clone(),
|
||||
agent,
|
||||
&request_headers,
|
||||
request_headers,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.instrument(agent_span.clone())
|
||||
.await?;
|
||||
|
||||
// If this is the last agent, return the streaming response
|
||||
if is_last_agent {
|
||||
info!(
|
||||
agent = %agent_name,
|
||||
"completed agent chain, returning response"
|
||||
);
|
||||
// Capture the orchestrator span (parent of the agent span) so it
|
||||
// stays open for the full streaming duration alongside the agent span.
|
||||
let orchestrator_span = tracing::Span::current();
|
||||
return async {
|
||||
response_handler
|
||||
.create_streaming_response(
|
||||
llm_response,
|
||||
tracing::Span::current(), // agent span (inner)
|
||||
orchestrator_span, // orchestrator span (outer)
|
||||
tracing::Span::current(),
|
||||
orchestrator_span,
|
||||
)
|
||||
.await
|
||||
.map_err(AgentFilterChainError::from)
|
||||
|
|
@ -420,7 +371,6 @@ async fn handle_agent_chat_inner(
|
|||
.await;
|
||||
}
|
||||
|
||||
// For intermediate agents, collect the full response and pass to next agent
|
||||
debug!(agent = %agent_name, "collecting response from intermediate agent");
|
||||
let response_text = async { response_handler.collect_full_response(llm_response).await }
|
||||
.instrument(agent_span)
|
||||
|
|
@ -432,11 +382,11 @@ async fn handle_agent_chat_inner(
|
|||
"agent completed, passing response to next agent"
|
||||
);
|
||||
|
||||
// remove last message and add new one at the end
|
||||
let last_message = current_messages.pop().unwrap();
|
||||
let Some(last_message) = current_messages.pop() else {
|
||||
warn!(agent = %agent_name, "no messages in conversation history");
|
||||
return Err(AgentFilterChainError::EmptyHistory);
|
||||
};
|
||||
|
||||
// Create a new message with the agent's response as assistant message
|
||||
// and add it to the conversation history
|
||||
current_messages.push(OpenAIMessage {
|
||||
role: hermesllm::apis::openai::Role::Assistant,
|
||||
content: Some(hermesllm::apis::openai::MessageContent::Text(response_text)),
|
||||
|
|
@ -448,6 +398,34 @@ async fn handle_agent_chat_inner(
|
|||
current_messages.push(last_message);
|
||||
}
|
||||
|
||||
// This should never be reached since we return in the last agent iteration
|
||||
unreachable!("Agent execution loop should have returned a response")
|
||||
Err(AgentFilterChainError::IncompleteChain)
|
||||
}
|
||||
|
||||
async fn handle_agent_chat_inner(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
state: Arc<AppState>,
|
||||
request_id: String,
|
||||
custom_attrs: std::collections::HashMap<String, String>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||
let (agent_req, listener, agent_selector) =
|
||||
parse_agent_request(request, &state, &request_id, &custom_attrs).await?;
|
||||
|
||||
let (selected_agents, agent_map) = select_and_build_agent_map(
|
||||
&agent_selector,
|
||||
&state,
|
||||
&agent_req.messages,
|
||||
&listener,
|
||||
agent_req.request_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
execute_agent_chain(
|
||||
&selected_agents,
|
||||
&agent_map,
|
||||
agent_req.client_request,
|
||||
agent_req.messages,
|
||||
&agent_req.request_headers,
|
||||
&custom_attrs,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -12,7 +12,7 @@ use opentelemetry::global;
|
|||
use opentelemetry_http::HeaderInjector;
|
||||
use tracing::{debug, info, instrument, warn};
|
||||
|
||||
use crate::handlers::jsonrpc::{
|
||||
use super::jsonrpc::{
|
||||
JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION,
|
||||
MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD,
|
||||
};
|
||||
|
|
@ -78,12 +78,11 @@ impl PipelineProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build common MCP headers for requests
|
||||
fn build_mcp_headers(
|
||||
&self,
|
||||
/// Prepare headers shared by all agent/filter requests: removes
|
||||
/// content-length, injects trace context, sets upstream host and retry.
|
||||
fn build_agent_headers(
|
||||
request_headers: &HeaderMap,
|
||||
agent_id: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> Result<HeaderMap, PipelineError> {
|
||||
let mut headers = request_headers.clone();
|
||||
headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
|
@ -104,24 +103,34 @@ impl PipelineProcessor {
|
|||
|
||||
headers.insert(
|
||||
ENVOY_RETRY_HEADER,
|
||||
hyper::header::HeaderValue::from_str("3").unwrap(),
|
||||
hyper::header::HeaderValue::from_static("3"),
|
||||
);
|
||||
|
||||
Ok(headers)
|
||||
}
|
||||
|
||||
/// Build headers for MCP requests (adds Accept, Content-Type, optional session id).
|
||||
fn build_mcp_headers(
|
||||
&self,
|
||||
request_headers: &HeaderMap,
|
||||
agent_id: &str,
|
||||
session_id: Option<&str>,
|
||||
) -> Result<HeaderMap, PipelineError> {
|
||||
let mut headers = Self::build_agent_headers(request_headers, agent_id)?;
|
||||
|
||||
headers.insert(
|
||||
"Accept",
|
||||
hyper::header::HeaderValue::from_static("application/json, text/event-stream"),
|
||||
);
|
||||
|
||||
headers.insert(
|
||||
"Content-Type",
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
|
||||
if let Some(sid) = session_id {
|
||||
headers.insert(
|
||||
"mcp-session-id",
|
||||
hyper::header::HeaderValue::from_str(sid).unwrap(),
|
||||
);
|
||||
if let Ok(val) = hyper::header::HeaderValue::from_str(sid) {
|
||||
headers.insert("mcp-session-id", val);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(headers)
|
||||
|
|
@ -243,7 +252,7 @@ impl PipelineProcessor {
|
|||
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
|
||||
session_id.clone()
|
||||
} else {
|
||||
let session_id = self.get_new_session_id(&agent.id, request_headers).await;
|
||||
let session_id = self.get_new_session_id(&agent.id, request_headers).await?;
|
||||
self.agent_id_session_map
|
||||
.insert(agent.id.clone(), session_id.clone());
|
||||
session_id
|
||||
|
|
@ -394,18 +403,19 @@ impl PipelineProcessor {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String {
|
||||
async fn get_new_session_id(
|
||||
&self,
|
||||
agent_id: &str,
|
||||
request_headers: &HeaderMap,
|
||||
) -> Result<String, PipelineError> {
|
||||
info!("initializing MCP session for agent {}", agent_id);
|
||||
|
||||
let initialize_request = self.build_initialize_request();
|
||||
let headers = self
|
||||
.build_mcp_headers(request_headers, agent_id, None)
|
||||
.expect("Failed to build headers for initialization");
|
||||
let headers = self.build_mcp_headers(request_headers, agent_id, None)?;
|
||||
|
||||
let response = self
|
||||
.send_mcp_request(&initialize_request, &headers, agent_id)
|
||||
.await
|
||||
.expect("Failed to initialize MCP session");
|
||||
.await?;
|
||||
|
||||
info!("initialize response status: {}", response.status());
|
||||
|
||||
|
|
@ -413,8 +423,13 @@ impl PipelineProcessor {
|
|||
.headers()
|
||||
.get("mcp-session-id")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.expect("No mcp-session-id in response")
|
||||
.to_string();
|
||||
.map(|s| s.to_string())
|
||||
.ok_or_else(|| {
|
||||
PipelineError::NoContentInResponse(format!(
|
||||
"No mcp-session-id header in initialize response from agent {}",
|
||||
agent_id
|
||||
))
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"created new MCP session for agent {}: {}",
|
||||
|
|
@ -423,10 +438,9 @@ impl PipelineProcessor {
|
|||
|
||||
// Send initialized notification
|
||||
self.send_initialized_notification(agent_id, &session_id, &headers)
|
||||
.await
|
||||
.expect("Failed to send initialized notification");
|
||||
.await?;
|
||||
|
||||
session_id
|
||||
Ok(session_id)
|
||||
}
|
||||
|
||||
/// Execute a raw bytes filter — POST bytes to agent.url, receive bytes back.
|
||||
|
|
@ -454,25 +468,7 @@ impl PipelineProcessor {
|
|||
span.update_name(format!("execute_raw_filter ({})", agent.id));
|
||||
});
|
||||
|
||||
let mut agent_headers = request_headers.clone();
|
||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx =
|
||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||
});
|
||||
|
||||
agent_headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&agent.id)
|
||||
.map_err(|_| PipelineError::AgentNotFound(agent.id.clone()))?,
|
||||
);
|
||||
agent_headers.insert(
|
||||
ENVOY_RETRY_HEADER,
|
||||
hyper::header::HeaderValue::from_str("3").unwrap(),
|
||||
);
|
||||
let mut agent_headers = Self::build_agent_headers(request_headers, &agent.id)?;
|
||||
agent_headers.insert(
|
||||
"Accept",
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
|
|
@ -578,36 +574,15 @@ impl PipelineProcessor {
|
|||
terminal_agent: &Agent,
|
||||
request_headers: &HeaderMap,
|
||||
) -> Result<reqwest::Response, PipelineError> {
|
||||
// let mut request = original_request.clone();
|
||||
original_request.set_messages(messages);
|
||||
|
||||
let request_url = "/v1/chat/completions";
|
||||
|
||||
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();
|
||||
// let request_body = serde_json::to_string(&request)?;
|
||||
let request_body = ProviderRequestType::to_bytes(&original_request)
|
||||
.map_err(|e| PipelineError::NoContentInResponse(e.to_string()))?;
|
||||
debug!("sending request to terminal agent {}", terminal_agent.id);
|
||||
|
||||
let mut agent_headers = request_headers.clone();
|
||||
agent_headers.remove(hyper::header::CONTENT_LENGTH);
|
||||
|
||||
// Inject OpenTelemetry trace context automatically
|
||||
agent_headers.remove(TRACE_PARENT_HEADER);
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx =
|
||||
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
|
||||
});
|
||||
|
||||
agent_headers.insert(
|
||||
ARCH_UPSTREAM_HOST_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&terminal_agent.id)
|
||||
.map_err(|_| PipelineError::AgentNotFound(terminal_agent.id.clone()))?,
|
||||
);
|
||||
|
||||
agent_headers.insert(
|
||||
ENVOY_RETRY_HEADER,
|
||||
hyper::header::HeaderValue::from_str("3").unwrap(),
|
||||
);
|
||||
let agent_headers = Self::build_agent_headers(request_headers, &terminal_agent.id)?;
|
||||
|
||||
let response = self
|
||||
.client
|
||||
|
|
@ -7,7 +7,7 @@ use common::configuration::{
|
|||
use hermesllm::apis::openai::Message;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use crate::router::plano_orchestrator::OrchestratorService;
|
||||
use crate::router::orchestrator::OrchestratorService;
|
||||
|
||||
/// Errors that can occur during agent selection
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
|
@ -37,7 +37,7 @@ impl AgentSelector {
|
|||
}
|
||||
|
||||
/// Find listener by name from the request headers
|
||||
pub async fn find_listener(
|
||||
pub fn find_listener(
|
||||
&self,
|
||||
listener_name: Option<&str>,
|
||||
listeners: &[common::configuration::Listener],
|
||||
|
|
@ -84,7 +84,7 @@ impl AgentSelector {
|
|||
}
|
||||
|
||||
/// Convert agent descriptions to orchestration preferences
|
||||
async fn convert_agent_description_to_orchestration_preferences(
|
||||
fn convert_agent_description_to_orchestration_preferences(
|
||||
&self,
|
||||
agents: &[AgentFilterChain],
|
||||
) -> Vec<AgentUsagePreference> {
|
||||
|
|
@ -121,9 +121,7 @@ impl AgentSelector {
|
|||
return Ok(vec![agents[0].clone()]);
|
||||
}
|
||||
|
||||
let usage_preferences = self
|
||||
.convert_agent_description_to_orchestration_preferences(agents)
|
||||
.await;
|
||||
let usage_preferences = self.convert_agent_description_to_orchestration_preferences(agents);
|
||||
debug!(
|
||||
"Agents usage preferences for orchestration: {}",
|
||||
serde_json::to_string(&usage_preferences).unwrap_or_default()
|
||||
|
|
@ -222,9 +220,7 @@ mod tests {
|
|||
let listener2 = create_test_listener("other-listener", vec![]);
|
||||
let listeners = vec![listener1.clone(), listener2];
|
||||
|
||||
let result = selector
|
||||
.find_listener(Some("test-listener"), &listeners)
|
||||
.await;
|
||||
let result = selector.find_listener(Some("test-listener"), &listeners);
|
||||
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap().name, "test-listener");
|
||||
|
|
@ -237,9 +233,7 @@ mod tests {
|
|||
|
||||
let listeners = vec![create_test_listener("other-listener", vec![])];
|
||||
|
||||
let result = selector
|
||||
.find_listener(Some("nonexistent"), &listeners)
|
||||
.await;
|
||||
let result = selector.find_listener(Some("nonexistent"), &listeners);
|
||||
|
||||
assert!(result.is_err());
|
||||
matches!(
|
||||
|
|
@ -3,12 +3,11 @@ use std::sync::Arc;
|
|||
use hermesllm::apis::openai::{ChatCompletionsRequest, Message, MessageContent, Role};
|
||||
use hyper::header::HeaderMap;
|
||||
|
||||
use crate::handlers::agent_selector::{AgentSelectionError, AgentSelector};
|
||||
use crate::handlers::pipeline_processor::PipelineProcessor;
|
||||
use crate::router::plano_orchestrator::OrchestratorService;
|
||||
use common::errors::BrightStaffError;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::StatusCode;
|
||||
use crate::handlers::agents::pipeline::PipelineProcessor;
|
||||
use crate::handlers::agents::selector::{AgentSelectionError, AgentSelector};
|
||||
use crate::handlers::response::ResponseHandler;
|
||||
use crate::router::orchestrator::OrchestratorService;
|
||||
|
||||
/// Integration test that demonstrates the modular agent chat flow
|
||||
/// This test shows how the three main components work together:
|
||||
/// 1. AgentSelector - selects the appropriate agents based on orchestration
|
||||
|
|
@ -86,9 +85,7 @@ mod tests {
|
|||
let messages = vec![create_test_message(Role::User, "Hello world!")];
|
||||
|
||||
// Test 1: Agent Selection
|
||||
let selected_listener = agent_selector
|
||||
.find_listener(Some("test-listener"), &listeners)
|
||||
.await;
|
||||
let selected_listener = agent_selector.find_listener(Some("test-listener"), &listeners);
|
||||
|
||||
assert!(selected_listener.is_ok());
|
||||
let listener = selected_listener.unwrap();
|
||||
|
|
@ -142,24 +139,8 @@ mod tests {
|
|||
}
|
||||
|
||||
// Test 4: Error Response Creation
|
||||
let err = BrightStaffError::ModelNotFound("gpt-5-secret".to_string());
|
||||
let response = err.into_response();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||
|
||||
// Helper to extract body as JSON
|
||||
let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
|
||||
let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
|
||||
|
||||
assert_eq!(body["error"]["code"], "ModelNotFound");
|
||||
assert_eq!(
|
||||
body["error"]["details"]["rejected_model_id"],
|
||||
"gpt-5-secret"
|
||||
);
|
||||
assert!(body["error"]["message"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.contains("gpt-5-secret"));
|
||||
let error_response = ResponseHandler::create_bad_request("Test error");
|
||||
assert_eq!(error_response.status(), hyper::StatusCode::BAD_REQUEST);
|
||||
|
||||
println!("✅ All modular components working correctly!");
|
||||
}
|
||||
|
|
@ -170,7 +151,7 @@ mod tests {
|
|||
let agent_selector = AgentSelector::new(router_service);
|
||||
|
||||
// Test listener not found
|
||||
let result = agent_selector.find_listener(Some("nonexistent"), &[]).await;
|
||||
let result = agent_selector.find_listener(Some("nonexistent"), &[]);
|
||||
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
|
|
@ -178,21 +159,12 @@ mod tests {
|
|||
AgentSelectionError::ListenerNotFound(_)
|
||||
));
|
||||
|
||||
let technical_reason = "Database connection timed out";
|
||||
let err = BrightStaffError::InternalServerError(technical_reason.to_string());
|
||||
|
||||
let response = err.into_response();
|
||||
|
||||
// --- 1. EXTRACT BYTES ---
|
||||
let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
|
||||
|
||||
// --- 2. DECLARE body_json HERE ---
|
||||
let body_json: serde_json::Value =
|
||||
serde_json::from_slice(&body_bytes).expect("Failed to parse JSON body");
|
||||
|
||||
// --- 3. USE body_json ---
|
||||
assert_eq!(body_json["error"]["code"], "InternalServerError");
|
||||
assert_eq!(body_json["error"]["details"]["reason"], technical_reason);
|
||||
// Test error response creation
|
||||
let error_response = ResponseHandler::create_internal_error("Pipeline failed");
|
||||
assert_eq!(
|
||||
error_response.status(),
|
||||
hyper::StatusCode::INTERNAL_SERVER_ERROR
|
||||
);
|
||||
|
||||
println!("✅ Error handling working correctly!");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,686 +0,0 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{FilterPipeline, ModelAlias, SpanAttributes};
|
||||
use common::consts::{
|
||||
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
|
||||
};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use hermesllm::apis::openai_responses::InputParam;
|
||||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::header::{self};
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use opentelemetry_http::HeaderInjector;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use super::pipeline_processor::PipelineProcessor;
|
||||
|
||||
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
||||
use crate::handlers::streaming::{
|
||||
create_streaming_response, create_streaming_response_with_output_filter, truncate_message,
|
||||
ObservableStreamProcessor, StreamProcessor,
|
||||
};
|
||||
use crate::router::llm_router::RouterService;
|
||||
use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||
use crate::state::{
|
||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||
};
|
||||
use crate::tracing::{
|
||||
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
|
||||
};
|
||||
|
||||
use common::errors::BrightStaffError;
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn llm_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
full_qualified_llm_provider_url: String,
|
||||
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
filter_pipeline: Arc<FilterPipeline>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_path = request.uri().path().to_string();
|
||||
let request_headers = request.headers().clone();
|
||||
let request_id: String = match request_headers
|
||||
.get(REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(id) => id,
|
||||
None => uuid::Uuid::new_v4().to_string(),
|
||||
};
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref());
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
"llm",
|
||||
component = "llm",
|
||||
request_id = %request_id,
|
||||
http.method = %request.method(),
|
||||
http.path = %request_path,
|
||||
llm.model = tracing::field::Empty,
|
||||
llm.tools = tracing::field::Empty,
|
||||
llm.user_message_preview = tracing::field::Empty,
|
||||
llm.temperature = tracing::field::Empty,
|
||||
);
|
||||
|
||||
// Execute the rest of the handler inside the span
|
||||
llm_chat_inner(
|
||||
request,
|
||||
router_service,
|
||||
full_qualified_llm_provider_url,
|
||||
model_aliases,
|
||||
llm_providers,
|
||||
custom_attrs,
|
||||
state_storage,
|
||||
request_id,
|
||||
request_path,
|
||||
request_headers,
|
||||
filter_pipeline,
|
||||
)
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn llm_chat_inner(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
full_qualified_llm_provider_url: String,
|
||||
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
custom_attrs: HashMap<String, String>,
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
request_id: String,
|
||||
request_path: String,
|
||||
mut request_headers: hyper::HeaderMap,
|
||||
filter_pipeline: Arc<FilterPipeline>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
// Set service name for LLM operations
|
||||
set_service_name(operation_component::LLM);
|
||||
get_active_span(|span| {
|
||||
for (key, value) in &custom_attrs {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||
}
|
||||
});
|
||||
|
||||
// Extract or generate traceparent - this establishes the trace context for all spans
|
||||
let traceparent: String = match request_headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(tp) => tp,
|
||||
None => {
|
||||
use uuid::Uuid;
|
||||
let trace_id = Uuid::new_v4().to_string().replace("-", "");
|
||||
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %generated_tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
generated_tp
|
||||
}
|
||||
};
|
||||
|
||||
let raw_bytes = request.collect().await?.to_bytes();
|
||||
|
||||
debug!(
|
||||
body = %String::from_utf8_lossy(&raw_bytes),
|
||||
"request body received"
|
||||
);
|
||||
|
||||
// Extract routing_policy from request body if present
|
||||
let (chat_request_bytes, inline_routing_policy) =
|
||||
match crate::handlers::routing_service::extract_routing_policy(&raw_bytes, false) {
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
warn!(error = %err, "failed to parse request JSON");
|
||||
return Ok(BrightStaffError::InvalidRequest(format!(
|
||||
"Failed to parse request: {}",
|
||||
err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
};
|
||||
|
||||
let mut client_request = match ProviderRequestType::try_from((
|
||||
&chat_request_bytes[..],
|
||||
&SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(),
|
||||
)) {
|
||||
Ok(request) => request,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
error = %err,
|
||||
"failed to parse request as ProviderRequestType"
|
||||
);
|
||||
return Ok(BrightStaffError::InvalidRequest(format!(
|
||||
"Failed to parse request: {}",
|
||||
err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
};
|
||||
|
||||
// === v1/responses state management: Extract input items early ===
|
||||
let mut original_input_items = Vec::new();
|
||||
let client_api = SupportedAPIsFromClient::from_endpoint(request_path.as_str());
|
||||
let is_responses_api_client = matches!(
|
||||
client_api,
|
||||
Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_))
|
||||
);
|
||||
|
||||
// If model is not specified in the request, resolve from default provider
|
||||
let model_from_request = client_request.model().to_string();
|
||||
let model_from_request = if model_from_request.is_empty() {
|
||||
match llm_providers.read().await.default() {
|
||||
Some(default_provider) => {
|
||||
let default_model = default_provider.name.clone();
|
||||
info!(default_model = %default_model, "no model specified in request, using default provider");
|
||||
client_request.set_model(default_model.clone());
|
||||
default_model
|
||||
}
|
||||
None => {
|
||||
let err_msg = "No model specified in request and no default provider configured";
|
||||
warn!("{}", err_msg);
|
||||
return Ok(BrightStaffError::NoModelSpecified.into_response());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
model_from_request
|
||||
};
|
||||
|
||||
// Model alias resolution: update model field in client_request immediately
|
||||
// This ensures all downstream objects use the resolved model
|
||||
let temperature = client_request.get_temperature();
|
||||
let is_streaming_request = client_request.is_streaming();
|
||||
let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
|
||||
let (provider_id, _) = get_provider_info(&llm_providers, &alias_resolved_model).await;
|
||||
|
||||
// Validate that the requested model exists in configuration
|
||||
// This matches the validation in llm_gateway routing.rs
|
||||
if llm_providers
|
||||
.read()
|
||||
.await
|
||||
.get(&alias_resolved_model)
|
||||
.is_none()
|
||||
{
|
||||
warn!(model = %alias_resolved_model, "model not found in configured providers");
|
||||
return Ok(BrightStaffError::ModelNotFound(alias_resolved_model).into_response());
|
||||
}
|
||||
|
||||
// Handle provider/model slug format (e.g., "openai/gpt-4")
|
||||
// Extract just the model name for upstream (providers don't understand the slug)
|
||||
let model_name_only = if let Some((_, model)) = alias_resolved_model.split_once('/') {
|
||||
model.to_string()
|
||||
} else {
|
||||
alias_resolved_model.clone()
|
||||
};
|
||||
|
||||
// Extract tool names and user message preview for span attributes
|
||||
let tool_names = client_request.get_tool_names();
|
||||
let user_message_preview = client_request
|
||||
.get_recent_user_message()
|
||||
.map(|msg| truncate_message(&msg, 50));
|
||||
let span = tracing::Span::current();
|
||||
if let Some(temp) = temperature {
|
||||
span.record(tracing_llm::TEMPERATURE, tracing::field::display(temp));
|
||||
}
|
||||
if let Some(tools) = &tool_names {
|
||||
let formatted_tools = tools
|
||||
.iter()
|
||||
.map(|name| format!("{}(...)", name))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
span.record(tracing_llm::TOOLS, formatted_tools.as_str());
|
||||
}
|
||||
if let Some(preview) = &user_message_preview {
|
||||
span.record(tracing_llm::USER_MESSAGE_PREVIEW, preview.as_str());
|
||||
}
|
||||
|
||||
// Extract messages for signal analysis (clone before moving client_request)
|
||||
let messages_for_signals = Some(client_request.get_messages());
|
||||
|
||||
// Set the model to just the model name (without provider prefix)
|
||||
// This ensures upstream receives "gpt-4" not "openai/gpt-4"
|
||||
client_request.set_model(model_name_only.clone());
|
||||
if client_request.remove_metadata_key("plano_preference_config") {
|
||||
debug!("removed plano_preference_config from metadata");
|
||||
}
|
||||
|
||||
// === Input filters processing for model listener ===
|
||||
// Filters receive the raw request bytes and return (possibly modified) raw bytes.
|
||||
// The returned bytes are re-parsed into a ProviderRequestType to continue the request.
|
||||
{
|
||||
if let Some(ref input_chain) = filter_pipeline.input {
|
||||
if !input_chain.is_empty() {
|
||||
debug!(input_filters = ?input_chain.filter_ids, "processing model listener input filters");
|
||||
|
||||
let chain = input_chain.to_agent_filter_chain("model_listener");
|
||||
|
||||
let mut pipeline_processor = PipelineProcessor::default();
|
||||
match pipeline_processor
|
||||
.process_raw_filter_chain(
|
||||
&chat_request_bytes,
|
||||
&chain,
|
||||
&input_chain.agents,
|
||||
&request_headers,
|
||||
&request_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(filtered_bytes) => {
|
||||
match ProviderRequestType::try_from((
|
||||
&filtered_bytes[..],
|
||||
&SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(),
|
||||
)) {
|
||||
Ok(updated_request) => {
|
||||
client_request = updated_request;
|
||||
info!("input filter chain processed successfully");
|
||||
}
|
||||
Err(parse_err) => {
|
||||
warn!(error = %parse_err, "input filter returned invalid request JSON");
|
||||
return Ok(BrightStaffError::InvalidRequest(format!(
|
||||
"Input filter returned invalid request: {}",
|
||||
parse_err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(super::pipeline_processor::PipelineError::ClientError {
|
||||
agent,
|
||||
status,
|
||||
body,
|
||||
}) => {
|
||||
warn!(
|
||||
agent = %agent,
|
||||
status = %status,
|
||||
body = %body,
|
||||
"client error from filter chain"
|
||||
);
|
||||
let error_json = serde_json::json!({
|
||||
"error": "FilterChainError",
|
||||
"agent": agent,
|
||||
"status": status,
|
||||
"agent_response": body
|
||||
});
|
||||
let mut error_response = Response::new(full(error_json.to_string()));
|
||||
*error_response.status_mut() =
|
||||
StatusCode::from_u16(status).unwrap_or(StatusCode::BAD_REQUEST);
|
||||
error_response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
"application/json".parse().unwrap(),
|
||||
);
|
||||
return Ok(error_response);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(error = %err, "filter chain processing failed");
|
||||
let err_msg = format!("Filter chain processing failed: {}", err);
|
||||
let mut internal_error = Response::new(full(err_msg));
|
||||
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return Ok(internal_error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref client_api_kind) = client_api {
|
||||
let upstream_api =
|
||||
provider_id.compatible_api_for_client(client_api_kind, is_streaming_request);
|
||||
client_request.normalize_for_upstream(provider_id, &upstream_api);
|
||||
}
|
||||
// === v1/responses state management: Determine upstream API and combine input if needed ===
|
||||
// Do this BEFORE routing since routing consumes the request
|
||||
// Only process state if state_storage is configured
|
||||
let mut should_manage_state = false;
|
||||
if is_responses_api_client {
|
||||
if let (
|
||||
ProviderRequestType::ResponsesAPIRequest(ref mut responses_req),
|
||||
Some(ref state_store),
|
||||
) = (&mut client_request, &state_storage)
|
||||
{
|
||||
// Extract original input once
|
||||
original_input_items = extract_input_items(&responses_req.input);
|
||||
|
||||
// Get the upstream path and check if it's ResponsesAPI
|
||||
let upstream_path = get_upstream_path(
|
||||
&llm_providers,
|
||||
&alias_resolved_model,
|
||||
&request_path,
|
||||
&alias_resolved_model,
|
||||
is_streaming_request,
|
||||
)
|
||||
.await;
|
||||
|
||||
let upstream_api = SupportedUpstreamAPIs::from_endpoint(&upstream_path);
|
||||
|
||||
// Only manage state if upstream is NOT OpenAIResponsesAPI (needs translation)
|
||||
should_manage_state = !matches!(
|
||||
upstream_api,
|
||||
Some(SupportedUpstreamAPIs::OpenAIResponsesAPI(_))
|
||||
);
|
||||
|
||||
if should_manage_state {
|
||||
// Retrieve and combine conversation history if previous_response_id exists
|
||||
if let Some(ref prev_resp_id) = responses_req.previous_response_id {
|
||||
match retrieve_and_combine_input(
|
||||
state_store.clone(),
|
||||
prev_resp_id,
|
||||
original_input_items, // Pass ownership instead of cloning
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(combined_input) => {
|
||||
// Update both the request and original_input_items
|
||||
responses_req.input = InputParam::Items(combined_input.clone());
|
||||
original_input_items = combined_input;
|
||||
info!(
|
||||
items = original_input_items.len(),
|
||||
"updated request with conversation history"
|
||||
);
|
||||
}
|
||||
Err(StateStorageError::NotFound(_)) => {
|
||||
// Return 409 Conflict when previous_response_id not found
|
||||
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
|
||||
return Ok(BrightStaffError::ConversationStateNotFound(
|
||||
prev_resp_id.to_string(),
|
||||
)
|
||||
.into_response());
|
||||
}
|
||||
Err(e) => {
|
||||
// Log warning but continue on other storage errors
|
||||
warn!(
|
||||
previous_response_id = %prev_resp_id,
|
||||
error = %e,
|
||||
"failed to retrieve conversation state"
|
||||
);
|
||||
// Restore original_input_items since we passed ownership
|
||||
original_input_items = extract_input_items(&responses_req.input);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
debug!("upstream supports ResponsesAPI natively");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Serialize request for upstream BEFORE router consumes it
|
||||
let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap();
|
||||
|
||||
// Determine routing using the dedicated router_chat module
|
||||
// This gets its own span for latency and error tracking
|
||||
let routing_span = info_span!(
|
||||
"routing",
|
||||
component = "routing",
|
||||
http.method = "POST",
|
||||
http.target = %request_path,
|
||||
model.requested = %model_from_request,
|
||||
model.alias_resolved = %alias_resolved_model,
|
||||
route.selected_model = tracing::field::Empty,
|
||||
routing.determination_ms = tracing::field::Empty,
|
||||
);
|
||||
let routing_result = match async {
|
||||
set_service_name(operation_component::ROUTING);
|
||||
router_chat_get_upstream_model(
|
||||
router_service,
|
||||
client_request, // Pass the original request - router_chat will convert it
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
inline_routing_policy,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.instrument(routing_span)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
return Ok(BrightStaffError::ForwardedError {
|
||||
status_code: err.status_code,
|
||||
message: err.message,
|
||||
}
|
||||
.into_response());
|
||||
}
|
||||
};
|
||||
|
||||
// Determine final model to use
|
||||
// Router returns "none" as a sentinel value when it doesn't select a specific model
|
||||
let router_selected_model = routing_result.model_name;
|
||||
let resolved_model = if router_selected_model != "none" {
|
||||
// Router selected a specific model via routing preferences
|
||||
router_selected_model
|
||||
} else {
|
||||
// Router returned "none" sentinel, use validated resolved_model from request
|
||||
alias_resolved_model.clone()
|
||||
};
|
||||
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());
|
||||
|
||||
let span_name = if model_from_request == resolved_model {
|
||||
format!("POST {} {}", request_path, resolved_model)
|
||||
} else {
|
||||
format!(
|
||||
"POST {} {} -> {}",
|
||||
request_path, model_from_request, resolved_model
|
||||
)
|
||||
};
|
||||
get_active_span(|span| {
|
||||
span.update_name(span_name.clone());
|
||||
});
|
||||
|
||||
debug!(
|
||||
url = %full_qualified_llm_provider_url,
|
||||
provider_hint = %resolved_model,
|
||||
upstream_model = %model_name_only,
|
||||
"Routing to upstream"
|
||||
);
|
||||
|
||||
request_headers.insert(
|
||||
ARCH_PROVIDER_HINT_HEADER,
|
||||
header::HeaderValue::from_str(&resolved_model).unwrap(),
|
||||
);
|
||||
|
||||
request_headers.insert(
|
||||
header::HeaderName::from_static(ARCH_IS_STREAMING_HEADER),
|
||||
header::HeaderValue::from_str(&is_streaming_request.to_string()).unwrap(),
|
||||
);
|
||||
// remove content-length header if it exists
|
||||
request_headers.remove(header::CONTENT_LENGTH);
|
||||
|
||||
// Inject current LLM span's trace context so upstream spans are children of plano(llm)
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers));
|
||||
});
|
||||
|
||||
// Output filters run for any API shape that reaches this handler (e.g. /v1/chat/completions,
|
||||
// /v1/messages, /v1/responses). Brightstaff does inbound translation and llm_gateway does
|
||||
// outbound translation; filters receive raw response bytes and request path.
|
||||
let has_output_filter = filter_pipeline.has_output_filters();
|
||||
|
||||
// Save request headers for output filters (before they're consumed by upstream request)
|
||||
let output_filter_request_headers = if has_output_filter {
|
||||
Some(request_headers.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Capture start time right before sending request to upstream
|
||||
let request_start_time = std::time::Instant::now();
|
||||
let _request_start_system_time = std::time::SystemTime::now();
|
||||
|
||||
let llm_response = match reqwest::Client::new()
|
||||
.post(&full_qualified_llm_provider_url)
|
||||
.headers(request_headers)
|
||||
.body(client_request_bytes_for_upstream)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
return Ok(BrightStaffError::InternalServerError(format!(
|
||||
"Failed to send request: {}",
|
||||
err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
};
|
||||
|
||||
// copy over the headers and status code from the original response
|
||||
let response_headers = llm_response.headers().clone();
|
||||
let upstream_status = llm_response.status();
|
||||
let mut response = Response::builder().status(upstream_status);
|
||||
let headers = response.headers_mut().unwrap();
|
||||
for (header_name, header_value) in response_headers.iter() {
|
||||
headers.insert(header_name, header_value.clone());
|
||||
}
|
||||
|
||||
// Build LLM span with actual status code using constants
|
||||
let byte_stream = llm_response.bytes_stream();
|
||||
|
||||
// Create base processor for metrics and tracing
|
||||
let base_processor = ObservableStreamProcessor::new(
|
||||
operation_component::LLM,
|
||||
span_name,
|
||||
request_start_time,
|
||||
messages_for_signals,
|
||||
);
|
||||
|
||||
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
|
||||
// Pick the right processor: state-aware if needed, otherwise base metrics-only.
|
||||
let processor: Box<dyn StreamProcessor> = if let (true, false, Some(state_store)) = (
|
||||
should_manage_state,
|
||||
original_input_items.is_empty(),
|
||||
state_storage,
|
||||
) {
|
||||
let content_encoding = response_headers
|
||||
.get("content-encoding")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
Box::new(ResponsesStateProcessor::new(
|
||||
base_processor,
|
||||
state_store,
|
||||
original_input_items,
|
||||
alias_resolved_model.clone(),
|
||||
resolved_model.clone(),
|
||||
is_streaming_request,
|
||||
false, // Not OpenAI upstream since should_manage_state is true
|
||||
content_encoding,
|
||||
request_id,
|
||||
))
|
||||
} else {
|
||||
Box::new(base_processor)
|
||||
};
|
||||
|
||||
// Apply output filters if configured, then build the streaming response.
|
||||
let streaming_response = if has_output_filter {
|
||||
let output_chain = filter_pipeline.output.as_ref().unwrap().clone();
|
||||
create_streaming_response_with_output_filter(
|
||||
byte_stream,
|
||||
processor,
|
||||
output_chain,
|
||||
output_filter_request_headers.unwrap(),
|
||||
request_path.clone(),
|
||||
)
|
||||
} else {
|
||||
create_streaming_response(byte_stream, processor)
|
||||
};
|
||||
|
||||
match response.body(streaming_response.body) {
|
||||
Ok(response) => Ok(response),
|
||||
Err(err) => Ok(BrightStaffError::InternalServerError(format!(
|
||||
"Failed to create response: {}",
|
||||
err
|
||||
))
|
||||
.into_response()),
|
||||
}
|
||||
}
|
||||
/// Resolves model aliases by looking up the requested model in the model_aliases map.
|
||||
/// Returns the target model if an alias is found, otherwise returns the original model.
|
||||
fn resolve_model_alias(
|
||||
model_from_request: &str,
|
||||
model_aliases: &Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
) -> String {
|
||||
if let Some(aliases) = model_aliases.as_ref() {
|
||||
if let Some(model_alias) = aliases.get(model_from_request) {
|
||||
debug!(
|
||||
"Model Alias: 'From {}' -> 'To {}'",
|
||||
model_from_request, model_alias.target
|
||||
);
|
||||
return model_alias.target.clone();
|
||||
}
|
||||
}
|
||||
model_from_request.to_string()
|
||||
}
|
||||
|
||||
/// Calculates the upstream path for the provider based on the model name.
|
||||
/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix,
|
||||
/// then uses target_endpoint_for_provider to calculate the correct upstream path.
|
||||
async fn get_upstream_path(
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
model_name: &str,
|
||||
request_path: &str,
|
||||
resolved_model: &str,
|
||||
is_streaming: bool,
|
||||
) -> String {
|
||||
let (provider_id, base_url_path_prefix) = get_provider_info(llm_providers, model_name).await;
|
||||
|
||||
// Calculate the upstream path using the proper API
|
||||
let client_api = SupportedAPIsFromClient::from_endpoint(request_path)
|
||||
.expect("Should have valid API endpoint");
|
||||
|
||||
client_api.target_endpoint_for_provider(
|
||||
&provider_id,
|
||||
request_path,
|
||||
resolved_model,
|
||||
is_streaming,
|
||||
base_url_path_prefix.as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper function to get provider info (ProviderId and base_url_path_prefix)
|
||||
async fn get_provider_info(
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
model_name: &str,
|
||||
) -> (hermesllm::ProviderId, Option<String>) {
|
||||
let providers_lock = llm_providers.read().await;
|
||||
|
||||
// Try to find by model name or provider name using LlmProviders::get
|
||||
// This handles both "gpt-4" and "openai/gpt-4" formats
|
||||
if let Some(provider) = providers_lock.get(model_name) {
|
||||
let provider_id = provider.provider_interface.to_provider_id();
|
||||
let prefix = provider.base_url_path_prefix.clone();
|
||||
return (provider_id, prefix);
|
||||
}
|
||||
|
||||
// Fall back to default provider
|
||||
if let Some(provider) = providers_lock.default() {
|
||||
let provider_id = provider.provider_interface.to_provider_id();
|
||||
let prefix = provider.base_url_path_prefix.clone();
|
||||
(provider_id, prefix)
|
||||
} else {
|
||||
// Last resort: use OpenAI as hardcoded fallback
|
||||
warn!("No default provider found, falling back to OpenAI");
|
||||
(hermesllm::ProviderId::OpenAI, None)
|
||||
}
|
||||
}
|
||||
|
||||
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
780
crates/brightstaff/src/handlers/llm/mod.rs
Normal file
780
crates/brightstaff/src/handlers/llm/mod.rs
Normal file
|
|
@ -0,0 +1,780 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{FilterPipeline, ModelAlias};
|
||||
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use hermesllm::apis::openai::Message;
|
||||
use hermesllm::apis::openai_responses::InputParam;
|
||||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::header::{self};
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use opentelemetry_http::HeaderInjector;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
pub(crate) mod model_selection;
|
||||
|
||||
use crate::app_state::AppState;
|
||||
use crate::handlers::agents::pipeline::PipelineProcessor;
|
||||
use crate::handlers::extract_or_generate_traceparent;
|
||||
use crate::handlers::extract_request_id;
|
||||
use crate::handlers::full;
|
||||
use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||
use crate::state::{
|
||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||
};
|
||||
use crate::streaming::{
|
||||
create_streaming_response, create_streaming_response_with_output_filter, truncate_message,
|
||||
ObservableStreamProcessor, StreamProcessor,
|
||||
};
|
||||
use crate::tracing::{
|
||||
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
|
||||
};
|
||||
use model_selection::router_chat_get_upstream_model;
|
||||
|
||||
pub async fn llm_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
state: Arc<AppState>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_path = request.uri().path().to_string();
|
||||
let request_headers = request.headers().clone();
|
||||
let request_id = extract_request_id(&request);
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref());
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
"llm",
|
||||
component = "llm",
|
||||
request_id = %request_id,
|
||||
http.method = %request.method(),
|
||||
http.path = %request_path,
|
||||
llm.model = tracing::field::Empty,
|
||||
llm.tools = tracing::field::Empty,
|
||||
llm.user_message_preview = tracing::field::Empty,
|
||||
llm.temperature = tracing::field::Empty,
|
||||
);
|
||||
|
||||
// Execute the rest of the handler inside the span
|
||||
llm_chat_inner(
|
||||
request,
|
||||
state,
|
||||
custom_attrs,
|
||||
request_id,
|
||||
request_path,
|
||||
request_headers,
|
||||
)
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn llm_chat_inner(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
state: Arc<AppState>,
|
||||
custom_attrs: HashMap<String, String>,
|
||||
request_id: String,
|
||||
request_path: String,
|
||||
mut request_headers: hyper::HeaderMap,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
// Set service name for LLM operations
|
||||
set_service_name(operation_component::LLM);
|
||||
get_active_span(|span| {
|
||||
for (key, value) in &custom_attrs {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||
}
|
||||
});
|
||||
|
||||
let traceparent = extract_or_generate_traceparent(&request_headers);
|
||||
|
||||
let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path);
|
||||
|
||||
// --- Phase 1: Parse and validate the incoming request ---
|
||||
let parsed = match parse_and_validate_request(
|
||||
request,
|
||||
&request_path,
|
||||
&state.model_aliases,
|
||||
&state.llm_providers,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(p) => p,
|
||||
Err(response) => return Ok(response),
|
||||
};
|
||||
|
||||
let PreparedRequest {
|
||||
mut client_request,
|
||||
chat_request_bytes,
|
||||
model_from_request,
|
||||
alias_resolved_model,
|
||||
model_name_only,
|
||||
is_streaming_request,
|
||||
is_responses_api_client,
|
||||
messages_for_signals,
|
||||
temperature,
|
||||
tool_names,
|
||||
user_message_preview,
|
||||
inline_routing_policy,
|
||||
client_api,
|
||||
provider_id,
|
||||
} = parsed;
|
||||
|
||||
// Record LLM-specific span attributes
|
||||
let span = tracing::Span::current();
|
||||
if let Some(temp) = temperature {
|
||||
span.record(tracing_llm::TEMPERATURE, tracing::field::display(temp));
|
||||
}
|
||||
if let Some(tools) = &tool_names {
|
||||
let formatted_tools = tools
|
||||
.iter()
|
||||
.map(|name| format!("{}(...)", name))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
span.record(tracing_llm::TOOLS, formatted_tools.as_str());
|
||||
}
|
||||
if let Some(preview) = &user_message_preview {
|
||||
span.record(tracing_llm::USER_MESSAGE_PREVIEW, preview.as_str());
|
||||
}
|
||||
|
||||
// --- Phase 1b: Input filter processing for model listener ---
|
||||
if let Some(ref input_chain) = state.filter_pipeline.input {
|
||||
if !input_chain.is_empty() {
|
||||
debug!(input_filters = ?input_chain.filter_ids, "processing model listener input filters");
|
||||
let chain = input_chain.to_agent_filter_chain("model_listener");
|
||||
let mut pipeline_processor = PipelineProcessor::default();
|
||||
match pipeline_processor
|
||||
.process_raw_filter_chain(
|
||||
&chat_request_bytes,
|
||||
&chain,
|
||||
&input_chain.agents,
|
||||
&request_headers,
|
||||
&request_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(filtered_bytes) => {
|
||||
let api_type = SupportedAPIsFromClient::from_endpoint(request_path.as_str())
|
||||
.expect("endpoint validated in parse_and_validate_request");
|
||||
match ProviderRequestType::try_from((&filtered_bytes[..], &api_type)) {
|
||||
Ok(updated_request) => {
|
||||
client_request = updated_request;
|
||||
info!("input filter chain processed successfully");
|
||||
}
|
||||
Err(parse_err) => {
|
||||
warn!(error = %parse_err, "input filter returned invalid request JSON");
|
||||
return Ok(common::errors::BrightStaffError::InvalidRequest(format!(
|
||||
"Input filter returned invalid request: {}",
|
||||
parse_err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(super::agents::pipeline::PipelineError::ClientError {
|
||||
agent,
|
||||
status,
|
||||
body,
|
||||
}) => {
|
||||
warn!(agent = %agent, status = %status, body = %body, "client error from filter chain");
|
||||
let error_json = serde_json::json!({
|
||||
"error": "FilterChainError",
|
||||
"agent": agent,
|
||||
"status": status,
|
||||
"agent_response": body
|
||||
});
|
||||
let mut error_response = Response::new(full(error_json.to_string()));
|
||||
*error_response.status_mut() =
|
||||
StatusCode::from_u16(status).unwrap_or(StatusCode::BAD_REQUEST);
|
||||
error_response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
return Ok(error_response);
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(error = %err, "filter chain processing failed");
|
||||
let mut internal_error =
|
||||
Response::new(full(format!("Filter chain processing failed: {}", err)));
|
||||
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return Ok(internal_error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Normalize for upstream after input filters
|
||||
if let Some(ref client_api_kind) = client_api {
|
||||
let upstream_api =
|
||||
provider_id.compatible_api_for_client(client_api_kind, is_streaming_request);
|
||||
client_request.normalize_for_upstream(provider_id, &upstream_api);
|
||||
}
|
||||
|
||||
// --- Phase 2: Resolve conversation state (v1/responses API) ---
|
||||
let state_ctx = match resolve_conversation_state(
|
||||
&mut client_request,
|
||||
is_responses_api_client,
|
||||
&state.state_storage,
|
||||
&state.llm_providers,
|
||||
&alias_resolved_model,
|
||||
&request_path,
|
||||
is_streaming_request,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(ctx) => ctx,
|
||||
Err(response) => return Ok(response),
|
||||
};
|
||||
|
||||
// Serialize request for upstream BEFORE router consumes it
|
||||
let client_request_bytes_for_upstream: Bytes =
|
||||
match ProviderRequestType::to_bytes(&client_request) {
|
||||
Ok(bytes) => bytes.into(),
|
||||
Err(err) => {
|
||||
warn!(error = %err, "failed to serialize request for upstream");
|
||||
let mut r = Response::new(full(format!("Failed to serialize request: {}", err)));
|
||||
*r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return Ok(r);
|
||||
}
|
||||
};
|
||||
|
||||
// --- Phase 3: Route the request ---
|
||||
let routing_span = info_span!(
|
||||
"routing",
|
||||
component = "routing",
|
||||
http.method = "POST",
|
||||
http.target = %request_path,
|
||||
model.requested = %model_from_request,
|
||||
model.alias_resolved = %alias_resolved_model,
|
||||
route.selected_model = tracing::field::Empty,
|
||||
routing.determination_ms = tracing::field::Empty,
|
||||
);
|
||||
let routing_result = match async {
|
||||
set_service_name(operation_component::ROUTING);
|
||||
router_chat_get_upstream_model(
|
||||
Arc::clone(&state.router_service),
|
||||
client_request,
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
inline_routing_policy,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.instrument(routing_span)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
let mut internal_error = Response::new(full(err.message));
|
||||
*internal_error.status_mut() = err.status_code;
|
||||
return Ok(internal_error);
|
||||
}
|
||||
};
|
||||
|
||||
// Determine final model (router returns "none" when it doesn't select a specific model)
|
||||
let router_selected_model = routing_result.model_name;
|
||||
let resolved_model = if router_selected_model != "none" {
|
||||
router_selected_model
|
||||
} else {
|
||||
alias_resolved_model.clone()
|
||||
};
|
||||
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());
|
||||
|
||||
// --- Phase 4: Forward to upstream and stream back ---
|
||||
send_upstream(
|
||||
&state.http_client,
|
||||
&full_qualified_llm_provider_url,
|
||||
&mut request_headers,
|
||||
client_request_bytes_for_upstream,
|
||||
&model_from_request,
|
||||
&alias_resolved_model,
|
||||
&resolved_model,
|
||||
&model_name_only,
|
||||
&request_path,
|
||||
is_streaming_request,
|
||||
messages_for_signals,
|
||||
state_ctx,
|
||||
state.state_storage.clone(),
|
||||
request_id,
|
||||
&state.filter_pipeline,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Phase 1 — Parse & validate the incoming request
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// All pre-validated request data extracted from the raw HTTP request.
|
||||
struct PreparedRequest {
|
||||
client_request: ProviderRequestType,
|
||||
chat_request_bytes: Bytes,
|
||||
model_from_request: String,
|
||||
alias_resolved_model: String,
|
||||
model_name_only: String,
|
||||
is_streaming_request: bool,
|
||||
is_responses_api_client: bool,
|
||||
messages_for_signals: Option<Vec<Message>>,
|
||||
temperature: Option<f32>,
|
||||
tool_names: Option<Vec<String>>,
|
||||
user_message_preview: Option<String>,
|
||||
inline_routing_policy: Option<Vec<common::configuration::ModelUsagePreference>>,
|
||||
client_api: Option<SupportedAPIsFromClient>,
|
||||
provider_id: hermesllm::ProviderId,
|
||||
}
|
||||
|
||||
/// Parse the body, resolve the model alias, and validate the model exists.
|
||||
///
|
||||
/// Returns `Err(Response)` for early-exit error responses (400 etc.).
|
||||
async fn parse_and_validate_request(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
request_path: &str,
|
||||
model_aliases: &Option<HashMap<String, ModelAlias>>,
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
) -> Result<PreparedRequest, Response<BoxBody<Bytes, hyper::Error>>> {
|
||||
let raw_bytes = request
|
||||
.collect()
|
||||
.await
|
||||
.map_err(|_| {
|
||||
let mut r = Response::new(full("Failed to read request body"));
|
||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||
r
|
||||
})?
|
||||
.to_bytes();
|
||||
|
||||
debug!(
|
||||
body = %String::from_utf8_lossy(&raw_bytes),
|
||||
"request body received"
|
||||
);
|
||||
|
||||
// Extract routing_policy from request body if present
|
||||
let (chat_request_bytes, inline_routing_policy) =
|
||||
crate::handlers::routing_service::extract_routing_policy(&raw_bytes, false).map_err(
|
||||
|err| {
|
||||
warn!(error = %err, "failed to parse request JSON");
|
||||
let mut r = Response::new(full(format!("Failed to parse request: {}", err)));
|
||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||
r
|
||||
},
|
||||
)?;
|
||||
|
||||
let api_type = SupportedAPIsFromClient::from_endpoint(request_path).ok_or_else(|| {
|
||||
warn!(path = %request_path, "unsupported endpoint");
|
||||
let mut r = Response::new(full(format!("Unsupported endpoint: {}", request_path)));
|
||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||
r
|
||||
})?;
|
||||
|
||||
let mut client_request = ProviderRequestType::try_from((&chat_request_bytes[..], &api_type))
|
||||
.map_err(|err| {
|
||||
warn!(error = %err, "failed to parse request as ProviderRequestType");
|
||||
let mut r = Response::new(full(format!("Failed to parse request: {}", err)));
|
||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||
r
|
||||
})?;
|
||||
|
||||
let is_responses_api_client =
|
||||
matches!(api_type, SupportedAPIsFromClient::OpenAIResponsesAPI(_));
|
||||
let client_api = Some(api_type);
|
||||
|
||||
let model_from_request = client_request.model().to_string();
|
||||
let temperature = client_request.get_temperature();
|
||||
let is_streaming_request = client_request.is_streaming();
|
||||
let alias_resolved_model = resolve_model_alias(&model_from_request, model_aliases);
|
||||
let (provider_id, _) = get_provider_info(llm_providers, &alias_resolved_model).await;
|
||||
|
||||
// Validate model exists in configuration
|
||||
if llm_providers
|
||||
.read()
|
||||
.await
|
||||
.get(&alias_resolved_model)
|
||||
.is_none()
|
||||
{
|
||||
let err_msg = format!(
|
||||
"Model '{}' not found in configured providers",
|
||||
alias_resolved_model
|
||||
);
|
||||
warn!(model = %alias_resolved_model, "model not found in configured providers");
|
||||
let mut r = Response::new(full(err_msg));
|
||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return Err(r);
|
||||
}
|
||||
|
||||
// Strip provider prefix for upstream (e.g. "openai/gpt-4" → "gpt-4")
|
||||
let model_name_only = alias_resolved_model
|
||||
.split_once('/')
|
||||
.map(|(_, model)| model.to_string())
|
||||
.unwrap_or_else(|| alias_resolved_model.clone());
|
||||
|
||||
// Extract span attributes and messages before mutating client_request
|
||||
let tool_names = client_request.get_tool_names();
|
||||
let user_message_preview = client_request
|
||||
.get_recent_user_message()
|
||||
.map(|msg| truncate_message(&msg, 50));
|
||||
let messages_for_signals = Some(client_request.get_messages());
|
||||
|
||||
// Set the upstream model name and strip routing metadata
|
||||
client_request.set_model(model_name_only.clone());
|
||||
if client_request.remove_metadata_key("archgw_preference_config") {
|
||||
debug!("removed archgw_preference_config from metadata");
|
||||
}
|
||||
if client_request.remove_metadata_key("plano_preference_config") {
|
||||
debug!("removed plano_preference_config from metadata");
|
||||
}
|
||||
|
||||
Ok(PreparedRequest {
|
||||
client_request,
|
||||
chat_request_bytes,
|
||||
model_from_request,
|
||||
alias_resolved_model,
|
||||
model_name_only,
|
||||
is_streaming_request,
|
||||
is_responses_api_client,
|
||||
messages_for_signals,
|
||||
temperature,
|
||||
tool_names,
|
||||
user_message_preview,
|
||||
inline_routing_policy,
|
||||
client_api,
|
||||
provider_id,
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Phase 2 — Resolve conversation state (v1/responses API)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Holds the state management context resolved from a v1/responses request.
|
||||
struct ConversationStateContext {
|
||||
should_manage_state: bool,
|
||||
original_input_items: Vec<hermesllm::apis::openai_responses::InputItem>,
|
||||
}
|
||||
|
||||
/// If the client uses the v1/responses API and the upstream provider doesn't
|
||||
/// support it natively, we manage conversation state ourselves.
|
||||
///
|
||||
/// This resolves `previous_response_id`, merges conversation history, and
|
||||
/// updates the request in place.
|
||||
///
|
||||
/// Returns `Err(Response)` for early-exit (e.g. 409 Conflict).
|
||||
async fn resolve_conversation_state(
|
||||
client_request: &mut ProviderRequestType,
|
||||
is_responses_api_client: bool,
|
||||
state_storage: &Option<Arc<dyn StateStorage>>,
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
alias_resolved_model: &str,
|
||||
request_path: &str,
|
||||
is_streaming_request: bool,
|
||||
) -> Result<ConversationStateContext, Response<BoxBody<Bytes, hyper::Error>>> {
|
||||
if !is_responses_api_client {
|
||||
return Ok(ConversationStateContext {
|
||||
should_manage_state: false,
|
||||
original_input_items: Vec::new(),
|
||||
});
|
||||
}
|
||||
|
||||
let (responses_req, state_store) = match (client_request, state_storage) {
|
||||
(ProviderRequestType::ResponsesAPIRequest(ref mut req), Some(store)) => (req, store),
|
||||
_ => {
|
||||
return Ok(ConversationStateContext {
|
||||
should_manage_state: false,
|
||||
original_input_items: Vec::new(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let mut original_input_items = extract_input_items(&responses_req.input);
|
||||
|
||||
// Check whether the upstream supports v1/responses natively
|
||||
let upstream_path = get_upstream_path(
|
||||
llm_providers,
|
||||
alias_resolved_model,
|
||||
request_path,
|
||||
alias_resolved_model,
|
||||
is_streaming_request,
|
||||
)
|
||||
.await;
|
||||
|
||||
let upstream_api = SupportedUpstreamAPIs::from_endpoint(&upstream_path);
|
||||
let should_manage_state = !matches!(
|
||||
upstream_api,
|
||||
Some(SupportedUpstreamAPIs::OpenAIResponsesAPI(_))
|
||||
);
|
||||
|
||||
if !should_manage_state {
|
||||
debug!("upstream supports ResponsesAPI natively");
|
||||
return Ok(ConversationStateContext {
|
||||
should_manage_state: false,
|
||||
original_input_items,
|
||||
});
|
||||
}
|
||||
|
||||
// Retrieve and combine conversation history if previous_response_id exists
|
||||
if let Some(ref prev_resp_id) = responses_req.previous_response_id {
|
||||
match retrieve_and_combine_input(state_store.clone(), prev_resp_id, original_input_items)
|
||||
.await
|
||||
{
|
||||
Ok(combined_input) => {
|
||||
responses_req.input = InputParam::Items(combined_input.clone());
|
||||
original_input_items = combined_input;
|
||||
info!(
|
||||
items = original_input_items.len(),
|
||||
"updated request with conversation history"
|
||||
);
|
||||
}
|
||||
Err(StateStorageError::NotFound(_)) => {
|
||||
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
|
||||
let err_msg = format!(
|
||||
"Conversation state not found for previous_response_id: {}",
|
||||
prev_resp_id
|
||||
);
|
||||
let mut r = Response::new(full(err_msg));
|
||||
*r.status_mut() = StatusCode::CONFLICT;
|
||||
return Err(r);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
previous_response_id = %prev_resp_id,
|
||||
error = %e,
|
||||
"failed to retrieve conversation state"
|
||||
);
|
||||
// Restore original_input_items since we passed ownership
|
||||
original_input_items = extract_input_items(&responses_req.input);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ConversationStateContext {
|
||||
should_manage_state,
|
||||
original_input_items,
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Phase 4 — Forward to upstream and stream the response back
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn send_upstream(
|
||||
http_client: &reqwest::Client,
|
||||
upstream_url: &str,
|
||||
request_headers: &mut hyper::HeaderMap,
|
||||
body: bytes::Bytes,
|
||||
model_from_request: &str,
|
||||
alias_resolved_model: &str,
|
||||
resolved_model: &str,
|
||||
model_name_only: &str,
|
||||
request_path: &str,
|
||||
is_streaming_request: bool,
|
||||
messages_for_signals: Option<Vec<Message>>,
|
||||
state_ctx: ConversationStateContext,
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
request_id: String,
|
||||
filter_pipeline: &Arc<FilterPipeline>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let span_name = if model_from_request == resolved_model {
|
||||
format!("POST {} {}", request_path, resolved_model)
|
||||
} else {
|
||||
format!(
|
||||
"POST {} {} -> {}",
|
||||
request_path, model_from_request, resolved_model
|
||||
)
|
||||
};
|
||||
get_active_span(|span| {
|
||||
span.update_name(span_name.clone());
|
||||
});
|
||||
|
||||
debug!(
|
||||
url = %upstream_url,
|
||||
provider_hint = %resolved_model,
|
||||
upstream_model = %model_name_only,
|
||||
"Routing to upstream"
|
||||
);
|
||||
|
||||
if let Ok(val) = header::HeaderValue::from_str(resolved_model) {
|
||||
request_headers.insert(ARCH_PROVIDER_HINT_HEADER, val);
|
||||
}
|
||||
request_headers.insert(
|
||||
header::HeaderName::from_static(ARCH_IS_STREAMING_HEADER),
|
||||
header::HeaderValue::from_static(if is_streaming_request {
|
||||
"true"
|
||||
} else {
|
||||
"false"
|
||||
}),
|
||||
);
|
||||
request_headers.remove(header::CONTENT_LENGTH);
|
||||
|
||||
// Inject current span's trace context so upstream spans are children of plano(llm)
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(request_headers));
|
||||
});
|
||||
|
||||
let request_start_time = std::time::Instant::now();
|
||||
|
||||
let llm_response = match http_client
|
||||
.post(upstream_url)
|
||||
.headers(request_headers.clone())
|
||||
.body(body)
|
||||
.send()
|
||||
.await
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(err) => {
|
||||
let err_msg = format!("Failed to send request: {}", err);
|
||||
let mut internal_error = Response::new(full(err_msg));
|
||||
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
return Ok(internal_error);
|
||||
}
|
||||
};
|
||||
|
||||
// Propagate upstream headers and status
|
||||
let response_headers = llm_response.headers().clone();
|
||||
let upstream_status = llm_response.status();
|
||||
let mut response = Response::builder().status(upstream_status);
|
||||
if let Some(headers) = response.headers_mut() {
|
||||
for (name, value) in response_headers.iter() {
|
||||
headers.insert(name, value.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let byte_stream = llm_response.bytes_stream();
|
||||
|
||||
// Create base processor for metrics and tracing
|
||||
let base_processor = ObservableStreamProcessor::new(
|
||||
operation_component::LLM,
|
||||
span_name,
|
||||
request_start_time,
|
||||
messages_for_signals,
|
||||
);
|
||||
|
||||
let output_filter_request_headers = if filter_pipeline.has_output_filters() {
|
||||
Some(request_headers.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Pick the right processor: state-aware if needed, otherwise base metrics-only.
|
||||
let processor: Box<dyn StreamProcessor> = if let (true, false, Some(state_store)) = (
|
||||
state_ctx.should_manage_state,
|
||||
state_ctx.original_input_items.is_empty(),
|
||||
state_storage,
|
||||
) {
|
||||
let content_encoding = response_headers
|
||||
.get("content-encoding")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
Box::new(ResponsesStateProcessor::new(
|
||||
base_processor,
|
||||
state_store,
|
||||
state_ctx.original_input_items,
|
||||
alias_resolved_model.to_string(),
|
||||
resolved_model.to_string(),
|
||||
is_streaming_request,
|
||||
false,
|
||||
content_encoding,
|
||||
request_id,
|
||||
))
|
||||
} else {
|
||||
Box::new(base_processor)
|
||||
};
|
||||
|
||||
let streaming_response = if let (Some(output_chain), Some(filter_headers)) = (
|
||||
filter_pipeline.output.as_ref().filter(|c| !c.is_empty()),
|
||||
output_filter_request_headers,
|
||||
) {
|
||||
create_streaming_response_with_output_filter(
|
||||
byte_stream,
|
||||
processor,
|
||||
output_chain.clone(),
|
||||
filter_headers,
|
||||
request_path.to_string(),
|
||||
)
|
||||
} else {
|
||||
create_streaming_response(byte_stream, processor)
|
||||
};
|
||||
|
||||
match response.body(streaming_response.body) {
|
||||
Ok(response) => Ok(response),
|
||||
Err(err) => {
|
||||
let err_msg = format!("Failed to create response: {}", err);
|
||||
let mut internal_error = Response::new(full(err_msg));
|
||||
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||
Ok(internal_error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Resolves model aliases by looking up the requested model in the model_aliases map.
|
||||
/// Returns the target model if an alias is found, otherwise returns the original model.
|
||||
fn resolve_model_alias(
|
||||
model_from_request: &str,
|
||||
model_aliases: &Option<HashMap<String, ModelAlias>>,
|
||||
) -> String {
|
||||
if let Some(aliases) = model_aliases.as_ref() {
|
||||
if let Some(model_alias) = aliases.get(model_from_request) {
|
||||
debug!(
|
||||
"Model Alias: 'From {}' -> 'To {}'",
|
||||
model_from_request, model_alias.target
|
||||
);
|
||||
return model_alias.target.clone();
|
||||
}
|
||||
}
|
||||
model_from_request.to_string()
|
||||
}
|
||||
|
||||
/// Calculates the upstream path for the provider based on the model name.
|
||||
async fn get_upstream_path(
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
model_name: &str,
|
||||
request_path: &str,
|
||||
resolved_model: &str,
|
||||
is_streaming: bool,
|
||||
) -> String {
|
||||
let (provider_id, base_url_path_prefix) = get_provider_info(llm_providers, model_name).await;
|
||||
|
||||
let Some(client_api) = SupportedAPIsFromClient::from_endpoint(request_path) else {
|
||||
return request_path.to_string();
|
||||
};
|
||||
|
||||
client_api.target_endpoint_for_provider(
|
||||
&provider_id,
|
||||
request_path,
|
||||
resolved_model,
|
||||
is_streaming,
|
||||
base_url_path_prefix.as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper to get provider info (ProviderId and base_url_path_prefix).
|
||||
async fn get_provider_info(
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
model_name: &str,
|
||||
) -> (hermesllm::ProviderId, Option<String>) {
|
||||
let providers_lock = llm_providers.read().await;
|
||||
|
||||
if let Some(provider) = providers_lock.get(model_name) {
|
||||
let provider_id = provider.provider_interface.to_provider_id();
|
||||
let prefix = provider.base_url_path_prefix.clone();
|
||||
return (provider_id, prefix);
|
||||
}
|
||||
|
||||
if let Some(provider) = providers_lock.default() {
|
||||
let provider_id = provider.provider_interface.to_provider_id();
|
||||
let prefix = provider.base_url_path_prefix.clone();
|
||||
(provider_id, prefix)
|
||||
} else {
|
||||
warn!("No default provider found, falling back to OpenAI");
|
||||
(hermesllm::ProviderId::OpenAI, None)
|
||||
}
|
||||
}
|
||||
|
|
@ -5,7 +5,8 @@ use hyper::StatusCode;
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::router::llm_router::RouterService;
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::streaming::truncate_message;
|
||||
use crate::tracing::routing;
|
||||
|
||||
pub struct RoutingResult {
|
||||
|
|
@ -103,16 +104,7 @@ pub async fn router_chat_get_upstream_model(
|
|||
.map_or("None".to_string(), |c| c.to_string().replace('\n', "\\n"))
|
||||
});
|
||||
|
||||
const MAX_MESSAGE_LENGTH: usize = 50;
|
||||
let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH {
|
||||
let truncated: String = latest_message_for_log
|
||||
.chars()
|
||||
.take(MAX_MESSAGE_LENGTH)
|
||||
.collect();
|
||||
format!("{}...", truncated)
|
||||
} else {
|
||||
latest_message_for_log
|
||||
};
|
||||
let latest_message_for_log = truncate_message(&latest_message_for_log, 50);
|
||||
|
||||
info!(
|
||||
has_usage_preferences = usage_preferences.is_some(),
|
||||
|
|
@ -1,14 +1,57 @@
|
|||
pub mod agent_chat_completions;
|
||||
pub mod agent_selector;
|
||||
pub mod agents;
|
||||
pub mod function_calling;
|
||||
pub mod jsonrpc;
|
||||
pub mod llm;
|
||||
pub mod models;
|
||||
pub mod pipeline_processor;
|
||||
pub mod response_handler;
|
||||
pub mod router_chat;
|
||||
pub mod response;
|
||||
pub mod routing_service;
|
||||
pub mod streaming;
|
||||
|
||||
#[cfg(test)]
|
||||
mod integration_tests;
|
||||
|
||||
use bytes::Bytes;
|
||||
use common::consts::TRACE_PARENT_HEADER;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Empty, Full};
|
||||
use hyper::Request;
|
||||
use tracing::warn;
|
||||
|
||||
/// Wrap a chunk into a `BoxBody` for hyper responses.
|
||||
pub fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// An empty HTTP body (used for 404 / OPTIONS responses).
|
||||
pub fn empty() -> BoxBody<Bytes, hyper::Error> {
|
||||
Empty::<Bytes>::new()
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// Extract request ID from incoming request headers, or generate a new UUID v4.
|
||||
pub fn extract_request_id<T>(request: &Request<T>) -> String {
|
||||
request
|
||||
.headers()
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
|
||||
}
|
||||
|
||||
/// Extract or generate a W3C `traceparent` header value.
|
||||
pub fn extract_or_generate_traceparent(headers: &hyper::HeaderMap) -> String {
|
||||
headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| {
|
||||
let trace_id = uuid::Uuid::new_v4().to_string().replace("-", "");
|
||||
let tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
tp
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
use bytes::Bytes;
|
||||
use common::llm_providers::LlmProviders;
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Full};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use hyper::{Response, StatusCode};
|
||||
use serde_json;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::full;
|
||||
|
||||
pub async fn list_models(
|
||||
llm_providers: Arc<tokio::sync::RwLock<LlmProviders>>,
|
||||
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
|
|
@ -12,27 +13,15 @@ pub async fn list_models(
|
|||
let models = prov.to_models();
|
||||
|
||||
match serde_json::to_string(&models) {
|
||||
Ok(json) => {
|
||||
let body = Full::new(Bytes::from(json))
|
||||
.map_err(|never| match never {})
|
||||
.boxed();
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
Err(_) => {
|
||||
let body = Full::new(Bytes::from_static(
|
||||
b"{\"error\":\"Failed to serialize models\"}",
|
||||
))
|
||||
.map_err(|never| match never {})
|
||||
.boxed();
|
||||
Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
Ok(json) => Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(full(json))
|
||||
.unwrap(),
|
||||
Err(_) => Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(full("{\"error\":\"Failed to serialize models\"}"))
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use hermesllm::apis::OpenAIApi;
|
|||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::SseEvent;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Full, StreamBody};
|
||||
use http_body_util::StreamBody;
|
||||
use hyper::body::Frame;
|
||||
use hyper::Response;
|
||||
use tokio::sync::mpsc;
|
||||
|
|
@ -12,6 +12,8 @@ use tokio_stream::wrappers::ReceiverStream;
|
|||
use tokio_stream::StreamExt;
|
||||
use tracing::{info, warn, Instrument};
|
||||
|
||||
use super::full;
|
||||
|
||||
/// Service for handling HTTP responses and streaming
|
||||
pub struct ResponseHandler;
|
||||
|
||||
|
|
@ -22,9 +24,40 @@ impl ResponseHandler {
|
|||
|
||||
/// Create a full response body from bytes
|
||||
pub fn create_full_body<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
full(chunk)
|
||||
}
|
||||
|
||||
/// Create a JSON error response with BAD_REQUEST status
|
||||
pub fn create_json_error_response(
|
||||
json: &serde_json::Value,
|
||||
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
let body = Self::create_full_body(json.to_string());
|
||||
let mut response = Response::new(body);
|
||||
*response.status_mut() = hyper::StatusCode::BAD_REQUEST;
|
||||
response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
/// Create a BAD_REQUEST error response with a message
|
||||
pub fn create_bad_request(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
let json = serde_json::json!({"error": message});
|
||||
Self::create_json_error_response(&json)
|
||||
}
|
||||
|
||||
/// Create an INTERNAL_SERVER_ERROR response with a message
|
||||
pub fn create_internal_error(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
let json = serde_json::json!({"error": message});
|
||||
let body = Self::create_full_body(json.to_string());
|
||||
let mut response = Response::new(body);
|
||||
*response.status_mut() = hyper::StatusCode::INTERNAL_SERVER_ERROR;
|
||||
response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
hyper::header::HeaderValue::from_static("application/json"),
|
||||
);
|
||||
response
|
||||
}
|
||||
|
||||
/// Create a streaming response from a reqwest response.
|
||||
|
|
@ -112,7 +145,9 @@ impl ResponseHandler {
|
|||
let upstream_api =
|
||||
SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
|
||||
|
||||
let sse_iter = SseStreamIter::try_from(response_bytes.as_ref()).unwrap();
|
||||
let sse_iter = SseStreamIter::try_from(response_bytes.as_ref()).map_err(|e| {
|
||||
BrightStaffError::StreamError(format!("Failed to parse SSE stream: {}", e))
|
||||
})?;
|
||||
let mut accumulated_text = String::new();
|
||||
|
||||
for sse_event in sse_iter {
|
||||
|
|
@ -122,7 +157,13 @@ impl ResponseHandler {
|
|||
}
|
||||
|
||||
let transformed_event =
|
||||
SseEvent::try_from((sse_event, &client_api, &upstream_api)).unwrap();
|
||||
match SseEvent::try_from((sse_event, &client_api, &upstream_api)) {
|
||||
Ok(event) => event,
|
||||
Err(e) => {
|
||||
warn!(error = ?e, "failed to transform SSE event, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Try to get provider response and extract content delta
|
||||
match transformed_event.provider_response() {
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{ModelUsagePreference, SpanAttributes};
|
||||
use common::consts::{REQUEST_ID_HEADER, TRACE_PARENT_HEADER};
|
||||
use common::consts::REQUEST_ID_HEADER;
|
||||
use common::errors::BrightStaffError;
|
||||
use hermesllm::clients::SupportedAPIsFromClient;
|
||||
use hermesllm::ProviderRequestType;
|
||||
|
|
@ -10,8 +10,9 @@ use hyper::{Request, Response, StatusCode};
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
||||
use crate::router::llm_router::RouterService;
|
||||
use super::extract_or_generate_traceparent;
|
||||
use crate::handlers::llm::model_selection::router_chat_get_upstream_model;
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||
|
||||
const ROUTING_POLICY_SIZE_WARNING_BYTES: usize = 5120;
|
||||
|
|
@ -72,7 +73,7 @@ pub async fn routing_decision(
|
|||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
request_path: String,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
span_attributes: &Option<SpanAttributes>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_headers = request.headers().clone();
|
||||
let request_id: String = request_headers
|
||||
|
|
@ -81,8 +82,7 @@ pub async fn routing_decision(
|
|||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref());
|
||||
let custom_attrs = collect_custom_trace_attributes(&request_headers, span_attributes.as_ref());
|
||||
|
||||
let request_span = info_span!(
|
||||
"routing_decision",
|
||||
|
|
@ -119,23 +119,7 @@ async fn routing_decision_inner(
|
|||
}
|
||||
});
|
||||
|
||||
// Extract or generate traceparent
|
||||
let traceparent: String = match request_headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(tp) => tp,
|
||||
None => {
|
||||
let trace_id = uuid::Uuid::new_v4().to_string().replace("-", "");
|
||||
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %generated_tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
generated_tp
|
||||
}
|
||||
};
|
||||
let traceparent = extract_or_generate_traceparent(&request_headers);
|
||||
|
||||
// Extract trace_id from traceparent (format: 00-{trace_id}-{span_id}-{flags})
|
||||
let trace_id = traceparent
|
||||
|
|
|
|||
|
|
@ -1,406 +0,0 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::ResolvedFilterChain;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::StreamBody;
|
||||
use hyper::body::Frame;
|
||||
use hyper::header::HeaderMap;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use opentelemetry::KeyValue;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, info, warn, Instrument};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
use super::pipeline_processor::{PipelineError, PipelineProcessor};
|
||||
|
||||
const STREAM_BUFFER_SIZE: usize = 16;
|
||||
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
|
||||
use crate::tracing::{llm, set_service_name, signals as signal_constants};
|
||||
use hermesllm::apis::openai::Message;
|
||||
|
||||
/// Trait for processing streaming chunks
|
||||
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
|
||||
pub trait StreamProcessor: Send + 'static {
|
||||
/// Process an incoming chunk of bytes
|
||||
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String>;
|
||||
|
||||
/// Called when the first bytes are received (for time-to-first-token tracking)
|
||||
fn on_first_bytes(&mut self) {}
|
||||
|
||||
/// Called when streaming completes successfully
|
||||
fn on_complete(&mut self) {}
|
||||
|
||||
/// Called when streaming encounters an error
|
||||
fn on_error(&mut self, _error: &str) {}
|
||||
}
|
||||
|
||||
impl StreamProcessor for Box<dyn StreamProcessor> {
|
||||
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String> {
|
||||
(**self).process_chunk(chunk)
|
||||
}
|
||||
fn on_first_bytes(&mut self) {
|
||||
(**self).on_first_bytes()
|
||||
}
|
||||
fn on_complete(&mut self) {
|
||||
(**self).on_complete()
|
||||
}
|
||||
fn on_error(&mut self, error: &str) {
|
||||
(**self).on_error(error)
|
||||
}
|
||||
}
|
||||
|
||||
/// A processor that tracks streaming metrics
|
||||
pub struct ObservableStreamProcessor {
|
||||
service_name: String,
|
||||
operation_name: String,
|
||||
total_bytes: usize,
|
||||
chunk_count: usize,
|
||||
start_time: Instant,
|
||||
time_to_first_token: Option<u128>,
|
||||
messages: Option<Vec<Message>>,
|
||||
}
|
||||
|
||||
impl ObservableStreamProcessor {
|
||||
/// Create a new passthrough processor
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `service_name` - The service name for this span (e.g., "plano(llm)")
|
||||
/// This will be set as the `service.name.override` attribute on the current span,
|
||||
/// allowing the ServiceNameOverrideExporter to route spans to different services.
|
||||
/// * `operation_name` - The current span operation name (e.g., "POST /v1/chat/completions gpt-4")
|
||||
/// Used to append the flag marker when concerning signals are detected.
|
||||
/// * `start_time` - When the request started (for duration calculation)
|
||||
/// * `messages` - Optional conversation messages for signal analysis
|
||||
pub fn new(
|
||||
service_name: impl Into<String>,
|
||||
operation_name: impl Into<String>,
|
||||
start_time: Instant,
|
||||
messages: Option<Vec<Message>>,
|
||||
) -> Self {
|
||||
let service_name = service_name.into();
|
||||
|
||||
// Set the service name override on the current span for OpenTelemetry export
|
||||
// This allows the ServiceNameOverrideExporter to route this span to the correct service
|
||||
set_service_name(&service_name);
|
||||
|
||||
Self {
|
||||
service_name,
|
||||
operation_name: operation_name.into(),
|
||||
total_bytes: 0,
|
||||
chunk_count: 0,
|
||||
start_time,
|
||||
time_to_first_token: None,
|
||||
messages,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamProcessor for ObservableStreamProcessor {
|
||||
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String> {
|
||||
self.total_bytes += chunk.len();
|
||||
self.chunk_count += 1;
|
||||
Ok(Some(chunk))
|
||||
}
|
||||
|
||||
fn on_first_bytes(&mut self) {
|
||||
// Record time to first token (only for streaming)
|
||||
if self.time_to_first_token.is_none() {
|
||||
self.time_to_first_token = Some(self.start_time.elapsed().as_millis());
|
||||
}
|
||||
}
|
||||
|
||||
fn on_complete(&mut self) {
|
||||
// Record time-to-first-token as an OTel span attribute + event (streaming only)
|
||||
if let Some(ttft) = self.time_to_first_token {
|
||||
let span = tracing::Span::current();
|
||||
let otel_context = span.context();
|
||||
let otel_span = otel_context.span();
|
||||
otel_span.set_attribute(KeyValue::new(llm::TIME_TO_FIRST_TOKEN_MS, ttft as i64));
|
||||
otel_span.add_event(
|
||||
llm::TIME_TO_FIRST_TOKEN_MS,
|
||||
vec![KeyValue::new(llm::TIME_TO_FIRST_TOKEN_MS, ttft as i64)],
|
||||
);
|
||||
}
|
||||
|
||||
// Analyze signals if messages are available and record as span attributes
|
||||
if let Some(ref messages) = self.messages {
|
||||
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
|
||||
let report = analyzer.analyze(messages);
|
||||
|
||||
// Get the current OTel span to set signal attributes
|
||||
let span = tracing::Span::current();
|
||||
let otel_context = span.context();
|
||||
let otel_span = otel_context.span();
|
||||
|
||||
// Add overall quality
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::QUALITY,
|
||||
format!("{:?}", report.overall_quality),
|
||||
));
|
||||
|
||||
// Add repair/follow-up metrics if concerning
|
||||
if report.follow_up.is_concerning || report.follow_up.repair_count > 0 {
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::REPAIR_COUNT,
|
||||
report.follow_up.repair_count as i64,
|
||||
));
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::REPAIR_RATIO,
|
||||
format!("{:.3}", report.follow_up.repair_ratio),
|
||||
));
|
||||
}
|
||||
|
||||
// Add frustration metrics
|
||||
if report.frustration.has_frustration {
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::FRUSTRATION_COUNT,
|
||||
report.frustration.frustration_count as i64,
|
||||
));
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::FRUSTRATION_SEVERITY,
|
||||
report.frustration.severity as i64,
|
||||
));
|
||||
}
|
||||
|
||||
// Add repetition metrics
|
||||
if report.repetition.has_looping {
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::REPETITION_COUNT,
|
||||
report.repetition.repetition_count as i64,
|
||||
));
|
||||
}
|
||||
|
||||
// Add escalation metrics
|
||||
if report.escalation.escalation_requested {
|
||||
otel_span
|
||||
.set_attribute(KeyValue::new(signal_constants::ESCALATION_REQUESTED, true));
|
||||
}
|
||||
|
||||
// Add positive feedback metrics
|
||||
if report.positive_feedback.has_positive_feedback {
|
||||
otel_span.set_attribute(KeyValue::new(
|
||||
signal_constants::POSITIVE_FEEDBACK_COUNT,
|
||||
report.positive_feedback.positive_count as i64,
|
||||
));
|
||||
}
|
||||
|
||||
// Flag the span name if any concerning signal is detected
|
||||
let should_flag = report.frustration.has_frustration
|
||||
|| report.repetition.has_looping
|
||||
|| report.escalation.escalation_requested
|
||||
|| matches!(
|
||||
report.overall_quality,
|
||||
InteractionQuality::Poor | InteractionQuality::Severe
|
||||
);
|
||||
|
||||
if should_flag {
|
||||
otel_span.update_name(format!("{} {}", self.operation_name, FLAG_MARKER));
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
service = %self.service_name,
|
||||
total_bytes = self.total_bytes,
|
||||
chunk_count = self.chunk_count,
|
||||
duration_ms = self.start_time.elapsed().as_millis(),
|
||||
time_to_first_token_ms = ?self.time_to_first_token,
|
||||
"streaming completed"
|
||||
);
|
||||
}
|
||||
|
||||
fn on_error(&mut self, error_msg: &str) {
|
||||
warn!(
|
||||
service = %self.service_name,
|
||||
error = error_msg,
|
||||
duration_ms = self.start_time.elapsed().as_millis(),
|
||||
"stream error"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of creating a streaming response
|
||||
pub struct StreamingResponse {
|
||||
pub body: BoxBody<Bytes, hyper::Error>,
|
||||
pub processor_handle: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
pub fn create_streaming_response<S, P>(mut byte_stream: S, mut processor: P) -> StreamingResponse
|
||||
where
|
||||
S: StreamExt<Item = Result<Bytes, reqwest::Error>> + Send + Unpin + 'static,
|
||||
P: StreamProcessor,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel::<Bytes>(STREAM_BUFFER_SIZE);
|
||||
|
||||
// Capture the current span so the spawned task inherits the request context
|
||||
let current_span = tracing::Span::current();
|
||||
|
||||
// Spawn a task to process and forward chunks
|
||||
let processor_handle = tokio::spawn(
|
||||
async move {
|
||||
let mut is_first_chunk = true;
|
||||
|
||||
while let Some(item) = byte_stream.next().await {
|
||||
let chunk = match item {
|
||||
Ok(chunk) => chunk,
|
||||
Err(err) => {
|
||||
let err_msg = format!("Error receiving chunk: {:?}", err);
|
||||
warn!(error = %err_msg, "stream error");
|
||||
processor.on_error(&err_msg);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// Call on_first_bytes for the first chunk
|
||||
if is_first_chunk {
|
||||
processor.on_first_bytes();
|
||||
is_first_chunk = false;
|
||||
}
|
||||
|
||||
// Process the chunk
|
||||
match processor.process_chunk(chunk) {
|
||||
Ok(Some(processed_chunk)) => {
|
||||
if tx.send(processed_chunk).await.is_err() {
|
||||
warn!("receiver dropped");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// Skip this chunk
|
||||
continue;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("processor error: {}", err);
|
||||
processor.on_error(&err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
processor.on_complete();
|
||||
}
|
||||
.instrument(current_span),
|
||||
);
|
||||
|
||||
// Convert channel receiver to HTTP stream
|
||||
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
|
||||
let stream_body = BoxBody::new(StreamBody::new(stream));
|
||||
|
||||
StreamingResponse {
|
||||
body: stream_body,
|
||||
processor_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a streaming response that processes each raw chunk through output filters.
|
||||
/// Filters receive the raw LLM response bytes and request path (any API shape; not limited to
|
||||
/// chat completions). On filter error mid-stream the original chunk is passed through (headers already sent).
|
||||
pub fn create_streaming_response_with_output_filter<S, P>(
|
||||
mut byte_stream: S,
|
||||
mut inner_processor: P,
|
||||
output_chain: ResolvedFilterChain,
|
||||
request_headers: HeaderMap,
|
||||
request_path: String,
|
||||
) -> StreamingResponse
|
||||
where
|
||||
S: StreamExt<Item = Result<Bytes, reqwest::Error>> + Send + Unpin + 'static,
|
||||
P: StreamProcessor,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel::<Bytes>(STREAM_BUFFER_SIZE);
|
||||
let current_span = tracing::Span::current();
|
||||
|
||||
let processor_handle = tokio::spawn(
|
||||
async move {
|
||||
let mut is_first_chunk = true;
|
||||
let mut pipeline_processor = PipelineProcessor::default();
|
||||
let chain = output_chain.to_agent_filter_chain("output_filter");
|
||||
|
||||
while let Some(item) = byte_stream.next().await {
|
||||
let chunk = match item {
|
||||
Ok(chunk) => chunk,
|
||||
Err(err) => {
|
||||
let err_msg = format!("Error receiving chunk: {:?}", err);
|
||||
warn!(error = %err_msg, "stream error");
|
||||
inner_processor.on_error(&err_msg);
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if is_first_chunk {
|
||||
inner_processor.on_first_bytes();
|
||||
is_first_chunk = false;
|
||||
}
|
||||
|
||||
// Pass raw chunk bytes through the output filter chain
|
||||
let processed_chunk = match pipeline_processor
|
||||
.process_raw_filter_chain(
|
||||
&chunk,
|
||||
&chain,
|
||||
&output_chain.agents,
|
||||
&request_headers,
|
||||
&request_path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(filtered) => filtered,
|
||||
Err(PipelineError::ClientError {
|
||||
agent,
|
||||
status,
|
||||
body,
|
||||
}) => {
|
||||
warn!(
|
||||
agent = %agent,
|
||||
status = %status,
|
||||
body = %body,
|
||||
"output filter client error, passing through original chunk"
|
||||
);
|
||||
chunk
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = %e, "output filter error, passing through original chunk");
|
||||
chunk
|
||||
}
|
||||
};
|
||||
|
||||
// Pass through inner processor for metrics/observability
|
||||
match inner_processor.process_chunk(processed_chunk) {
|
||||
Ok(Some(final_chunk)) => {
|
||||
if tx.send(final_chunk).await.is_err() {
|
||||
warn!("receiver dropped");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => continue,
|
||||
Err(err) => {
|
||||
warn!("processor error: {}", err);
|
||||
inner_processor.on_error(&err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
inner_processor.on_complete();
|
||||
debug!("output filter streaming completed");
|
||||
}
|
||||
.instrument(current_span),
|
||||
);
|
||||
|
||||
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
|
||||
let stream_body = BoxBody::new(StreamBody::new(stream));
|
||||
|
||||
StreamingResponse {
|
||||
body: stream_body,
|
||||
processor_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Truncates a message to the specified maximum length, adding "..." if truncated.
|
||||
pub fn truncate_message(message: &str, max_length: usize) -> String {
|
||||
if message.chars().count() > max_length {
|
||||
let truncated: String = message.chars().take(max_length).collect();
|
||||
format!("{}...", truncated)
|
||||
} else {
|
||||
message.to_string()
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue