diff --git a/crates/brightstaff/src/app_state.rs b/crates/brightstaff/src/app_state.rs index 33e8d0d6..57707f6e 100644 --- a/crates/brightstaff/src/app_state.rs +++ b/crates/brightstaff/src/app_state.rs @@ -16,13 +16,13 @@ use crate::state::StateStorage; pub struct AppState { pub router_service: Arc, pub orchestrator_service: Arc, - pub model_aliases: Arc>>, + pub model_aliases: Option>, pub llm_providers: Arc>, - pub agents_list: Arc>>>, - pub listeners: Arc>>, + pub agents_list: Option>, + pub listeners: Vec, pub state_storage: Option>, pub llm_provider_url: String, - pub span_attributes: Arc>, + pub span_attributes: Option, /// Shared HTTP client for upstream LLM requests (connection pooling / keep-alive). pub http_client: reqwest::Client, pub filter_pipeline: Arc, diff --git a/crates/brightstaff/src/handlers/errors.rs b/crates/brightstaff/src/handlers/agents/errors.rs similarity index 96% rename from crates/brightstaff/src/handlers/errors.rs rename to crates/brightstaff/src/handlers/agents/errors.rs index 947ec57a..478b4380 100644 --- a/crates/brightstaff/src/handlers/errors.rs +++ b/crates/brightstaff/src/handlers/agents/errors.rs @@ -4,7 +4,7 @@ use hyper::Response; use serde_json::json; use tracing::{info, warn}; -use super::response::ResponseHandler; +use crate::handlers::response::ResponseHandler; /// Build a JSON error response from an `AgentFilterChainError`, logging the /// full error chain along the way. diff --git a/crates/brightstaff/src/handlers/agents/mod.rs b/crates/brightstaff/src/handlers/agents/mod.rs index 068cebb8..5b507907 100644 --- a/crates/brightstaff/src/handlers/agents/mod.rs +++ b/crates/brightstaff/src/handlers/agents/mod.rs @@ -1,3 +1,4 @@ +pub mod errors; pub mod jsonrpc; pub mod orchestrator; pub mod pipeline; diff --git a/crates/brightstaff/src/handlers/agents/orchestrator.rs b/crates/brightstaff/src/handlers/agents/orchestrator.rs index 40b4d5e3..8ece914e 100644 --- a/crates/brightstaff/src/handlers/agents/orchestrator.rs +++ b/crates/brightstaff/src/handlers/agents/orchestrator.rs @@ -10,14 +10,13 @@ use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::{Request, Response}; use opentelemetry::trace::get_active_span; -use serde::ser::Error as _; use tracing::{debug, info, info_span, warn, Instrument}; +use super::errors::build_error_chain_response; use super::pipeline::{PipelineError, PipelineProcessor}; use super::selector::{AgentSelectionError, AgentSelector}; use crate::app_state::AppState; -use crate::handlers::errors::build_error_chain_response; -use crate::handlers::request::extract_request_id; +use crate::handlers::extract_request_id; use crate::handlers::response::ResponseHandler; use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name}; @@ -31,9 +30,19 @@ pub enum AgentFilterChainError { #[error("Response handling error: {0}")] Response(#[from] common::errors::BrightStaffError), #[error("Request parsing error: {0}")] - RequestParsing(#[from] serde_json::Error), + RequestParsing(String), #[error("HTTP error: {0}")] Http(#[from] hyper::Error), + #[error("Unsupported endpoint: {0}")] + UnsupportedEndpoint(String), + #[error("No agents configured")] + NoAgentsConfigured, + #[error("Agent '{0}' not found in configuration")] + AgentNotFound(String), + #[error("No messages in conversation history")] + EmptyHistory, + #[error("Agent chain completed without producing a response")] + IncompleteChain, } pub async fn agent_chat( @@ -42,7 +51,7 @@ pub async fn agent_chat( ) -> Result>, hyper::Error> { let request_id = extract_request_id(&request); let custom_attrs = - collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref().as_ref()); + collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref()); // Create a span with request_id that will be included in all log lines let request_span = info_span!( @@ -126,10 +135,7 @@ async fn parse_agent_request( .and_then(|name| name.to_str().ok()); // Find the appropriate listener - let listener: common::configuration::Listener = { - let listeners = state.listeners.read().await; - agent_selector.find_listener(listener_name, &listeners)? - }; + let listener = agent_selector.find_listener(listener_name, &state.listeners)?; get_active_span(|span| { span.update_name(listener.name.to_string()); @@ -169,18 +175,14 @@ async fn parse_agent_request( let api_type = SupportedAPIsFromClient::from_endpoint(request_path.as_str()).ok_or_else(|| { - let err_msg = format!("Unsupported endpoint: {}", request_path); - warn!("{}", err_msg); - AgentFilterChainError::RequestParsing(serde_json::Error::custom(err_msg)) + warn!(path = %request_path, "unsupported endpoint"); + AgentFilterChainError::UnsupportedEndpoint(request_path.clone()) })?; let client_request = ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) .map_err(|err| { - warn!("failed to parse request as ProviderRequestType: {}", err); - AgentFilterChainError::RequestParsing(serde_json::Error::custom(format!( - "Failed to parse request: {}", - err - ))) + warn!(error = %err, "failed to parse request as ProviderRequestType"); + AgentFilterChainError::RequestParsing(format!("Failed to parse request: {}", err)) })?; let messages: Vec = client_request.get_messages(); @@ -216,13 +218,11 @@ async fn select_and_build_agent_map( ), AgentFilterChainError, > { - let agent_map = { - let agents = state.agents_list.read().await; - let agents = agents.as_ref().ok_or_else(|| { - AgentFilterChainError::RequestParsing(serde_json::Error::custom("No agents configured")) - })?; - agent_selector.create_agent_map(agents) - }; + let agents = state + .agents_list + .as_ref() + .ok_or(AgentFilterChainError::NoAgentsConfigured)?; + let agent_map = agent_selector.create_agent_map(agents); let selection_start = Instant::now(); let selected_agents = agent_selector @@ -318,12 +318,9 @@ async fn execute_agent_chain( current_messages.clone() }; - let agent = agent_map.get(&agent_name).ok_or_else(|| { - AgentFilterChainError::RequestParsing(serde_json::Error::custom(format!( - "Selected agent '{}' not found in configuration", - agent_name - ))) - })?; + let agent = agent_map + .get(&agent_name) + .ok_or_else(|| AgentFilterChainError::AgentNotFound(agent_name.clone()))?; debug!(agent = %agent_name, "invoking agent"); @@ -387,9 +384,7 @@ async fn execute_agent_chain( let Some(last_message) = current_messages.pop() else { warn!(agent = %agent_name, "no messages in conversation history"); - return Err(AgentFilterChainError::RequestParsing( - serde_json::Error::custom("No messages in conversation history after agent response"), - )); + return Err(AgentFilterChainError::EmptyHistory); }; current_messages.push(OpenAIMessage { @@ -403,9 +398,7 @@ async fn execute_agent_chain( current_messages.push(last_message); } - Err(AgentFilterChainError::RequestParsing( - serde_json::Error::custom("Agent chain completed without producing a response"), - )) + Err(AgentFilterChainError::IncompleteChain) } async fn handle_agent_chat_inner( diff --git a/crates/brightstaff/src/handlers/agents/selector.rs b/crates/brightstaff/src/handlers/agents/selector.rs index 7f35fd31..8225a003 100644 --- a/crates/brightstaff/src/handlers/agents/selector.rs +++ b/crates/brightstaff/src/handlers/agents/selector.rs @@ -84,7 +84,7 @@ impl AgentSelector { } /// Convert agent descriptions to orchestration preferences - async fn convert_agent_description_to_orchestration_preferences( + fn convert_agent_description_to_orchestration_preferences( &self, agents: &[AgentFilterChain], ) -> Vec { @@ -121,9 +121,7 @@ impl AgentSelector { return Ok(vec![agents[0].clone()]); } - let usage_preferences = self - .convert_agent_description_to_orchestration_preferences(agents) - .await; + let usage_preferences = self.convert_agent_description_to_orchestration_preferences(agents); debug!( "Agents usage preferences for orchestration: {}", serde_json::to_string(&usage_preferences).unwrap_or_default() diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 6f689ad8..9d4a2dfb 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -1,13 +1,13 @@ use bytes::Bytes; use common::configuration::{FilterPipeline, ModelAlias}; -use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, TRACE_PARENT_HEADER}; +use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; use common::llm_providers::LlmProviders; use hermesllm::apis::openai::Message; use hermesllm::apis::openai_responses::InputParam; use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; use hermesllm::{ProviderRequest, ProviderRequestType}; use http_body_util::combinators::BoxBody; -use http_body_util::{BodyExt, Full}; +use http_body_util::BodyExt; use hyper::header::{self}; use hyper::{Request, Response, StatusCode}; use opentelemetry::global; @@ -18,29 +18,25 @@ use std::sync::Arc; use tokio::sync::RwLock; use tracing::{debug, info, info_span, warn, Instrument}; -pub(crate) mod router; +pub(crate) mod model_selection; use crate::app_state::AppState; use crate::handlers::agents::pipeline::PipelineProcessor; -use crate::handlers::request::extract_request_id; -use crate::handlers::streaming::{ - create_streaming_response, create_streaming_response_with_output_filter, truncate_message, - ObservableStreamProcessor, StreamProcessor, -}; +use crate::handlers::extract_or_generate_traceparent; +use crate::handlers::extract_request_id; +use crate::handlers::full; use crate::state::response_state_processor::ResponsesStateProcessor; use crate::state::{ extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError, }; +use crate::streaming::{ + create_streaming_response, create_streaming_response_with_output_filter, truncate_message, + ObservableStreamProcessor, StreamProcessor, +}; use crate::tracing::{ collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name, }; -use router::router_chat_get_upstream_model; - -fn full>(chunk: T) -> BoxBody { - Full::new(chunk.into()) - .map_err(|never| match never {}) - .boxed() -} +use model_selection::router_chat_get_upstream_model; pub async fn llm_chat( request: Request, @@ -50,7 +46,7 @@ pub async fn llm_chat( let request_headers = request.headers().clone(); let request_id = extract_request_id(&request); let custom_attrs = - collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref().as_ref()); + collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref()); // Create a span with request_id that will be included in all log lines let request_span = info_span!( @@ -162,10 +158,8 @@ async fn llm_chat_inner( .await { Ok(filtered_bytes) => { - let api_type = SupportedAPIsFromClient::from_endpoint( - request_path.as_str(), - ) - .expect("endpoint validated in parse_and_validate_request"); + let api_type = SupportedAPIsFromClient::from_endpoint(request_path.as_str()) + .expect("endpoint validated in parse_and_validate_request"); match ProviderRequestType::try_from((&filtered_bytes[..], &api_type)) { Ok(updated_request) => { client_request = updated_request; @@ -173,13 +167,11 @@ async fn llm_chat_inner( } Err(parse_err) => { warn!(error = %parse_err, "input filter returned invalid request JSON"); - return Ok( - common::errors::BrightStaffError::InvalidRequest(format!( - "Input filter returned invalid request: {}", - parse_err - )) - .into_response(), - ); + return Ok(common::errors::BrightStaffError::InvalidRequest(format!( + "Input filter returned invalid request: {}", + parse_err + )) + .into_response()); } } } @@ -342,7 +334,7 @@ struct PreparedRequest { async fn parse_and_validate_request( request: Request, request_path: &str, - model_aliases: &Arc>>, + model_aliases: &Option>, llm_providers: &Arc>, ) -> Result>> { let raw_bytes = request @@ -386,11 +378,9 @@ async fn parse_and_validate_request( r })?; - let client_api = SupportedAPIsFromClient::from_endpoint(request_path); - let is_responses_api_client = matches!( - client_api, - Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_)) - ); + let is_responses_api_client = + matches!(api_type, SupportedAPIsFromClient::OpenAIResponsesAPI(_)); + let client_api = Some(api_type); let model_from_request = client_request.model().to_string(); let temperature = client_request.get_temperature(); @@ -695,21 +685,20 @@ async fn send_upstream( Box::new(base_processor) }; - let streaming_response = - if let (Some(output_chain), Some(filter_headers)) = ( - filter_pipeline.output.as_ref().filter(|c| !c.is_empty()), - output_filter_request_headers, - ) { - create_streaming_response_with_output_filter( - byte_stream, - processor, - output_chain.clone(), - filter_headers, - request_path.to_string(), - ) - } else { - create_streaming_response(byte_stream, processor) - }; + let streaming_response = if let (Some(output_chain), Some(filter_headers)) = ( + filter_pipeline.output.as_ref().filter(|c| !c.is_empty()), + output_filter_request_headers, + ) { + create_streaming_response_with_output_filter( + byte_stream, + processor, + output_chain.clone(), + filter_headers, + request_path.to_string(), + ) + } else { + create_streaming_response(byte_stream, processor) + }; match response.body(streaming_response.body) { Ok(response) => Ok(response), @@ -726,28 +715,11 @@ async fn send_upstream( // Helpers // --------------------------------------------------------------------------- -/// Extract or generate a W3C `traceparent` header value. -fn extract_or_generate_traceparent(headers: &hyper::HeaderMap) -> String { - headers - .get(TRACE_PARENT_HEADER) - .and_then(|h| h.to_str().ok()) - .map(|s| s.to_string()) - .unwrap_or_else(|| { - let trace_id = uuid::Uuid::new_v4().to_string().replace("-", ""); - let tp = format!("00-{}-0000000000000000-01", trace_id); - warn!( - generated_traceparent = %tp, - "TRACE_PARENT header missing, generated new traceparent" - ); - tp - }) -} - /// Resolves model aliases by looking up the requested model in the model_aliases map. /// Returns the target model if an alias is found, otherwise returns the original model. fn resolve_model_alias( model_from_request: &str, - model_aliases: &Arc>>, + model_aliases: &Option>, ) -> String { if let Some(aliases) = model_aliases.as_ref() { if let Some(model_alias) = aliases.get(model_from_request) { diff --git a/crates/brightstaff/src/handlers/llm/router.rs b/crates/brightstaff/src/handlers/llm/model_selection.rs similarity index 99% rename from crates/brightstaff/src/handlers/llm/router.rs rename to crates/brightstaff/src/handlers/llm/model_selection.rs index abfe5c7a..455b7c0e 100644 --- a/crates/brightstaff/src/handlers/llm/router.rs +++ b/crates/brightstaff/src/handlers/llm/model_selection.rs @@ -5,8 +5,8 @@ use hyper::StatusCode; use std::sync::Arc; use tracing::{debug, info, warn}; -use crate::handlers::streaming::truncate_message; use crate::router::llm::RouterService; +use crate::streaming::truncate_message; use crate::tracing::routing; pub struct RoutingResult { diff --git a/crates/brightstaff/src/handlers/mod.rs b/crates/brightstaff/src/handlers/mod.rs index 8e624f9e..485a0438 100644 --- a/crates/brightstaff/src/handlers/mod.rs +++ b/crates/brightstaff/src/handlers/mod.rs @@ -1,12 +1,57 @@ pub mod agents; -pub mod errors; pub mod function_calling; pub mod llm; pub mod models; -pub mod request; pub mod response; pub mod routing_service; -pub mod streaming; #[cfg(test)] mod integration_tests; + +use bytes::Bytes; +use common::consts::TRACE_PARENT_HEADER; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::Request; +use tracing::warn; + +/// Wrap a chunk into a `BoxBody` for hyper responses. +pub fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + +/// An empty HTTP body (used for 404 / OPTIONS responses). +pub fn empty() -> BoxBody { + Empty::::new() + .map_err(|never| match never {}) + .boxed() +} + +/// Extract request ID from incoming request headers, or generate a new UUID v4. +pub fn extract_request_id(request: &Request) -> String { + request + .headers() + .get(common::consts::REQUEST_ID_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()) +} + +/// Extract or generate a W3C `traceparent` header value. +pub fn extract_or_generate_traceparent(headers: &hyper::HeaderMap) -> String { + headers + .get(TRACE_PARENT_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + .unwrap_or_else(|| { + let trace_id = uuid::Uuid::new_v4().to_string().replace("-", ""); + let tp = format!("00-{}-0000000000000000-01", trace_id); + warn!( + generated_traceparent = %tp, + "TRACE_PARENT header missing, generated new traceparent" + ); + tp + }) +} diff --git a/crates/brightstaff/src/handlers/models.rs b/crates/brightstaff/src/handlers/models.rs index a29d5e90..9fd5fe07 100644 --- a/crates/brightstaff/src/handlers/models.rs +++ b/crates/brightstaff/src/handlers/models.rs @@ -1,10 +1,11 @@ use bytes::Bytes; use common::llm_providers::LlmProviders; -use http_body_util::{combinators::BoxBody, BodyExt, Full}; +use http_body_util::combinators::BoxBody; use hyper::{Response, StatusCode}; -use serde_json; use std::sync::Arc; +use super::full; + pub async fn list_models( llm_providers: Arc>, ) -> Response> { @@ -12,27 +13,15 @@ pub async fn list_models( let models = prov.to_models(); match serde_json::to_string(&models) { - Ok(json) => { - let body = Full::new(Bytes::from(json)) - .map_err(|never| match never {}) - .boxed(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(body) - .unwrap() - } - Err(_) => { - let body = Full::new(Bytes::from_static( - b"{\"error\":\"Failed to serialize models\"}", - )) - .map_err(|never| match never {}) - .boxed(); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .header("Content-Type", "application/json") - .body(body) - .unwrap() - } + Ok(json) => Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(full(json)) + .unwrap(), + Err(_) => Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header("Content-Type", "application/json") + .body(full("{\"error\":\"Failed to serialize models\"}")) + .unwrap(), } } diff --git a/crates/brightstaff/src/handlers/request.rs b/crates/brightstaff/src/handlers/request.rs deleted file mode 100644 index c528f4fd..00000000 --- a/crates/brightstaff/src/handlers/request.rs +++ /dev/null @@ -1,11 +0,0 @@ -use hyper::Request; - -/// Extract request ID from incoming request headers, or generate a new UUID v4. -pub fn extract_request_id(request: &Request) -> String { - request - .headers() - .get(common::consts::REQUEST_ID_HEADER) - .and_then(|h| h.to_str().ok()) - .map(|s| s.to_string()) - .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()) -} diff --git a/crates/brightstaff/src/handlers/response.rs b/crates/brightstaff/src/handlers/response.rs index 0a6bbe4c..4db2f8a8 100644 --- a/crates/brightstaff/src/handlers/response.rs +++ b/crates/brightstaff/src/handlers/response.rs @@ -4,7 +4,7 @@ use hermesllm::apis::OpenAIApi; use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; use hermesllm::SseEvent; use http_body_util::combinators::BoxBody; -use http_body_util::{BodyExt, Full, StreamBody}; +use http_body_util::StreamBody; use hyper::body::Frame; use hyper::Response; use tokio::sync::mpsc; @@ -12,6 +12,8 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::{info, warn, Instrument}; +use super::full; + /// Service for handling HTTP responses and streaming pub struct ResponseHandler; @@ -22,9 +24,7 @@ impl ResponseHandler { /// Create a full response body from bytes pub fn create_full_body>(chunk: T) -> BoxBody { - Full::new(chunk.into()) - .map_err(|never| match never {}) - .boxed() + full(chunk) } /// Create a JSON error response with BAD_REQUEST status diff --git a/crates/brightstaff/src/handlers/routing_service.rs b/crates/brightstaff/src/handlers/routing_service.rs index 4b4a2b6f..ec09f06f 100644 --- a/crates/brightstaff/src/handlers/routing_service.rs +++ b/crates/brightstaff/src/handlers/routing_service.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use common::configuration::{ModelUsagePreference, SpanAttributes}; -use common::consts::{REQUEST_ID_HEADER, TRACE_PARENT_HEADER}; +use common::consts::REQUEST_ID_HEADER; use common::errors::BrightStaffError; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::ProviderRequestType; @@ -10,7 +10,8 @@ use hyper::{Request, Response, StatusCode}; use std::sync::Arc; use tracing::{debug, info, info_span, warn, Instrument}; -use crate::handlers::llm::router::router_chat_get_upstream_model; +use super::extract_or_generate_traceparent; +use crate::handlers::llm::model_selection::router_chat_get_upstream_model; use crate::router::llm::RouterService; use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name}; @@ -72,7 +73,7 @@ pub async fn routing_decision( request: Request, router_service: Arc, request_path: String, - span_attributes: Arc>, + span_attributes: &Option, ) -> Result>, hyper::Error> { let request_headers = request.headers().clone(); let request_id: String = request_headers @@ -81,8 +82,7 @@ pub async fn routing_decision( .map(|s| s.to_string()) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); - let custom_attrs = - collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref()); + let custom_attrs = collect_custom_trace_attributes(&request_headers, span_attributes.as_ref()); let request_span = info_span!( "routing_decision", @@ -119,23 +119,7 @@ async fn routing_decision_inner( } }); - // Extract or generate traceparent - let traceparent: String = match request_headers - .get(TRACE_PARENT_HEADER) - .and_then(|h| h.to_str().ok()) - .map(|s| s.to_string()) - { - Some(tp) => tp, - None => { - let trace_id = uuid::Uuid::new_v4().to_string().replace("-", ""); - let generated_tp = format!("00-{}-0000000000000000-01", trace_id); - warn!( - generated_traceparent = %generated_tp, - "TRACE_PARENT header missing, generated new traceparent" - ); - generated_tp - } - }; + let traceparent = extract_or_generate_traceparent(&request_headers); // Extract trace_id from traceparent (format: 00-{trace_id}-{span_id}-{flags}) let trace_id = traceparent diff --git a/crates/brightstaff/src/lib.rs b/crates/brightstaff/src/lib.rs index 2457d917..b4ab82a9 100644 --- a/crates/brightstaff/src/lib.rs +++ b/crates/brightstaff/src/lib.rs @@ -3,4 +3,5 @@ pub mod handlers; pub mod router; pub mod signals; pub mod state; +pub mod streaming; pub mod tracing; diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index edd370b7..60a69bca 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -1,5 +1,6 @@ use brightstaff::app_state::AppState; use brightstaff::handlers::agents::orchestrator::agent_chat; +use brightstaff::handlers::empty; use brightstaff::handlers::function_calling::function_calling_chat_handler; use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::models::list_models; @@ -16,7 +17,7 @@ use common::configuration::{ }; use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH}; use common::llm_providers::LlmProviders; -use http_body_util::{combinators::BoxBody, BodyExt, Empty}; +use http_body_util::combinators::BoxBody; use hyper::body::Incoming; use hyper::header::HeaderValue; use hyper::server::conn::http1; @@ -39,17 +40,6 @@ const DEFAULT_ROUTING_MODEL_NAME: &str = "Arch-Router"; const DEFAULT_ORCHESTRATOR_LLM_PROVIDER: &str = "plano-orchestrator"; const DEFAULT_ORCHESTRATOR_MODEL_NAME: &str = "Plano-Orchestrator"; -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -/// An empty HTTP body (used for 404 / OPTIONS responses). -fn empty() -> BoxBody { - Empty::::new() - .map_err(|never| match never {}) - .boxed() -} - /// CORS pre-flight response for the models endpoint. fn cors_preflight() -> Result>, hyper::Error> { let mut response = Response::new(empty()); @@ -139,7 +129,11 @@ async fn init_app_state( filter_ids.map(|ids| { let agents = ids .iter() - .filter_map(|id| global_agent_map.get(id).map(|a: &Agent| (id.clone(), a.clone()))) + .filter_map(|id| { + global_agent_map + .get(id) + .map(|a: &Agent| (id.clone(), a.clone())) + }) .collect(); ResolvedFilterChain { filter_ids: ids, @@ -197,20 +191,18 @@ async fn init_app_state( let state_storage = init_state_storage(config).await?; - let span_attributes = Arc::new( - config - .tracing - .as_ref() - .and_then(|tracing| tracing.span_attributes.clone()), - ); + let span_attributes = config + .tracing + .as_ref() + .and_then(|tracing| tracing.span_attributes.clone()); Ok(AppState { router_service, orchestrator_service, - model_aliases: Arc::new(config.model_aliases.clone()), + model_aliases: config.model_aliases.clone(), llm_providers: Arc::new(RwLock::new(llm_providers)), - agents_list: Arc::new(RwLock::new(Some(all_agents))), - listeners: Arc::new(RwLock::new(config.listeners.clone())), + agents_list: Some(all_agents), + listeners: config.listeners.clone(), state_storage, llm_provider_url, span_attributes, @@ -294,7 +286,7 @@ async fn route( req, Arc::clone(&state.router_service), stripped, - Arc::clone(&state.span_attributes), + &state.span_attributes, ) .with_context(parent_cx) .await; diff --git a/crates/brightstaff/src/state/response_state_processor.rs b/crates/brightstaff/src/state/response_state_processor.rs index 6f6c7b62..bca2991a 100644 --- a/crates/brightstaff/src/state/response_state_processor.rs +++ b/crates/brightstaff/src/state/response_state_processor.rs @@ -7,8 +7,8 @@ use std::io::Read; use std::sync::Arc; use tracing::{debug, info, warn}; -use crate::handlers::streaming::StreamProcessor; use crate::state::{OpenAIConversationState, StateStorage}; +use crate::streaming::StreamProcessor; /// Processor that wraps another processor and handles v1/responses state management /// Captures response_id and output from streaming responses, stores state after completion diff --git a/crates/brightstaff/src/handlers/streaming.rs b/crates/brightstaff/src/streaming.rs similarity index 99% rename from crates/brightstaff/src/handlers/streaming.rs rename to crates/brightstaff/src/streaming.rs index 17787be8..f7af8ae0 100644 --- a/crates/brightstaff/src/handlers/streaming.rs +++ b/crates/brightstaff/src/streaming.rs @@ -13,7 +13,7 @@ use tokio_stream::StreamExt; use tracing::{debug, info, warn, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use super::agents::pipeline::{PipelineError, PipelineProcessor}; +use crate::handlers::agents::pipeline::{PipelineError, PipelineProcessor}; const STREAM_BUFFER_SIZE: usize = 16; use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};