add trace for filters

This commit is contained in:
Adil Hafeez 2025-12-16 00:33:55 -08:00
parent 8bb64f6c62
commit eb65e94781
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
4 changed files with 66 additions and 13 deletions

View file

@ -33,8 +33,9 @@ pub async fn agent_chat(
_: String,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(request, router_service, agents_list, listeners).await {
match handle_agent_chat(request, router_service, agents_list, listeners, trace_collector).await {
Ok(response) => Ok(response),
Err(err) => {
// Check if this is a client error from the pipeline that should be cascaded
@ -109,6 +110,7 @@ async fn handle_agent_chat(
router_service: Arc<RouterService>,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(router_service);
@ -181,6 +183,7 @@ async fn handle_agent_chat(
&selected_agent,
&agent_map,
&request_headers,
Some(&trace_collector),
)
.await?;

View file

@ -2,9 +2,12 @@ use std::collections::HashMap;
use common::configuration::{Agent, AgentFilterChain};
use common::consts::{ARCH_UPSTREAM_HOST_HEADER, ENVOY_RETRY_HEADER};
use common::traces::{SpanBuilder, SpanKind};
use hermesllm::apis::openai::{ChatCompletionsRequest, Message};
use hyper::header::HeaderMap;
use opentelemetry::trace::TraceContextExt;
use tracing::{debug, info, warn};
use std::time::{Instant, SystemTime};
use crate::handlers::jsonrpc::{JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
use uuid::Uuid;
@ -77,6 +80,7 @@ impl PipelineProcessor {
agent_filter_chain: &AgentFilterChain,
agent_map: &HashMap<String, Agent>,
request_headers: &HeaderMap,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
) -> Result<Vec<Message>, PipelineError> {
let mut chat_history_updated = chat_history.to_vec();
@ -97,14 +101,58 @@ impl PipelineProcessor {
chat_history.len()
);
let start_time = SystemTime::now();
let start_instant = Instant::now();
// Extract trace context from OpenTelemetry
let current_cx = opentelemetry::Context::current();
let span_ref = current_cx.span();
let span_context = span_ref.span_context();
let trace_id = if span_context.is_valid() {
format!("{:032x}", span_context.trace_id())
} else {
String::new() // SpanBuilder will generate one
};
let parent_span_id = if span_context.is_valid() {
Some(format!("{:016x}", span_context.span_id()))
} else {
None
};
chat_history_updated = self
.execute_filter(&chat_history_updated, agent, request_headers)
.await?;
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
info!(
"Received response: updated conversation length: {}",
chat_history.len()
"Filter '{}' completed in {:.2}ms, updated conversation length: {}",
agent_name,
elapsed.as_secs_f64() * 1000.0,
chat_history_updated.len()
);
// Build span with trace context
if let Some(collector) = trace_collector {
let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name))
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute("filter_name", agent_name.to_string())
.with_attribute("tool_name", tool_name.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
}
}
Ok(chat_history_updated)
@ -483,6 +531,7 @@ mod tests {
&pipeline,
&agent_map,
&request_headers,
None,
)
.await;

View file

@ -153,6 +153,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fully_qualified_url,
agents_list,
listeners,
trace_collector,
)
.with_context(parent_cx)
.await

View file

@ -2,21 +2,21 @@ version: v0.3.0
agents:
- id: rag_agent
url: mcp://host.docker.internal:10505
url: http://host.docker.internal:10505
- id: travel_agent
transport: streamable-http
tool: invoke
url: mcp://host.docker.internal:10401
url: http://host.docker.internal:10401
agent_filters:
- id: query_rewriter
transport: streamable-http
tool: query_rewriter
url: mcp://host.docker.internal:10501
url: http://host.docker.internal:10501
# kind: mcp
# transport: streamable-http
# tool: query_rewriter
- id: context_builder
transport: streamable-http
tool: context_builder
url: mcp://host.docker.internal:10502
url: http://host.docker.internal:10502
# kind: mcp
# transport: streamable-http
# tool: context_builder
model_providers:
- model: openai/gpt-4o-mini