From ca64833686cea385712f287749dd6408b0788e14 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Fri, 13 Mar 2026 17:58:40 -0700 Subject: [PATCH] rename filter_chain/output_filter_chain to input_filters/output_filters, scope output filters to chat completions only --- cli/planoai/config_generator.py | 8 ++-- config/plano_config_schema.yaml | 4 +- .../src/handlers/agent_selector.rs | 4 +- .../src/handlers/integration_tests.rs | 4 +- crates/brightstaff/src/handlers/llm.rs | 45 +++++++++++-------- crates/brightstaff/src/handlers/utils.rs | 6 +-- crates/brightstaff/src/main.rs | 32 ++++++------- crates/common/src/configuration.rs | 4 +- .../model_listener_filter/README.md | 4 +- .../model_listener_filter/config.yaml | 2 +- demos/filter_chains/pii_anonymizer/README.md | 8 ++-- .../filter_chains/pii_anonymizer/config.yaml | 4 +- .../includes/plano_config_full_reference.yaml | 4 +- 13 files changed, 69 insertions(+), 60 deletions(-) diff --git a/cli/planoai/config_generator.py b/cli/planoai/config_generator.py index e8bb516b..a4f9eb21 100644 --- a/cli/planoai/config_generator.py +++ b/cli/planoai/config_generator.py @@ -426,13 +426,13 @@ def validate_and_render_schema(): "Please provide model_providers either under listeners or at root level, not both. Currently we don't support multiple listeners with model_providers" ) - # Validate filter_chain IDs on listeners reference valid agent/filter IDs + # Validate input_filters IDs on listeners reference valid agent/filter IDs for listener in listeners: - listener_filter_chain = listener.get("filter_chain", []) - for fc_id in listener_filter_chain: + listener_input_filters = listener.get("input_filters", []) + for fc_id in listener_input_filters: if fc_id not in agent_id_keys: raise Exception( - f"Listener '{listener.get('name', 'unknown')}' references filter_chain id '{fc_id}' " + f"Listener '{listener.get('name', 'unknown')}' references input_filters id '{fc_id}' " f"which is not defined in agents or filters. Available ids: {', '.join(sorted(agent_id_keys))}" ) diff --git a/config/plano_config_schema.yaml b/config/plano_config_schema.yaml index efe084d6..b65fdb17 100644 --- a/config/plano_config_schema.yaml +++ b/config/plano_config_schema.yaml @@ -93,11 +93,11 @@ properties: required: - id - description - filter_chain: + input_filters: type: array items: type: string - output_filter_chain: + output_filters: type: array items: type: string diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index 59459c26..33cf73ff 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -195,8 +195,8 @@ mod tests { listener_type: ListenerType::Agent, name: name.to_string(), agents: Some(agents), - filter_chain: None, - output_filter_chain: None, + input_filters: None, + output_filters: None, port: 8080, router: None, } diff --git a/crates/brightstaff/src/handlers/integration_tests.rs b/crates/brightstaff/src/handlers/integration_tests.rs index 2e51485a..8013ed0a 100644 --- a/crates/brightstaff/src/handlers/integration_tests.rs +++ b/crates/brightstaff/src/handlers/integration_tests.rs @@ -75,8 +75,8 @@ mod tests { listener_type: ListenerType::Agent, name: "test-listener".to_string(), agents: Some(vec![agent_pipeline.clone()]), - filter_chain: None, - output_filter_chain: None, + input_filters: None, + output_filters: None, port: 8080, router: None, }; diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 1359bcd0..b8beb7b8 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -46,9 +46,9 @@ pub async fn llm_chat( llm_providers: Arc>, span_attributes: Arc>, state_storage: Option>, - filter_chain: Arc>>, - filter_agents: Arc>, - output_filter_chain: Arc>>, + input_filters: Arc>>, + input_filter_agents: Arc>, + output_filters: Arc>>, output_filter_agents: Arc>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); @@ -89,9 +89,9 @@ pub async fn llm_chat( request_id, request_path, request_headers, - filter_chain, - filter_agents, - output_filter_chain, + input_filters, + input_filter_agents, + output_filters, output_filter_agents, ) .instrument(request_span) @@ -110,9 +110,9 @@ async fn llm_chat_inner( request_id: String, request_path: String, mut request_headers: hyper::HeaderMap, - filter_chain: Arc>>, - filter_agents: Arc>, - output_filter_chain: Arc>>, + input_filters: Arc>>, + input_filter_agents: Arc>, + output_filters: Arc>>, output_filter_agents: Arc>, ) -> Result>, hyper::Error> { // Set service name for LLM operations @@ -267,11 +267,11 @@ async fn llm_chat_inner( debug!("removed plano_preference_config from metadata"); } - // === Filter chain processing for model listener === + // === Input filters processing for model listener === { - if let Some(ref fc) = *filter_chain { + if let Some(ref fc) = *input_filters { if !fc.is_empty() { - debug!(filter_chain = ?fc, "processing model listener filter chain"); + debug!(input_filters = ?fc, "processing model listener input filters"); // Create a temporary AgentFilterChain to reuse PipelineProcessor let temp_filter_chain = AgentFilterChain { @@ -287,7 +287,7 @@ async fn llm_chat_inner( .process_filter_chain( &messages, &temp_filter_chain, - &filter_agents, + &input_filter_agents, &request_headers, ) .await @@ -508,14 +508,23 @@ async fn llm_chat_inner( propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers)); }); - // Determine if output filter chain is configured - let has_output_filter = output_filter_chain + // Output filters are only supported for /v1/chat/completions — the SSE content + // extraction logic is specific to that API shape (choices[].delta.content). + let output_filters_configured = output_filters .as_ref() .as_ref() .map(|fc| !fc.is_empty()) .unwrap_or(false); + let has_output_filter = output_filters_configured + && request_path == common::consts::CHAT_COMPLETIONS_PATH; + if output_filters_configured && !has_output_filter { + warn!( + path = %request_path, + "output filters are configured but only supported for /v1/chat/completions, skipping" + ); + } - // Save request headers for output filter chain (before they're consumed by upstream request) + // Save request headers for output filters (before they're consumed by upstream request) let output_filter_request_headers = if has_output_filter { Some(request_headers.clone()) } else { @@ -589,7 +598,7 @@ async fn llm_chat_inner( request_id, ); if has_output_filter { - let ofc = output_filter_chain.as_ref().as_ref().unwrap().clone(); + let ofc = output_filters.as_ref().as_ref().unwrap().clone(); let ofa = (*output_filter_agents).clone(); create_streaming_response_with_output_filter( byte_stream, @@ -603,7 +612,7 @@ async fn llm_chat_inner( create_streaming_response(byte_stream, state_processor, 16) } } else if has_output_filter { - let ofc = output_filter_chain.as_ref().as_ref().unwrap().clone(); + let ofc = output_filters.as_ref().as_ref().unwrap().clone(); let ofa = (*output_filter_agents).clone(); create_streaming_response_with_output_filter( byte_stream, diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index 49e66bb8..de548e7a 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -457,13 +457,13 @@ pub async fn filter_non_streaming_response( Bytes::from(serde_json::to_string(&value).unwrap_or_else(|_| response_str.to_string())) } -/// Creates a streaming response that processes each chunk through an output filter chain. +/// Creates a streaming response that processes each chunk through output filters. /// The output filter is called asynchronously for each SSE chunk's content. pub fn create_streaming_response_with_output_filter( mut byte_stream: S, mut inner_processor: P, buffer_size: usize, - output_filter_chain: Vec, + output_filters: Vec, output_filter_agents: HashMap, request_headers: HeaderMap, ) -> StreamingResponse @@ -482,7 +482,7 @@ where id: "output_filter".to_string(), default: None, description: None, - filter_chain: Some(output_filter_chain), + filter_chain: Some(output_filters), }; while let Some(item) = byte_stream.next().await { diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index ec2662af..c8e34002 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -108,10 +108,10 @@ async fn main() -> Result<(), Box> { .listeners .iter() .find(|l| l.listener_type == ListenerType::Model); - let model_filter_chain: Arc>> = - Arc::new(model_listener.and_then(|l| l.filter_chain.clone())); - let model_filter_agents: Arc> = Arc::new( - model_filter_chain + let model_input_filters: Arc>> = + Arc::new(model_listener.and_then(|l| l.input_filters.clone())); + let model_input_filter_agents: Arc> = Arc::new( + model_input_filters .as_ref() .as_ref() .map(|fc| { @@ -121,10 +121,10 @@ async fn main() -> Result<(), Box> { }) .unwrap_or_default(), ); - let model_output_filter_chain: Arc>> = - Arc::new(model_listener.and_then(|l| l.output_filter_chain.clone())); + let model_output_filters: Arc>> = + Arc::new(model_listener.and_then(|l| l.output_filters.clone())); let model_output_filter_agents: Arc> = Arc::new( - model_output_filter_chain + model_output_filters .as_ref() .as_ref() .map(|fc| { @@ -228,9 +228,9 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let agents_list = combined_agents_filters_list.clone(); - let model_filter_chain = model_filter_chain.clone(); - let model_filter_agents = model_filter_agents.clone(); - let model_output_filter_chain = model_output_filter_chain.clone(); + let model_input_filters = model_input_filters.clone(); + let model_input_filter_agents = model_input_filter_agents.clone(); + let model_output_filters = model_output_filters.clone(); let model_output_filter_agents = model_output_filter_agents.clone(); let listeners = listeners.clone(); let span_attributes = span_attributes.clone(); @@ -243,9 +243,9 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); - let model_filter_chain = model_filter_chain.clone(); - let model_filter_agents = model_filter_agents.clone(); - let model_output_filter_chain = model_output_filter_chain.clone(); + let model_input_filters = model_input_filters.clone(); + let model_input_filter_agents = model_input_filter_agents.clone(); + let model_output_filters = model_output_filters.clone(); let model_output_filter_agents = model_output_filter_agents.clone(); let listeners = listeners.clone(); let span_attributes = span_attributes.clone(); @@ -305,9 +305,9 @@ async fn main() -> Result<(), Box> { llm_providers, span_attributes, state_storage, - model_filter_chain, - model_filter_agents, - model_output_filter_chain, + model_input_filters, + model_input_filter_agents, + model_output_filters, model_output_filter_agents, ) .with_context(parent_cx) diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index d1b94f4e..3050eac0 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -51,8 +51,8 @@ pub struct Listener { pub name: String, pub router: Option, pub agents: Option>, - pub filter_chain: Option>, - pub output_filter_chain: Option>, + pub input_filters: Option>, + pub output_filters: Option>, pub port: u16, } diff --git a/demos/filter_chains/model_listener_filter/README.md b/demos/filter_chains/model_listener_filter/README.md index 35be791d..92d44695 100644 --- a/demos/filter_chains/model_listener_filter/README.md +++ b/demos/filter_chains/model_listener_filter/README.md @@ -2,7 +2,7 @@ Run content-safety filters on direct LLM requests — no agent layer required. -This demo uses the `filter_chain` feature on a **model-type listener** to intercept +This demo uses the `input_filters` feature on a **model-type listener** to intercept `/v1/chat/completions` requests and block unsafe content before they reach the LLM provider. ## Architecture @@ -10,7 +10,7 @@ This demo uses the `filter_chain` feature on a **model-type listener** to interc ``` Client ──► Plano (model listener :12000) │ - ├─ filter_chain: content_guard ──► Block / Allow + ├─ input_filters: content_guard ──► Block / Allow │ └─ model_provider: openai/gpt-4o-mini ``` diff --git a/demos/filter_chains/model_listener_filter/config.yaml b/demos/filter_chains/model_listener_filter/config.yaml index 6d5bd1c6..2eb1d0f2 100644 --- a/demos/filter_chains/model_listener_filter/config.yaml +++ b/demos/filter_chains/model_listener_filter/config.yaml @@ -14,7 +14,7 @@ listeners: - type: model name: llm_gateway port: 12000 - filter_chain: + input_filters: - content_guard tracing: diff --git a/demos/filter_chains/pii_anonymizer/README.md b/demos/filter_chains/pii_anonymizer/README.md index f8a21e56..a288733b 100644 --- a/demos/filter_chains/pii_anonymizer/README.md +++ b/demos/filter_chains/pii_anonymizer/README.md @@ -2,20 +2,20 @@ Automatically redact PII from LLM requests and restore it in responses — inspired by [Uber's GenAI Gateway PII Redactor](https://www.uber.com/blog/genai-gateway/). -This demo uses both `filter_chain` (input) and `output_filter_chain` (output) on a **model-type listener** to anonymize PII before it reaches the LLM provider, then de-anonymize the response before returning it to the client. +This demo uses both `input_filters` and `output_filters` on a **model-type listener** to anonymize PII before it reaches the LLM provider, then de-anonymize the response before returning it to the client. ## Architecture ``` Client ──► Plano (model listener :12000) │ - ├─ filter_chain: pii_anonymizer + ├─ input_filters: pii_anonymizer │ └─ Replace PII with [EMAIL_0], [SSN_0], etc. │ ├─ model_provider: openai/gpt-4o-mini │ └─ LLM only sees anonymized data │ - └─ output_filter_chain: pii_deanonymizer + └─ output_filters: pii_deanonymizer └─ Restore [EMAIL_0] → original email (per-chunk for streaming) ``` @@ -82,7 +82,7 @@ Check the PII filter service logs in the terminal running `start_agents.sh`. You ## How Streaming De-anonymization Works -For streaming responses, each SSE chunk is sent through the output filter chain as it arrives from the LLM: +For streaming responses, each SSE chunk is sent through the output filters as it arrives from the LLM: 1. Plano receives a chunk with content like `"The email [EMAIL_0] belongs to..."` 2. The chunk content is sent to the `/deanonymize` endpoint diff --git a/demos/filter_chains/pii_anonymizer/config.yaml b/demos/filter_chains/pii_anonymizer/config.yaml index 7b3dd773..3bae7354 100644 --- a/demos/filter_chains/pii_anonymizer/config.yaml +++ b/demos/filter_chains/pii_anonymizer/config.yaml @@ -17,9 +17,9 @@ listeners: - type: model name: llm_gateway port: 12000 - filter_chain: + input_filters: - pii_anonymizer - output_filter_chain: + output_filters: - pii_deanonymizer tracing: diff --git a/docs/source/resources/includes/plano_config_full_reference.yaml b/docs/source/resources/includes/plano_config_full_reference.yaml index c0e97add..2f1d1596 100644 --- a/docs/source/resources/includes/plano_config_full_reference.yaml +++ b/docs/source/resources/includes/plano_config_full_reference.yaml @@ -66,8 +66,8 @@ listeners: name: model_1 address: 0.0.0.0 port: 12000 - # Optional: attach a filter chain for input guardrails on direct LLM requests - # filter_chain: + # Optional: attach input filters for guardrails on direct LLM requests + # input_filters: # - input_guards # Prompt listener for function calling (for prompt_targets)