refactor tracing

This commit is contained in:
Adil Hafeez 2025-12-16 13:44:13 -08:00
parent 26b791b76e
commit ac73806082
No known key found for this signature in database
GPG key ID: 9B18EF7691369645

View file

@ -74,6 +74,96 @@ impl PipelineProcessor {
}
}
/// Extract trace context from current OpenTelemetry context
fn extract_trace_context(&self) -> (String, Option<String>) {
let current_cx = opentelemetry::Context::current();
let span_ref = current_cx.span();
let span_context = span_ref.span_context();
let trace_id = if span_context.is_valid() {
format!("{:032x}", span_context.trace_id())
} else {
String::new() // SpanBuilder will generate one
};
let parent_span_id = if span_context.is_valid() {
Some(format!("{:016x}", span_context.span_id()))
} else {
None
};
(trace_id, parent_span_id)
}
/// Record a span for filter execution
fn record_filter_span(
&self,
collector: &std::sync::Arc<common::traces::TraceCollector>,
agent_name: &str,
tool_name: &str,
start_time: SystemTime,
end_time: SystemTime,
elapsed: std::time::Duration,
) {
let (trace_id, parent_span_id) = self.extract_trace_context();
let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name))
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute("filter_name", agent_name.to_string())
.with_attribute("tool_name", tool_name.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
}
/// Record a span for MCP protocol interactions
fn record_mcp_span(
&self,
collector: &std::sync::Arc<common::traces::TraceCollector>,
operation: &str,
agent_id: &str,
start_time: SystemTime,
end_time: SystemTime,
elapsed: std::time::Duration,
additional_attrs: Option<HashMap<&str, String>>,
) {
let (trace_id, parent_span_id) = self.extract_trace_context();
let mut span_builder = SpanBuilder::new(format!("mcp_{}", operation))
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute("mcp.operation", operation.to_string())
.with_attribute("mcp.agent_id", agent_id.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
if let Some(attrs) = additional_attrs {
for (key, value) in attrs {
span_builder = span_builder.with_attribute(key, value);
}
}
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
}
/// Process the filter chain of agents (all except the terminal agent)
pub async fn process_filter_chain(
&mut self,
@ -105,23 +195,8 @@ impl PipelineProcessor {
let start_time = SystemTime::now();
let start_instant = Instant::now();
// Extract trace context from OpenTelemetry
let current_cx = opentelemetry::Context::current();
let span_ref = current_cx.span();
let span_context = span_ref.span_context();
let trace_id = if span_context.is_valid() {
format!("{:032x}", span_context.trace_id())
} else {
String::new() // SpanBuilder will generate one
};
let parent_span_id = if span_context.is_valid() {
Some(format!("{:016x}", span_context.span_id()))
} else {
None
};
chat_history_updated = self
.execute_filter(&chat_history_updated, agent, request_headers)
.execute_filter(&chat_history_updated, agent, request_headers, trace_collector)
.await?;
let end_time = SystemTime::now();
@ -134,25 +209,16 @@ impl PipelineProcessor {
chat_history_updated.len()
);
// Build span with trace context
// Record span for this filter execution
if let Some(collector) = trace_collector {
let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name))
.with_kind(SpanKind::Internal)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute("filter_name", agent_name.to_string())
.with_attribute("tool_name", tool_name.to_string())
.with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0));
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if let Some(parent_id) = parent_span_id {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
collector.record_span("brightstaff", span);
self.record_filter_span(
collector,
agent_name,
tool_name,
start_time,
end_time,
elapsed,
);
}
}
@ -293,12 +359,13 @@ impl PipelineProcessor {
messages: &[Message],
agent: &Agent,
request_headers: &HeaderMap,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
) -> Result<Vec<Message>, PipelineError> {
// Get or create MCP session
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
session_id.clone()
} else {
let session_id = self.get_new_session_id(&agent.id).await;
let session_id = self.get_new_session_id(&agent.id, trace_collector).await;
self.agent_id_session_map
.insert(agent.id.clone(), session_id.clone());
session_id
@ -317,11 +384,36 @@ impl PipelineProcessor {
Some(&mcp_session_id),
)?;
// Send request
// Send request with tracing
let start_time = SystemTime::now();
let start_instant = Instant::now();
let response = self.send_mcp_request(&json_rpc_request, agent_headers, &agent.id).await?;
let http_status = response.status();
let response_bytes = response.bytes().await?;
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
// Record MCP tool call span
if let Some(collector) = trace_collector {
let mut attrs = HashMap::new();
attrs.insert("mcp.method", "tools/call".to_string());
attrs.insert("mcp.tool_name", tool_name.to_string());
attrs.insert("mcp.session_id", mcp_session_id.clone());
attrs.insert("http.status_code", http_status.as_u16().to_string());
self.record_mcp_span(
collector,
"tool_call",
&agent.id,
start_time,
end_time,
elapsed,
Some(attrs),
);
}
// Handle HTTP errors
if !http_status.is_success() {
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
@ -410,7 +502,12 @@ impl PipelineProcessor {
}
/// Send initialized notification after session creation
async fn send_initialized_notification(&self, agent_id: &str, session_id: &str) -> Result<(), PipelineError> {
async fn send_initialized_notification(
&self,
agent_id: &str,
session_id: &str,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
) -> Result<(), PipelineError> {
let initialized_notification = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/initialized".to_string(),
@ -422,6 +519,9 @@ impl PipelineProcessor {
let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id))?;
let start_time = SystemTime::now();
let start_instant = Instant::now();
let response = self
.client
.post(format!("{}/mcp", self.url))
@ -430,13 +530,42 @@ impl PipelineProcessor {
.send()
.await?;
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
info!("Initialized notification response status: {}", response.status());
// Record MCP notification span
if let Some(collector) = trace_collector {
let mut attrs = HashMap::new();
attrs.insert("mcp.method", "notifications/initialized".to_string());
attrs.insert("mcp.session_id", session_id.to_string());
attrs.insert("http.status_code", response.status().as_u16().to_string());
self.record_mcp_span(
collector,
"notification",
agent_id,
start_time,
end_time,
elapsed,
Some(attrs),
);
}
Ok(())
}
async fn get_new_session_id(&self, agent_id: &str) -> String {
async fn get_new_session_id(
&self,
agent_id: &str,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
) -> String {
info!("Initializing MCP session for agent {}", agent_id);
let start_time = SystemTime::now();
let start_instant = Instant::now();
let initialize_request = self.build_initialize_request();
let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, None)
.expect("Failed to build headers for initialization");
@ -457,8 +586,29 @@ impl PipelineProcessor {
info!("Created new MCP session for agent {}: {}", agent_id, session_id);
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
// Record MCP session initialization span
if let Some(collector) = trace_collector {
let mut attrs = HashMap::new();
attrs.insert("mcp.method", "initialize".to_string());
attrs.insert("mcp.session_id", session_id.clone());
attrs.insert("mcp.protocol_version", "2024-11-05".to_string());
self.record_mcp_span(
collector,
"session_init",
agent_id,
start_time,
end_time,
elapsed,
Some(attrs),
);
}
// Send initialized notification
self.send_initialized_notification(agent_id, &session_id)
self.send_initialized_notification(agent_id, &session_id, trace_collector)
.await
.expect("Failed to send initialized notification");
@ -582,7 +732,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_filter(&messages, &agent, &request_headers)
.execute_filter(&messages, &agent, &request_headers, None)
.await;
match result {
@ -621,7 +771,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_filter(&messages, &agent, &request_headers)
.execute_filter(&messages, &agent, &request_headers, None)
.await;
match result {
@ -673,7 +823,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_filter(&messages, &agent, &request_headers)
.execute_filter(&messages, &agent, &request_headers, None)
.await;
match result {