From 8c71acfc76279e0560986e98bfa7afa229c47176 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Thu, 12 Mar 2026 13:26:19 -0700 Subject: [PATCH] refactor a bit to send filters instead of agents --- .../src/handlers/agent_selector.rs | 1 + .../src/handlers/integration_tests.rs | 1 + crates/brightstaff/src/handlers/llm.rs | 22 +++++++---------- crates/brightstaff/src/main.rs | 24 +++++++++++++++++-- crates/common/src/configuration.rs | 2 ++ 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index a555646d..45fc74bd 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -197,6 +197,7 @@ mod tests { filter_chain: None, port: 8080, router: None, + filter_agents: None, } } diff --git a/crates/brightstaff/src/handlers/integration_tests.rs b/crates/brightstaff/src/handlers/integration_tests.rs index c0592d58..bf88ee86 100644 --- a/crates/brightstaff/src/handlers/integration_tests.rs +++ b/crates/brightstaff/src/handlers/integration_tests.rs @@ -77,6 +77,7 @@ mod tests { filter_chain: None, port: 8080, router: None, + filter_agents: None, }; let listeners = vec![listener]; diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index d9d1bb9c..0852b0ed 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use common::configuration::{Agent, AgentFilterChain, Listener, ModelAlias, SpanAttributes}; +use common::configuration::{AgentFilterChain, Listener, ModelAlias, SpanAttributes}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; @@ -46,7 +46,6 @@ pub async fn llm_chat( span_attributes: Arc>, state_storage: Option>, listeners: Arc>>, - agents_list: Arc>>>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); @@ -87,7 +86,6 @@ pub async fn llm_chat( request_path, request_headers, listeners, - agents_list, ) .instrument(request_span) .await @@ -106,7 +104,6 @@ async fn llm_chat_inner( request_path: String, mut request_headers: hyper::HeaderMap, listeners: Arc>>, - agents_list: Arc>>>, ) -> Result>, hyper::Error> { // Set service name for LLM operations set_service_name(operation_component::LLM); @@ -264,22 +261,19 @@ async fn llm_chat_inner( // Check if any model listener (no agents) has a filter_chain configured { let listeners_guard = listeners.read().await; - let filter_chain: Option> = listeners_guard + let model_listener = listeners_guard .iter() - .find(|l| l.agents.is_none() && l.filter_chain.is_some()) - .and_then(|l| l.filter_chain.clone()); + .find(|l| l.agents.is_none() && l.filter_chain.is_some()); + + let filter_chain = model_listener.and_then(|l| l.filter_chain.clone()); + let agent_map = model_listener + .and_then(|l| l.filter_agents.clone()) + .unwrap_or_default(); if let Some(ref fc) = filter_chain { if !fc.is_empty() { debug!(filter_chain = ?fc, "processing model listener filter chain"); - // Build agent map from agents_list - let agents_guard = agents_list.read().await; - let agent_map: HashMap = agents_guard - .as_ref() - .map(|agents| agents.iter().map(|a| (a.id.clone(), a.clone())).collect()) - .unwrap_or_default(); - // Create a temporary AgentFilterChain to reuse PipelineProcessor let temp_filter_chain = AgentFilterChain { id: "model_listener".to_string(), diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 5ccc336d..872951c8 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -24,6 +24,7 @@ use hyper_util::rt::TokioIo; use opentelemetry::trace::FutureExt; use opentelemetry::{global, Context}; use opentelemetry_http::HeaderExtractor; +use std::collections::HashMap; use std::sync::Arc; use std::{env, fs}; use tokio::net::TcpListener; @@ -80,12 +81,32 @@ async fn main() -> Result<(), Box> { .cloned() .collect(); + // Build global agent map for resolving filter chain references + let global_agent_map: HashMap = all_agents + .iter() + .map(|a| (a.id.clone(), a.clone())) + .collect(); + + // Resolve filter_agents on each listener at startup + let mut listeners_resolved = plano_config.listeners.clone(); + for listener in &mut listeners_resolved { + if let Some(ref fc) = listener.filter_chain { + let filter_agents: HashMap = fc + .iter() + .filter_map(|id| global_agent_map.get(id).map(|a| (id.clone(), a.clone()))) + .collect(); + if !filter_agents.is_empty() { + listener.filter_agents = Some(filter_agents); + } + } + } + // Create expanded provider list for /v1/models endpoint let llm_providers = LlmProviders::try_from(plano_config.model_providers.clone()) .expect("Failed to create LlmProviders"); let llm_providers = Arc::new(RwLock::new(llm_providers)); let combined_agents_filters_list = Arc::new(RwLock::new(Some(all_agents))); - let listeners = Arc::new(RwLock::new(plano_config.listeners.clone())); + let listeners = Arc::new(RwLock::new(listeners_resolved)); let llm_provider_url = env::var("LLM_PROVIDER_ENDPOINT").unwrap_or_else(|_| "http://localhost:12001".to_string()); @@ -249,7 +270,6 @@ async fn main() -> Result<(), Box> { span_attributes, state_storage, listeners, - agents_list, ) .with_context(parent_cx) .await diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 8f0eb5fd..b54453f3 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -43,6 +43,8 @@ pub struct Listener { pub agents: Option>, pub filter_chain: Option>, pub port: u16, + #[serde(skip)] + pub filter_agents: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)]