mirror of
https://github.com/katanemo/plano.git
synced 2026-04-25 00:36:34 +02:00
refactor: file reorganization and code quality improvements
Made-with: Cursor
This commit is contained in:
parent
4845d83100
commit
c957016ac3
16 changed files with 165 additions and 201 deletions
|
|
@ -16,13 +16,13 @@ use crate::state::StateStorage;
|
|||
pub struct AppState {
|
||||
pub router_service: Arc<RouterService>,
|
||||
pub orchestrator_service: Arc<OrchestratorService>,
|
||||
pub model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
pub model_aliases: Option<HashMap<String, ModelAlias>>,
|
||||
pub llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
pub agents_list: Arc<RwLock<Option<Vec<Agent>>>>,
|
||||
pub listeners: Arc<RwLock<Vec<Listener>>>,
|
||||
pub agents_list: Option<Vec<Agent>>,
|
||||
pub listeners: Vec<Listener>,
|
||||
pub state_storage: Option<Arc<dyn StateStorage>>,
|
||||
pub llm_provider_url: String,
|
||||
pub span_attributes: Arc<Option<SpanAttributes>>,
|
||||
pub span_attributes: Option<SpanAttributes>,
|
||||
/// Shared HTTP client for upstream LLM requests (connection pooling / keep-alive).
|
||||
pub http_client: reqwest::Client,
|
||||
pub filter_pipeline: Arc<FilterPipeline>,
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use hyper::Response;
|
|||
use serde_json::json;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use super::response::ResponseHandler;
|
||||
use crate::handlers::response::ResponseHandler;
|
||||
|
||||
/// Build a JSON error response from an `AgentFilterChainError`, logging the
|
||||
/// full error chain along the way.
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
pub mod errors;
|
||||
pub mod jsonrpc;
|
||||
pub mod orchestrator;
|
||||
pub mod pipeline;
|
||||
|
|
|
|||
|
|
@ -10,14 +10,13 @@ use http_body_util::combinators::BoxBody;
|
|||
use http_body_util::BodyExt;
|
||||
use hyper::{Request, Response};
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use serde::ser::Error as _;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use super::errors::build_error_chain_response;
|
||||
use super::pipeline::{PipelineError, PipelineProcessor};
|
||||
use super::selector::{AgentSelectionError, AgentSelector};
|
||||
use crate::app_state::AppState;
|
||||
use crate::handlers::errors::build_error_chain_response;
|
||||
use crate::handlers::request::extract_request_id;
|
||||
use crate::handlers::extract_request_id;
|
||||
use crate::handlers::response::ResponseHandler;
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||
|
||||
|
|
@ -31,9 +30,19 @@ pub enum AgentFilterChainError {
|
|||
#[error("Response handling error: {0}")]
|
||||
Response(#[from] common::errors::BrightStaffError),
|
||||
#[error("Request parsing error: {0}")]
|
||||
RequestParsing(#[from] serde_json::Error),
|
||||
RequestParsing(String),
|
||||
#[error("HTTP error: {0}")]
|
||||
Http(#[from] hyper::Error),
|
||||
#[error("Unsupported endpoint: {0}")]
|
||||
UnsupportedEndpoint(String),
|
||||
#[error("No agents configured")]
|
||||
NoAgentsConfigured,
|
||||
#[error("Agent '{0}' not found in configuration")]
|
||||
AgentNotFound(String),
|
||||
#[error("No messages in conversation history")]
|
||||
EmptyHistory,
|
||||
#[error("Agent chain completed without producing a response")]
|
||||
IncompleteChain,
|
||||
}
|
||||
|
||||
pub async fn agent_chat(
|
||||
|
|
@ -42,7 +51,7 @@ pub async fn agent_chat(
|
|||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_id = extract_request_id(&request);
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref().as_ref());
|
||||
collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref());
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
|
|
@ -126,10 +135,7 @@ async fn parse_agent_request(
|
|||
.and_then(|name| name.to_str().ok());
|
||||
|
||||
// Find the appropriate listener
|
||||
let listener: common::configuration::Listener = {
|
||||
let listeners = state.listeners.read().await;
|
||||
agent_selector.find_listener(listener_name, &listeners)?
|
||||
};
|
||||
let listener = agent_selector.find_listener(listener_name, &state.listeners)?;
|
||||
|
||||
get_active_span(|span| {
|
||||
span.update_name(listener.name.to_string());
|
||||
|
|
@ -169,18 +175,14 @@ async fn parse_agent_request(
|
|||
|
||||
let api_type =
|
||||
SupportedAPIsFromClient::from_endpoint(request_path.as_str()).ok_or_else(|| {
|
||||
let err_msg = format!("Unsupported endpoint: {}", request_path);
|
||||
warn!("{}", err_msg);
|
||||
AgentFilterChainError::RequestParsing(serde_json::Error::custom(err_msg))
|
||||
warn!(path = %request_path, "unsupported endpoint");
|
||||
AgentFilterChainError::UnsupportedEndpoint(request_path.clone())
|
||||
})?;
|
||||
|
||||
let client_request = ProviderRequestType::try_from((&chat_request_bytes[..], &api_type))
|
||||
.map_err(|err| {
|
||||
warn!("failed to parse request as ProviderRequestType: {}", err);
|
||||
AgentFilterChainError::RequestParsing(serde_json::Error::custom(format!(
|
||||
"Failed to parse request: {}",
|
||||
err
|
||||
)))
|
||||
warn!(error = %err, "failed to parse request as ProviderRequestType");
|
||||
AgentFilterChainError::RequestParsing(format!("Failed to parse request: {}", err))
|
||||
})?;
|
||||
|
||||
let messages: Vec<OpenAIMessage> = client_request.get_messages();
|
||||
|
|
@ -216,13 +218,11 @@ async fn select_and_build_agent_map(
|
|||
),
|
||||
AgentFilterChainError,
|
||||
> {
|
||||
let agent_map = {
|
||||
let agents = state.agents_list.read().await;
|
||||
let agents = agents.as_ref().ok_or_else(|| {
|
||||
AgentFilterChainError::RequestParsing(serde_json::Error::custom("No agents configured"))
|
||||
})?;
|
||||
agent_selector.create_agent_map(agents)
|
||||
};
|
||||
let agents = state
|
||||
.agents_list
|
||||
.as_ref()
|
||||
.ok_or(AgentFilterChainError::NoAgentsConfigured)?;
|
||||
let agent_map = agent_selector.create_agent_map(agents);
|
||||
|
||||
let selection_start = Instant::now();
|
||||
let selected_agents = agent_selector
|
||||
|
|
@ -318,12 +318,9 @@ async fn execute_agent_chain(
|
|||
current_messages.clone()
|
||||
};
|
||||
|
||||
let agent = agent_map.get(&agent_name).ok_or_else(|| {
|
||||
AgentFilterChainError::RequestParsing(serde_json::Error::custom(format!(
|
||||
"Selected agent '{}' not found in configuration",
|
||||
agent_name
|
||||
)))
|
||||
})?;
|
||||
let agent = agent_map
|
||||
.get(&agent_name)
|
||||
.ok_or_else(|| AgentFilterChainError::AgentNotFound(agent_name.clone()))?;
|
||||
|
||||
debug!(agent = %agent_name, "invoking agent");
|
||||
|
||||
|
|
@ -387,9 +384,7 @@ async fn execute_agent_chain(
|
|||
|
||||
let Some(last_message) = current_messages.pop() else {
|
||||
warn!(agent = %agent_name, "no messages in conversation history");
|
||||
return Err(AgentFilterChainError::RequestParsing(
|
||||
serde_json::Error::custom("No messages in conversation history after agent response"),
|
||||
));
|
||||
return Err(AgentFilterChainError::EmptyHistory);
|
||||
};
|
||||
|
||||
current_messages.push(OpenAIMessage {
|
||||
|
|
@ -403,9 +398,7 @@ async fn execute_agent_chain(
|
|||
current_messages.push(last_message);
|
||||
}
|
||||
|
||||
Err(AgentFilterChainError::RequestParsing(
|
||||
serde_json::Error::custom("Agent chain completed without producing a response"),
|
||||
))
|
||||
Err(AgentFilterChainError::IncompleteChain)
|
||||
}
|
||||
|
||||
async fn handle_agent_chat_inner(
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ impl AgentSelector {
|
|||
}
|
||||
|
||||
/// Convert agent descriptions to orchestration preferences
|
||||
async fn convert_agent_description_to_orchestration_preferences(
|
||||
fn convert_agent_description_to_orchestration_preferences(
|
||||
&self,
|
||||
agents: &[AgentFilterChain],
|
||||
) -> Vec<AgentUsagePreference> {
|
||||
|
|
@ -121,9 +121,7 @@ impl AgentSelector {
|
|||
return Ok(vec![agents[0].clone()]);
|
||||
}
|
||||
|
||||
let usage_preferences = self
|
||||
.convert_agent_description_to_orchestration_preferences(agents)
|
||||
.await;
|
||||
let usage_preferences = self.convert_agent_description_to_orchestration_preferences(agents);
|
||||
debug!(
|
||||
"Agents usage preferences for orchestration: {}",
|
||||
serde_json::to_string(&usage_preferences).unwrap_or_default()
|
||||
|
|
|
|||
|
|
@ -1,13 +1,13 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{FilterPipeline, ModelAlias};
|
||||
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, TRACE_PARENT_HEADER};
|
||||
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use hermesllm::apis::openai::Message;
|
||||
use hermesllm::apis::openai_responses::InputParam;
|
||||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::header::{self};
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use opentelemetry::global;
|
||||
|
|
@ -18,29 +18,25 @@ use std::sync::Arc;
|
|||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
pub(crate) mod router;
|
||||
pub(crate) mod model_selection;
|
||||
|
||||
use crate::app_state::AppState;
|
||||
use crate::handlers::agents::pipeline::PipelineProcessor;
|
||||
use crate::handlers::request::extract_request_id;
|
||||
use crate::handlers::streaming::{
|
||||
create_streaming_response, create_streaming_response_with_output_filter, truncate_message,
|
||||
ObservableStreamProcessor, StreamProcessor,
|
||||
};
|
||||
use crate::handlers::extract_or_generate_traceparent;
|
||||
use crate::handlers::extract_request_id;
|
||||
use crate::handlers::full;
|
||||
use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||
use crate::state::{
|
||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||
};
|
||||
use crate::streaming::{
|
||||
create_streaming_response, create_streaming_response_with_output_filter, truncate_message,
|
||||
ObservableStreamProcessor, StreamProcessor,
|
||||
};
|
||||
use crate::tracing::{
|
||||
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
|
||||
};
|
||||
use router::router_chat_get_upstream_model;
|
||||
|
||||
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
use model_selection::router_chat_get_upstream_model;
|
||||
|
||||
pub async fn llm_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
|
|
@ -50,7 +46,7 @@ pub async fn llm_chat(
|
|||
let request_headers = request.headers().clone();
|
||||
let request_id = extract_request_id(&request);
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref().as_ref());
|
||||
collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref());
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
|
|
@ -162,10 +158,8 @@ async fn llm_chat_inner(
|
|||
.await
|
||||
{
|
||||
Ok(filtered_bytes) => {
|
||||
let api_type = SupportedAPIsFromClient::from_endpoint(
|
||||
request_path.as_str(),
|
||||
)
|
||||
.expect("endpoint validated in parse_and_validate_request");
|
||||
let api_type = SupportedAPIsFromClient::from_endpoint(request_path.as_str())
|
||||
.expect("endpoint validated in parse_and_validate_request");
|
||||
match ProviderRequestType::try_from((&filtered_bytes[..], &api_type)) {
|
||||
Ok(updated_request) => {
|
||||
client_request = updated_request;
|
||||
|
|
@ -173,13 +167,11 @@ async fn llm_chat_inner(
|
|||
}
|
||||
Err(parse_err) => {
|
||||
warn!(error = %parse_err, "input filter returned invalid request JSON");
|
||||
return Ok(
|
||||
common::errors::BrightStaffError::InvalidRequest(format!(
|
||||
"Input filter returned invalid request: {}",
|
||||
parse_err
|
||||
))
|
||||
.into_response(),
|
||||
);
|
||||
return Ok(common::errors::BrightStaffError::InvalidRequest(format!(
|
||||
"Input filter returned invalid request: {}",
|
||||
parse_err
|
||||
))
|
||||
.into_response());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -342,7 +334,7 @@ struct PreparedRequest {
|
|||
async fn parse_and_validate_request(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
request_path: &str,
|
||||
model_aliases: &Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
model_aliases: &Option<HashMap<String, ModelAlias>>,
|
||||
llm_providers: &Arc<RwLock<LlmProviders>>,
|
||||
) -> Result<PreparedRequest, Response<BoxBody<Bytes, hyper::Error>>> {
|
||||
let raw_bytes = request
|
||||
|
|
@ -386,11 +378,9 @@ async fn parse_and_validate_request(
|
|||
r
|
||||
})?;
|
||||
|
||||
let client_api = SupportedAPIsFromClient::from_endpoint(request_path);
|
||||
let is_responses_api_client = matches!(
|
||||
client_api,
|
||||
Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_))
|
||||
);
|
||||
let is_responses_api_client =
|
||||
matches!(api_type, SupportedAPIsFromClient::OpenAIResponsesAPI(_));
|
||||
let client_api = Some(api_type);
|
||||
|
||||
let model_from_request = client_request.model().to_string();
|
||||
let temperature = client_request.get_temperature();
|
||||
|
|
@ -695,21 +685,20 @@ async fn send_upstream(
|
|||
Box::new(base_processor)
|
||||
};
|
||||
|
||||
let streaming_response =
|
||||
if let (Some(output_chain), Some(filter_headers)) = (
|
||||
filter_pipeline.output.as_ref().filter(|c| !c.is_empty()),
|
||||
output_filter_request_headers,
|
||||
) {
|
||||
create_streaming_response_with_output_filter(
|
||||
byte_stream,
|
||||
processor,
|
||||
output_chain.clone(),
|
||||
filter_headers,
|
||||
request_path.to_string(),
|
||||
)
|
||||
} else {
|
||||
create_streaming_response(byte_stream, processor)
|
||||
};
|
||||
let streaming_response = if let (Some(output_chain), Some(filter_headers)) = (
|
||||
filter_pipeline.output.as_ref().filter(|c| !c.is_empty()),
|
||||
output_filter_request_headers,
|
||||
) {
|
||||
create_streaming_response_with_output_filter(
|
||||
byte_stream,
|
||||
processor,
|
||||
output_chain.clone(),
|
||||
filter_headers,
|
||||
request_path.to_string(),
|
||||
)
|
||||
} else {
|
||||
create_streaming_response(byte_stream, processor)
|
||||
};
|
||||
|
||||
match response.body(streaming_response.body) {
|
||||
Ok(response) => Ok(response),
|
||||
|
|
@ -726,28 +715,11 @@ async fn send_upstream(
|
|||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Extract or generate a W3C `traceparent` header value.
|
||||
fn extract_or_generate_traceparent(headers: &hyper::HeaderMap) -> String {
|
||||
headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| {
|
||||
let trace_id = uuid::Uuid::new_v4().to_string().replace("-", "");
|
||||
let tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
tp
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolves model aliases by looking up the requested model in the model_aliases map.
|
||||
/// Returns the target model if an alias is found, otherwise returns the original model.
|
||||
fn resolve_model_alias(
|
||||
model_from_request: &str,
|
||||
model_aliases: &Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
model_aliases: &Option<HashMap<String, ModelAlias>>,
|
||||
) -> String {
|
||||
if let Some(aliases) = model_aliases.as_ref() {
|
||||
if let Some(model_alias) = aliases.get(model_from_request) {
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@ use hyper::StatusCode;
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::handlers::streaming::truncate_message;
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::streaming::truncate_message;
|
||||
use crate::tracing::routing;
|
||||
|
||||
pub struct RoutingResult {
|
||||
|
|
@ -1,12 +1,57 @@
|
|||
pub mod agents;
|
||||
pub mod errors;
|
||||
pub mod function_calling;
|
||||
pub mod llm;
|
||||
pub mod models;
|
||||
pub mod request;
|
||||
pub mod response;
|
||||
pub mod routing_service;
|
||||
pub mod streaming;
|
||||
|
||||
#[cfg(test)]
|
||||
mod integration_tests;
|
||||
|
||||
use bytes::Bytes;
|
||||
use common::consts::TRACE_PARENT_HEADER;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Empty, Full};
|
||||
use hyper::Request;
|
||||
use tracing::warn;
|
||||
|
||||
/// Wrap a chunk into a `BoxBody` for hyper responses.
|
||||
pub fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// An empty HTTP body (used for 404 / OPTIONS responses).
|
||||
pub fn empty() -> BoxBody<Bytes, hyper::Error> {
|
||||
Empty::<Bytes>::new()
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// Extract request ID from incoming request headers, or generate a new UUID v4.
|
||||
pub fn extract_request_id<T>(request: &Request<T>) -> String {
|
||||
request
|
||||
.headers()
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
|
||||
}
|
||||
|
||||
/// Extract or generate a W3C `traceparent` header value.
|
||||
pub fn extract_or_generate_traceparent(headers: &hyper::HeaderMap) -> String {
|
||||
headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| {
|
||||
let trace_id = uuid::Uuid::new_v4().to_string().replace("-", "");
|
||||
let tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
tp
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
use bytes::Bytes;
|
||||
use common::llm_providers::LlmProviders;
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Full};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use hyper::{Response, StatusCode};
|
||||
use serde_json;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::full;
|
||||
|
||||
pub async fn list_models(
|
||||
llm_providers: Arc<tokio::sync::RwLock<LlmProviders>>,
|
||||
) -> Response<BoxBody<Bytes, hyper::Error>> {
|
||||
|
|
@ -12,27 +13,15 @@ pub async fn list_models(
|
|||
let models = prov.to_models();
|
||||
|
||||
match serde_json::to_string(&models) {
|
||||
Ok(json) => {
|
||||
let body = Full::new(Bytes::from(json))
|
||||
.map_err(|never| match never {})
|
||||
.boxed();
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
Err(_) => {
|
||||
let body = Full::new(Bytes::from_static(
|
||||
b"{\"error\":\"Failed to serialize models\"}",
|
||||
))
|
||||
.map_err(|never| match never {})
|
||||
.boxed();
|
||||
Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(body)
|
||||
.unwrap()
|
||||
}
|
||||
Ok(json) => Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(full(json))
|
||||
.unwrap(),
|
||||
Err(_) => Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(full("{\"error\":\"Failed to serialize models\"}"))
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +0,0 @@
|
|||
use hyper::Request;
|
||||
|
||||
/// Extract request ID from incoming request headers, or generate a new UUID v4.
|
||||
pub fn extract_request_id<T>(request: &Request<T>) -> String {
|
||||
request
|
||||
.headers()
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@ use hermesllm::apis::OpenAIApi;
|
|||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::SseEvent;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Full, StreamBody};
|
||||
use http_body_util::StreamBody;
|
||||
use hyper::body::Frame;
|
||||
use hyper::Response;
|
||||
use tokio::sync::mpsc;
|
||||
|
|
@ -12,6 +12,8 @@ use tokio_stream::wrappers::ReceiverStream;
|
|||
use tokio_stream::StreamExt;
|
||||
use tracing::{info, warn, Instrument};
|
||||
|
||||
use super::full;
|
||||
|
||||
/// Service for handling HTTP responses and streaming
|
||||
pub struct ResponseHandler;
|
||||
|
||||
|
|
@ -22,9 +24,7 @@ impl ResponseHandler {
|
|||
|
||||
/// Create a full response body from bytes
|
||||
pub fn create_full_body<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
full(chunk)
|
||||
}
|
||||
|
||||
/// Create a JSON error response with BAD_REQUEST status
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{ModelUsagePreference, SpanAttributes};
|
||||
use common::consts::{REQUEST_ID_HEADER, TRACE_PARENT_HEADER};
|
||||
use common::consts::REQUEST_ID_HEADER;
|
||||
use common::errors::BrightStaffError;
|
||||
use hermesllm::clients::SupportedAPIsFromClient;
|
||||
use hermesllm::ProviderRequestType;
|
||||
|
|
@ -10,7 +10,8 @@ use hyper::{Request, Response, StatusCode};
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use crate::handlers::llm::router::router_chat_get_upstream_model;
|
||||
use super::extract_or_generate_traceparent;
|
||||
use crate::handlers::llm::model_selection::router_chat_get_upstream_model;
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||
|
||||
|
|
@ -72,7 +73,7 @@ pub async fn routing_decision(
|
|||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
request_path: String,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
span_attributes: &Option<SpanAttributes>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_headers = request.headers().clone();
|
||||
let request_id: String = request_headers
|
||||
|
|
@ -81,8 +82,7 @@ pub async fn routing_decision(
|
|||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
|
||||
|
||||
let custom_attrs =
|
||||
collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref());
|
||||
let custom_attrs = collect_custom_trace_attributes(&request_headers, span_attributes.as_ref());
|
||||
|
||||
let request_span = info_span!(
|
||||
"routing_decision",
|
||||
|
|
@ -119,23 +119,7 @@ async fn routing_decision_inner(
|
|||
}
|
||||
});
|
||||
|
||||
// Extract or generate traceparent
|
||||
let traceparent: String = match request_headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(tp) => tp,
|
||||
None => {
|
||||
let trace_id = uuid::Uuid::new_v4().to_string().replace("-", "");
|
||||
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %generated_tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
);
|
||||
generated_tp
|
||||
}
|
||||
};
|
||||
let traceparent = extract_or_generate_traceparent(&request_headers);
|
||||
|
||||
// Extract trace_id from traceparent (format: 00-{trace_id}-{span_id}-{flags})
|
||||
let trace_id = traceparent
|
||||
|
|
|
|||
|
|
@ -3,4 +3,5 @@ pub mod handlers;
|
|||
pub mod router;
|
||||
pub mod signals;
|
||||
pub mod state;
|
||||
pub mod streaming;
|
||||
pub mod tracing;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use brightstaff::app_state::AppState;
|
||||
use brightstaff::handlers::agents::orchestrator::agent_chat;
|
||||
use brightstaff::handlers::empty;
|
||||
use brightstaff::handlers::function_calling::function_calling_chat_handler;
|
||||
use brightstaff::handlers::llm::llm_chat;
|
||||
use brightstaff::handlers::models::list_models;
|
||||
|
|
@ -16,7 +17,7 @@ use common::configuration::{
|
|||
};
|
||||
use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use hyper::body::Incoming;
|
||||
use hyper::header::HeaderValue;
|
||||
use hyper::server::conn::http1;
|
||||
|
|
@ -39,17 +40,6 @@ const DEFAULT_ROUTING_MODEL_NAME: &str = "Arch-Router";
|
|||
const DEFAULT_ORCHESTRATOR_LLM_PROVIDER: &str = "plano-orchestrator";
|
||||
const DEFAULT_ORCHESTRATOR_MODEL_NAME: &str = "Plano-Orchestrator";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// An empty HTTP body (used for 404 / OPTIONS responses).
|
||||
fn empty() -> BoxBody<Bytes, hyper::Error> {
|
||||
Empty::<Bytes>::new()
|
||||
.map_err(|never| match never {})
|
||||
.boxed()
|
||||
}
|
||||
|
||||
/// CORS pre-flight response for the models endpoint.
|
||||
fn cors_preflight() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let mut response = Response::new(empty());
|
||||
|
|
@ -139,7 +129,11 @@ async fn init_app_state(
|
|||
filter_ids.map(|ids| {
|
||||
let agents = ids
|
||||
.iter()
|
||||
.filter_map(|id| global_agent_map.get(id).map(|a: &Agent| (id.clone(), a.clone())))
|
||||
.filter_map(|id| {
|
||||
global_agent_map
|
||||
.get(id)
|
||||
.map(|a: &Agent| (id.clone(), a.clone()))
|
||||
})
|
||||
.collect();
|
||||
ResolvedFilterChain {
|
||||
filter_ids: ids,
|
||||
|
|
@ -197,20 +191,18 @@ async fn init_app_state(
|
|||
|
||||
let state_storage = init_state_storage(config).await?;
|
||||
|
||||
let span_attributes = Arc::new(
|
||||
config
|
||||
.tracing
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.span_attributes.clone()),
|
||||
);
|
||||
let span_attributes = config
|
||||
.tracing
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.span_attributes.clone());
|
||||
|
||||
Ok(AppState {
|
||||
router_service,
|
||||
orchestrator_service,
|
||||
model_aliases: Arc::new(config.model_aliases.clone()),
|
||||
model_aliases: config.model_aliases.clone(),
|
||||
llm_providers: Arc::new(RwLock::new(llm_providers)),
|
||||
agents_list: Arc::new(RwLock::new(Some(all_agents))),
|
||||
listeners: Arc::new(RwLock::new(config.listeners.clone())),
|
||||
agents_list: Some(all_agents),
|
||||
listeners: config.listeners.clone(),
|
||||
state_storage,
|
||||
llm_provider_url,
|
||||
span_attributes,
|
||||
|
|
@ -294,7 +286,7 @@ async fn route(
|
|||
req,
|
||||
Arc::clone(&state.router_service),
|
||||
stripped,
|
||||
Arc::clone(&state.span_attributes),
|
||||
&state.span_attributes,
|
||||
)
|
||||
.with_context(parent_cx)
|
||||
.await;
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@ use std::io::Read;
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::handlers::streaming::StreamProcessor;
|
||||
use crate::state::{OpenAIConversationState, StateStorage};
|
||||
use crate::streaming::StreamProcessor;
|
||||
|
||||
/// Processor that wraps another processor and handles v1/responses state management
|
||||
/// Captures response_id and output from streaming responses, stores state after completion
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ use tokio_stream::StreamExt;
|
|||
use tracing::{debug, info, warn, Instrument};
|
||||
use tracing_opentelemetry::OpenTelemetrySpanExt;
|
||||
|
||||
use super::agents::pipeline::{PipelineError, PipelineProcessor};
|
||||
use crate::handlers::agents::pipeline::{PipelineError, PipelineProcessor};
|
||||
|
||||
const STREAM_BUFFER_SIZE: usize = 16;
|
||||
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
|
||||
Loading…
Add table
Add a link
Reference in a new issue