Rename `filter_chain`/`output_filter_chain` to `input_filters`/`output_filters`; scope output filters to chat completions only

This commit is contained in:
Adil Hafeez 2026-03-13 17:58:40 -07:00
parent e458daf162
commit ca64833686
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
13 changed files with 69 additions and 60 deletions

View file

@ -195,8 +195,8 @@ mod tests {
listener_type: ListenerType::Agent,
name: name.to_string(),
agents: Some(agents),
filter_chain: None,
output_filter_chain: None,
input_filters: None,
output_filters: None,
port: 8080,
router: None,
}

View file

@ -75,8 +75,8 @@ mod tests {
listener_type: ListenerType::Agent,
name: "test-listener".to_string(),
agents: Some(vec![agent_pipeline.clone()]),
filter_chain: None,
output_filter_chain: None,
input_filters: None,
output_filters: None,
port: 8080,
router: None,
};

View file

@ -46,9 +46,9 @@ pub async fn llm_chat(
llm_providers: Arc<RwLock<LlmProviders>>,
span_attributes: Arc<Option<SpanAttributes>>,
state_storage: Option<Arc<dyn StateStorage>>,
filter_chain: Arc<Option<Vec<String>>>,
filter_agents: Arc<HashMap<String, Agent>>,
output_filter_chain: Arc<Option<Vec<String>>>,
input_filters: Arc<Option<Vec<String>>>,
input_filter_agents: Arc<HashMap<String, Agent>>,
output_filters: Arc<Option<Vec<String>>>,
output_filter_agents: Arc<HashMap<String, Agent>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
@ -89,9 +89,9 @@ pub async fn llm_chat(
request_id,
request_path,
request_headers,
filter_chain,
filter_agents,
output_filter_chain,
input_filters,
input_filter_agents,
output_filters,
output_filter_agents,
)
.instrument(request_span)
@ -110,9 +110,9 @@ async fn llm_chat_inner(
request_id: String,
request_path: String,
mut request_headers: hyper::HeaderMap,
filter_chain: Arc<Option<Vec<String>>>,
filter_agents: Arc<HashMap<String, Agent>>,
output_filter_chain: Arc<Option<Vec<String>>>,
input_filters: Arc<Option<Vec<String>>>,
input_filter_agents: Arc<HashMap<String, Agent>>,
output_filters: Arc<Option<Vec<String>>>,
output_filter_agents: Arc<HashMap<String, Agent>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
// Set service name for LLM operations
@ -267,11 +267,11 @@ async fn llm_chat_inner(
debug!("removed plano_preference_config from metadata");
}
// === Filter chain processing for model listener ===
// === Input filters processing for model listener ===
{
if let Some(ref fc) = *filter_chain {
if let Some(ref fc) = *input_filters {
if !fc.is_empty() {
debug!(filter_chain = ?fc, "processing model listener filter chain");
debug!(input_filters = ?fc, "processing model listener input filters");
// Create a temporary AgentFilterChain to reuse PipelineProcessor
let temp_filter_chain = AgentFilterChain {
@ -287,7 +287,7 @@ async fn llm_chat_inner(
.process_filter_chain(
&messages,
&temp_filter_chain,
&filter_agents,
&input_filter_agents,
&request_headers,
)
.await
@ -508,14 +508,23 @@ async fn llm_chat_inner(
propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers));
});
// Determine if output filter chain is configured
let has_output_filter = output_filter_chain
// Output filters are only supported for /v1/chat/completions — the SSE content
// extraction logic is specific to that API shape (choices[].delta.content).
let output_filters_configured = output_filters
.as_ref()
.as_ref()
.map(|fc| !fc.is_empty())
.unwrap_or(false);
let has_output_filter = output_filters_configured
&& request_path == common::consts::CHAT_COMPLETIONS_PATH;
if output_filters_configured && !has_output_filter {
warn!(
path = %request_path,
"output filters are configured but only supported for /v1/chat/completions, skipping"
);
}
// Save request headers for output filter chain (before they're consumed by upstream request)
// Save request headers for output filters (before they're consumed by upstream request)
let output_filter_request_headers = if has_output_filter {
Some(request_headers.clone())
} else {
@ -589,7 +598,7 @@ async fn llm_chat_inner(
request_id,
);
if has_output_filter {
let ofc = output_filter_chain.as_ref().as_ref().unwrap().clone();
let ofc = output_filters.as_ref().as_ref().unwrap().clone();
let ofa = (*output_filter_agents).clone();
create_streaming_response_with_output_filter(
byte_stream,
@ -603,7 +612,7 @@ async fn llm_chat_inner(
create_streaming_response(byte_stream, state_processor, 16)
}
} else if has_output_filter {
let ofc = output_filter_chain.as_ref().as_ref().unwrap().clone();
let ofc = output_filters.as_ref().as_ref().unwrap().clone();
let ofa = (*output_filter_agents).clone();
create_streaming_response_with_output_filter(
byte_stream,

View file

@ -457,13 +457,13 @@ pub async fn filter_non_streaming_response(
Bytes::from(serde_json::to_string(&value).unwrap_or_else(|_| response_str.to_string()))
}
/// Creates a streaming response that processes each chunk through an output filter chain.
/// Creates a streaming response that processes each chunk through output filters.
/// The output filters are invoked asynchronously for each SSE chunk's content.
pub fn create_streaming_response_with_output_filter<S, P>(
mut byte_stream: S,
mut inner_processor: P,
buffer_size: usize,
output_filter_chain: Vec<String>,
output_filters: Vec<String>,
output_filter_agents: HashMap<String, Agent>,
request_headers: HeaderMap,
) -> StreamingResponse
@ -482,7 +482,7 @@ where
id: "output_filter".to_string(),
default: None,
description: None,
filter_chain: Some(output_filter_chain),
filter_chain: Some(output_filters),
};
while let Some(item) = byte_stream.next().await {

View file

@ -108,10 +108,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.listeners
.iter()
.find(|l| l.listener_type == ListenerType::Model);
let model_filter_chain: Arc<Option<Vec<String>>> =
Arc::new(model_listener.and_then(|l| l.filter_chain.clone()));
let model_filter_agents: Arc<HashMap<String, Agent>> = Arc::new(
model_filter_chain
let model_input_filters: Arc<Option<Vec<String>>> =
Arc::new(model_listener.and_then(|l| l.input_filters.clone()));
let model_input_filter_agents: Arc<HashMap<String, Agent>> = Arc::new(
model_input_filters
.as_ref()
.as_ref()
.map(|fc| {
@ -121,10 +121,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
})
.unwrap_or_default(),
);
let model_output_filter_chain: Arc<Option<Vec<String>>> =
Arc::new(model_listener.and_then(|l| l.output_filter_chain.clone()));
let model_output_filters: Arc<Option<Vec<String>>> =
Arc::new(model_listener.and_then(|l| l.output_filters.clone()));
let model_output_filter_agents: Arc<HashMap<String, Agent>> = Arc::new(
model_output_filter_chain
model_output_filters
.as_ref()
.as_ref()
.map(|fc| {
@ -228,9 +228,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let agents_list = combined_agents_filters_list.clone();
let model_filter_chain = model_filter_chain.clone();
let model_filter_agents = model_filter_agents.clone();
let model_output_filter_chain = model_output_filter_chain.clone();
let model_input_filters = model_input_filters.clone();
let model_input_filter_agents = model_input_filter_agents.clone();
let model_output_filters = model_output_filters.clone();
let model_output_filter_agents = model_output_filter_agents.clone();
let listeners = listeners.clone();
let span_attributes = span_attributes.clone();
@ -243,9 +243,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let model_aliases = Arc::clone(&model_aliases);
let agents_list = agents_list.clone();
let model_filter_chain = model_filter_chain.clone();
let model_filter_agents = model_filter_agents.clone();
let model_output_filter_chain = model_output_filter_chain.clone();
let model_input_filters = model_input_filters.clone();
let model_input_filter_agents = model_input_filter_agents.clone();
let model_output_filters = model_output_filters.clone();
let model_output_filter_agents = model_output_filter_agents.clone();
let listeners = listeners.clone();
let span_attributes = span_attributes.clone();
@ -305,9 +305,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
llm_providers,
span_attributes,
state_storage,
model_filter_chain,
model_filter_agents,
model_output_filter_chain,
model_input_filters,
model_input_filter_agents,
model_output_filters,
model_output_filter_agents,
)
.with_context(parent_cx)