From ac73806082a2d394dd992ebe8319b046387f5e16 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Tue, 16 Dec 2025 13:44:13 -0800 Subject: [PATCH] refactor tracing --- .../src/handlers/pipeline_processor.rs | 234 ++++++++++++++---- 1 file changed, 192 insertions(+), 42 deletions(-) diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 134863e9..3fe58801 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -74,6 +74,96 @@ impl PipelineProcessor { } } + /// Extract trace context from current OpenTelemetry context + fn extract_trace_context(&self) -> (String, Option) { + let current_cx = opentelemetry::Context::current(); + let span_ref = current_cx.span(); + let span_context = span_ref.span_context(); + + let trace_id = if span_context.is_valid() { + format!("{:032x}", span_context.trace_id()) + } else { + String::new() // SpanBuilder will generate one + }; + + let parent_span_id = if span_context.is_valid() { + Some(format!("{:016x}", span_context.span_id())) + } else { + None + }; + + (trace_id, parent_span_id) + } + + /// Record a span for filter execution + fn record_filter_span( + &self, + collector: &std::sync::Arc, + agent_name: &str, + tool_name: &str, + start_time: SystemTime, + end_time: SystemTime, + elapsed: std::time::Duration, + ) { + let (trace_id, parent_span_id) = self.extract_trace_context(); + + let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name)) + .with_kind(SpanKind::Internal) + .with_start_time(start_time) + .with_end_time(end_time) + .with_attribute("filter_name", agent_name.to_string()) + .with_attribute("tool_name", tool_name.to_string()) + .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); + + if !trace_id.is_empty() { + span_builder = span_builder.with_trace_id(trace_id); + } + if let Some(parent_id) = parent_span_id { + span_builder = span_builder.with_parent_span_id(parent_id); + } + + let span = span_builder.build(); + collector.record_span("brightstaff", span); + } + + /// Record a span for MCP protocol interactions + fn record_mcp_span( + &self, + collector: &std::sync::Arc, + operation: &str, + agent_id: &str, + start_time: SystemTime, + end_time: SystemTime, + elapsed: std::time::Duration, + additional_attrs: Option>, + ) { + let (trace_id, parent_span_id) = self.extract_trace_context(); + + let mut span_builder = SpanBuilder::new(format!("mcp_{}", operation)) + .with_kind(SpanKind::Client) + .with_start_time(start_time) + .with_end_time(end_time) + .with_attribute("mcp.operation", operation.to_string()) + .with_attribute("mcp.agent_id", agent_id.to_string()) + .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); + + if let Some(attrs) = additional_attrs { + for (key, value) in attrs { + span_builder = span_builder.with_attribute(key, value); + } + } + + if !trace_id.is_empty() { + span_builder = span_builder.with_trace_id(trace_id); + } + if let Some(parent_id) = parent_span_id { + span_builder = span_builder.with_parent_span_id(parent_id); + } + + let span = span_builder.build(); + collector.record_span("brightstaff", span); + } + /// Process the filter chain of agents (all except the terminal agent) pub async fn process_filter_chain( &mut self, @@ -105,23 +195,8 @@ impl PipelineProcessor { let start_time = SystemTime::now(); let start_instant = Instant::now(); - // Extract trace context from OpenTelemetry - let current_cx = opentelemetry::Context::current(); - let span_ref = current_cx.span(); - let span_context = span_ref.span_context(); - let trace_id = if span_context.is_valid() { - format!("{:032x}", span_context.trace_id()) - } else { - String::new() // SpanBuilder will generate one - }; - let parent_span_id = if span_context.is_valid() { - Some(format!("{:016x}", span_context.span_id())) - } else { - None - }; - chat_history_updated = self - .execute_filter(&chat_history_updated, agent, request_headers) + .execute_filter(&chat_history_updated, agent, request_headers, trace_collector) .await?; let end_time = SystemTime::now(); @@ -134,25 +209,16 @@ impl PipelineProcessor { chat_history_updated.len() ); - // Build span with trace context + // Record span for this filter execution if let Some(collector) = trace_collector { - let mut span_builder = SpanBuilder::new(format!("filter_execution: {}", agent_name)) - .with_kind(SpanKind::Internal) - .with_start_time(start_time) - .with_end_time(end_time) - .with_attribute("filter_name", agent_name.to_string()) - .with_attribute("tool_name", tool_name.to_string()) - .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); - - if !trace_id.is_empty() { - span_builder = span_builder.with_trace_id(trace_id); - } - if let Some(parent_id) = parent_span_id { - span_builder = span_builder.with_parent_span_id(parent_id); - } - - let span = span_builder.build(); - collector.record_span("brightstaff", span); + self.record_filter_span( + collector, + agent_name, + tool_name, + start_time, + end_time, + elapsed, + ); } } @@ -293,12 +359,13 @@ impl PipelineProcessor { messages: &[Message], agent: &Agent, request_headers: &HeaderMap, + trace_collector: Option<&std::sync::Arc>, ) -> Result, PipelineError> { // Get or create MCP session let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) { session_id.clone() } else { - let session_id = self.get_new_session_id(&agent.id).await; + let session_id = self.get_new_session_id(&agent.id, trace_collector).await; self.agent_id_session_map .insert(agent.id.clone(), session_id.clone()); session_id @@ -317,11 +384,36 @@ impl PipelineProcessor { Some(&mcp_session_id), )?; - // Send request + // Send request with tracing + let start_time = SystemTime::now(); + let start_instant = Instant::now(); + let response = self.send_mcp_request(&json_rpc_request, agent_headers, &agent.id).await?; let http_status = response.status(); let response_bytes = response.bytes().await?; + let end_time = SystemTime::now(); + let elapsed = start_instant.elapsed(); + + // Record MCP tool call span + if let Some(collector) = trace_collector { + let mut attrs = HashMap::new(); + attrs.insert("mcp.method", "tools/call".to_string()); + attrs.insert("mcp.tool_name", tool_name.to_string()); + attrs.insert("mcp.session_id", mcp_session_id.clone()); + attrs.insert("http.status_code", http_status.as_u16().to_string()); + + self.record_mcp_span( + collector, + "tool_call", + &agent.id, + start_time, + end_time, + elapsed, + Some(attrs), + ); + } + // Handle HTTP errors if !http_status.is_success() { let error_body = String::from_utf8_lossy(&response_bytes).to_string(); @@ -410,7 +502,12 @@ impl PipelineProcessor { } /// Send initialized notification after session creation - async fn send_initialized_notification(&self, agent_id: &str, session_id: &str) -> Result<(), PipelineError> { + async fn send_initialized_notification( + &self, + agent_id: &str, + session_id: &str, + trace_collector: Option<&std::sync::Arc>, + ) -> Result<(), PipelineError> { let initialized_notification = JsonRpcNotification { jsonrpc: "2.0".to_string(), method: "notifications/initialized".to_string(), @@ -422,6 +519,9 @@ impl PipelineProcessor { let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id))?; + let start_time = SystemTime::now(); + let start_instant = Instant::now(); + let response = self .client .post(format!("{}/mcp", self.url)) @@ -430,13 +530,42 @@ impl PipelineProcessor { .send() .await?; + let end_time = SystemTime::now(); + let elapsed = start_instant.elapsed(); + info!("Initialized notification response status: {}", response.status()); + + // Record MCP notification span + if let Some(collector) = trace_collector { + let mut attrs = HashMap::new(); + attrs.insert("mcp.method", "notifications/initialized".to_string()); + attrs.insert("mcp.session_id", session_id.to_string()); + attrs.insert("http.status_code", response.status().as_u16().to_string()); + + self.record_mcp_span( + collector, + "notification", + agent_id, + start_time, + end_time, + elapsed, + Some(attrs), + ); + } + Ok(()) } - async fn get_new_session_id(&self, agent_id: &str) -> String { + async fn get_new_session_id( + &self, + agent_id: &str, + trace_collector: Option<&std::sync::Arc>, + ) -> String { info!("Initializing MCP session for agent {}", agent_id); + let start_time = SystemTime::now(); + let start_instant = Instant::now(); + let initialize_request = self.build_initialize_request(); let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, None) .expect("Failed to build headers for initialization"); @@ -457,8 +586,29 @@ impl PipelineProcessor { info!("Created new MCP session for agent {}: {}", agent_id, session_id); + let end_time = SystemTime::now(); + let elapsed = start_instant.elapsed(); + + // Record MCP session initialization span + if let Some(collector) = trace_collector { + let mut attrs = HashMap::new(); + attrs.insert("mcp.method", "initialize".to_string()); + attrs.insert("mcp.session_id", session_id.clone()); + attrs.insert("mcp.protocol_version", "2024-11-05".to_string()); + + self.record_mcp_span( + collector, + "session_init", + agent_id, + start_time, + end_time, + elapsed, + Some(attrs), + ); + } + // Send initialized notification - self.send_initialized_notification(agent_id, &session_id) + self.send_initialized_notification(agent_id, &session_id, trace_collector) .await .expect("Failed to send initialized notification"); @@ -582,7 +732,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers) + .execute_filter(&messages, &agent, &request_headers, None) .await; match result { @@ -621,7 +771,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers) + .execute_filter(&messages, &agent, &request_headers, None) .await; match result { @@ -673,7 +823,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers) + .execute_filter(&messages, &agent, &request_headers, None) .await; match result {