From eb65e9478179125a34d45e7c34d669522bdf3693 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Tue, 16 Dec 2025 00:33:55 -0800 Subject: [PATCH] add trace for filters --- .../src/handlers/agent_chat_completions.rs | 5 +- .../src/handlers/pipeline_processor.rs | 53 ++++++++++++++++++- crates/brightstaff/src/main.rs | 1 + demos/use_cases/rag_agent/arch_config.yaml | 20 +++---- 4 files changed, 66 insertions(+), 13 deletions(-) diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index a47a4435..b72c4c6c 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -33,8 +33,9 @@ pub async fn agent_chat( _: String, agents_list: Arc>>>, listeners: Arc>>, + trace_collector: Arc, ) -> Result>, hyper::Error> { - match handle_agent_chat(request, router_service, agents_list, listeners).await { + match handle_agent_chat(request, router_service, agents_list, listeners, trace_collector).await { Ok(response) => Ok(response), Err(err) => { // Check if this is a client error from the pipeline that should be cascaded @@ -109,6 +110,7 @@ async fn handle_agent_chat( router_service: Arc, agents_list: Arc>>>, listeners: Arc>>, + trace_collector: Arc, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(router_service); @@ -181,6 +183,7 @@ async fn handle_agent_chat( &selected_agent, &agent_map, &request_headers, + Some(&trace_collector), ) .await?; diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 25e284da..8d4c1fe8 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -2,9 +2,12 @@ use std::collections::HashMap; use common::configuration::{Agent, AgentFilterChain}; use common::consts::{ARCH_UPSTREAM_HOST_HEADER, ENVOY_RETRY_HEADER}; +use common::traces::{SpanBuilder, SpanKind}; use hermesllm::apis::openai::{ChatCompletionsRequest, Message}; use hyper::header::HeaderMap; +use opentelemetry::trace::TraceContextExt; use tracing::{debug, info, warn}; +use std::time::{Instant, SystemTime}; use crate::handlers::jsonrpc::{JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse}; use uuid::Uuid; @@ -77,6 +80,7 @@ impl PipelineProcessor { agent_filter_chain: &AgentFilterChain, agent_map: &HashMap, request_headers: &HeaderMap, + trace_collector: Option<&std::sync::Arc>, ) -> Result, PipelineError> { let mut chat_history_updated = chat_history.to_vec(); @@ -97,14 +101,58 @@ impl PipelineProcessor { chat_history.len() ); + let start_time = SystemTime::now(); + let start_instant = Instant::now(); + + // Extract trace context from OpenTelemetry + let current_cx = opentelemetry::Context::current(); + let span_ref = current_cx.span(); + let span_context = span_ref.span_context(); + let trace_id = if span_context.is_valid() { + format!("{:032x}", span_context.trace_id()) + } else { + String::new() // SpanBuilder will generate one + }; + let parent_span_id = if span_context.is_valid() { + Some(format!("{:016x}", span_context.span_id())) + } else { + None + }; + chat_history_updated = self .execute_filter(&chat_history_updated, agent, request_headers) .await?; + + let end_time = SystemTime::now(); + let elapsed = start_instant.elapsed(); info!( - "Received response: updated conversation length: {}", - chat_history.len() + "Filter '{}' completed in {:.2}ms, updated conversation length: {}", + agent_name, + elapsed.as_secs_f64() * 1000.0, + chat_history_updated.len() ); + + // Build span with trace context + if let Some(collector) = trace_collector { + let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name)) + .with_kind(SpanKind::Internal) + .with_start_time(start_time) + .with_end_time(end_time) + .with_attribute("filter_name", agent_name.to_string()) + .with_attribute("tool_name", tool_name.to_string()) + .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); + + if !trace_id.is_empty() { + span_builder = span_builder.with_trace_id(trace_id); + } + if let Some(parent_id) = parent_span_id { + span_builder = span_builder.with_parent_span_id(parent_id); + } + + let span = span_builder.build(); + collector.record_span("brightstaff", span); + } } Ok(chat_history_updated) @@ -483,6 +531,7 @@ mod tests { &pipeline, &agent_map, &request_headers, + None, ) .await; diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index d73fe3df..f9b7b8c5 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -153,6 +153,7 @@ async fn main() -> Result<(), Box> { fully_qualified_url, agents_list, listeners, + trace_collector, ) .with_context(parent_cx) .await diff --git a/demos/use_cases/rag_agent/arch_config.yaml b/demos/use_cases/rag_agent/arch_config.yaml index d56aa33b..0c44806a 100644 --- a/demos/use_cases/rag_agent/arch_config.yaml +++ b/demos/use_cases/rag_agent/arch_config.yaml @@ -2,21 +2,21 @@ version: v0.3.0 agents: - id: rag_agent - url: mcp://host.docker.internal:10505 + url: http://host.docker.internal:10505 - id: travel_agent - transport: streamable-http - tool: invoke - url: mcp://host.docker.internal:10401 + url: http://host.docker.internal:10401 agent_filters: - id: query_rewriter - transport: streamable-http - tool: query_rewriter - url: mcp://host.docker.internal:10501 + url: http://host.docker.internal:10501 + # kind: mcp + # transport: streamable-http + # tool: query_rewriter - id: context_builder - transport: streamable-http - tool: context_builder - url: mcp://host.docker.internal:10502 + url: http://host.docker.internal:10502 + # kind: mcp + # transport: streamable-http + # tool: context_builder model_providers: - model: openai/gpt-4o-mini