diff --git a/config/envoy.template.yaml b/config/envoy.template.yaml index 308a35b5..6fa623ac 100644 --- a/config/envoy.template.yaml +++ b/config/envoy.template.yaml @@ -390,22 +390,6 @@ static_resources: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - generate_request_id: true - tracing: - provider: - name: envoy.tracers.opentelemetry - typed_config: - "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig - grpc_service: - envoy_grpc: - cluster_name: opentelemetry_collector - timeout: 0.250s - service_name: plano(outbound) - random_sampling: - value: {{ arch_tracing.random_sampling }} - operation: "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%" - {% endif %} stat_prefix: egress_traffic codec_type: AUTO scheme_header_transformation: @@ -483,6 +467,22 @@ static_resources: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} + generate_request_id: true + tracing: + provider: + name: envoy.tracers.opentelemetry + typed_config: + "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + grpc_service: + envoy_grpc: + cluster_name: opentelemetry_collector + timeout: 0.250s + service_name: plano(outbound) + random_sampling: + value: {{ arch_tracing.random_sampling }} + operation: "%REQ(:METHOD)% %REQ(:AUTHORITY)%%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%" + {% endif %} stat_prefix: egress_traffic codec_type: AUTO scheme_header_transformation: diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 99a1f2e9..adfdce02 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,7 +1,7 @@ use std::sync::Arc; +use std::time::Instant; use bytes::Bytes; -use common::consts::TRACE_PARENT_HEADER; use hermesllm::apis::OpenAIMessage; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::providers::request::ProviderRequest; @@ -234,12 +234,6 @@ async fn handle_agent_chat_inner( let message: Vec = client_request.get_messages(); - // Extract trace parent for routing - let traceparent = request_headers - .iter() - .find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER) - .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); - let request_id = request_headers .get(common::consts::REQUEST_ID_HEADER) .and_then(|val| val.to_str().ok()) @@ -253,10 +247,36 @@ async fn handle_agent_chat_inner( }; // Select appropriate agents using arch orchestrator llm model + let selection_start = Instant::now(); let selected_agents = agent_selector - .select_agents(&message, &listener, traceparent.clone(), request_id.clone()) + .select_agents(&message, &listener, request_id.clone()) .await?; + // Record selection attributes on the current orchestrator span + let selection_elapsed_ms = selection_start.elapsed().as_secs_f64() * 1000.0; + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + "selection.listener", + listener.name.clone(), + )); + span.set_attribute(opentelemetry::KeyValue::new( + "selection.agent_count", + selected_agents.len() as i64, + )); + span.set_attribute(opentelemetry::KeyValue::new( + "selection.agents", + selected_agents + .iter() + .map(|a| a.id.as_str()) + .collect::>() + .join(","), + )); + span.set_attribute(opentelemetry::KeyValue::new( + "selection.determination_ms", + format!("{:.2}", selection_elapsed_ms), + )); + }); + info!( count = selected_agents.len(), "selected agents for execution" @@ -267,18 +287,17 @@ async fn handle_agent_chat_inner( let agent_count = selected_agents.len(); for (agent_index, selected_agent) in selected_agents.iter().enumerate() { + // Get agent name + let agent_name = selected_agent.id.clone(); let is_last_agent = agent_index == agent_count - 1; debug!( agent_index = agent_index + 1, total = agent_count, - agent = %selected_agent.id, + agent = %agent_name, "processing agent" ); - // Get agent name - let agent_name = selected_agent.id.clone(); - // Process the filter chain let chat_history = pipeline_processor .process_filter_chain( @@ -294,14 +313,29 @@ async fn handle_agent_chat_inner( debug!(agent = %agent_name, "invoking agent"); - let llm_response = pipeline_processor - .invoke_agent( - &chat_history, - client_request.clone(), - agent, - &request_headers, - ) - .await?; + let agent_span = info_span!( + "agent", + agent_id = %agent_name, + message_count = chat_history.len(), + ); + + let llm_response = async { + set_service_name(operation_component::AGENT); + get_active_span(|span| { + span.update_name(format!("{} /v1/chat/completions", agent_name)); + }); + + pipeline_processor + .invoke_agent( + &chat_history, + client_request.clone(), + agent, + &request_headers, + ) + .await + } + .instrument(agent_span.clone()) + .await?; // If this is the last agent, return the streaming response if is_last_agent { @@ -309,15 +343,28 @@ async fn handle_agent_chat_inner( agent = %agent_name, "completed agent chain, returning response" ); - return response_handler - .create_streaming_response(llm_response) - .await - .map_err(AgentFilterChainError::from); + // Capture the orchestrator span (parent of the agent span) so it + // stays open for the full streaming duration alongside the agent span. + let orchestrator_span = tracing::Span::current(); + return async { + response_handler + .create_streaming_response( + llm_response, + tracing::Span::current(), // agent span (inner) + orchestrator_span, // orchestrator span (outer) + ) + .await + .map_err(AgentFilterChainError::from) + } + .instrument(agent_span) + .await; } // For intermediate agents, collect the full response and pass to next agent debug!(agent = %agent_name, "collecting response from intermediate agent"); - let response_text = response_handler.collect_full_response(llm_response).await?; + let response_text = async { response_handler.collect_full_response(llm_response).await } + .instrument(agent_span) + .await?; info!( agent = %agent_name, diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index e257134a..faa734ee 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -108,7 +108,6 @@ impl AgentSelector { &self, messages: &[Message], listener: &Listener, - trace_parent: Option, request_id: Option, ) -> Result, AgentSelectionError> { let agents = listener @@ -132,7 +131,7 @@ impl AgentSelector { match self .orchestrator_service - .determine_orchestration(messages, trace_parent, Some(usage_preferences), request_id) + .determine_orchestration(messages, Some(usage_preferences), request_id) .await { Ok(Some(routes)) => { diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 12456af1..7278c9fd 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -12,6 +12,7 @@ use http_body_util::{BodyExt, Full}; use hyper::header::{self}; use hyper::{Request, Response, StatusCode}; use opentelemetry::trace::get_active_span; +use opentelemetry::{global, propagation::Injector}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; @@ -34,6 +35,19 @@ fn full>(chunk: T) -> BoxBody { .boxed() } +/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap +struct HeaderMapInjector<'a>(&'a mut header::HeaderMap); + +impl<'a> Injector for HeaderMapInjector<'a> { + fn set(&mut self, key: &str, value: String) { + if let Ok(name) = header::HeaderName::from_bytes(key.as_bytes()) { + if let Ok(val) = header::HeaderValue::from_str(&value) { + self.0.insert(name, val); + } + } + } +} + pub async fn llm_chat( request: Request, router_service: Arc, @@ -60,9 +74,6 @@ pub async fn llm_chat( request_id = %request_id, http.method = %request.method(), http.path = %request_path, - model.requested = tracing::field::Empty, - model.alias_resolved = tracing::field::Empty, - model.routing_resolved = tracing::field::Empty ); // Execute the rest of the handler inside the span @@ -154,10 +165,6 @@ async fn llm_chat_inner( let is_streaming_request = client_request.is_streaming(); let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases); - // Record model information in span - tracing::Span::current().record("model.requested", model_from_request.as_str()); - tracing::Span::current().record("model.alias_resolved", alias_resolved_model.as_str()); - // Validate that the requested model exists in configuration // This matches the validation in llm_gateway routing.rs if llm_providers @@ -191,7 +198,7 @@ async fn llm_chat_inner( .map(|msg| truncate_message(&msg, 50)); // Extract messages for signal analysis (clone before moving client_request) - let messages_for_signals = client_request.get_messages(); + let messages_for_signals = Some(client_request.get_messages()); // Set the model to just the model name (without provider prefix) // This ensures upstream receives "gpt-4" not "openai/gpt-4" @@ -283,13 +290,29 @@ async fn llm_chat_inner( let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap(); // Determine routing using the dedicated router_chat module - let routing_result = match router_chat_get_upstream_model( - router_service, - client_request, // Pass the original request - router_chat will convert it - &traceparent, - &request_path, - &request_id, - ) + // This gets its own span for latency and error tracking + let routing_span = info_span!( + "routing", + component = "routing", + http.method = "POST", + http.target = %request_path, + model.requested = %model_from_request, + model.alias_resolved = %alias_resolved_model, + route.selected_model = tracing::field::Empty, + routing.determination_ms = tracing::field::Empty, + ); + let routing_result = match async { + set_service_name(operation_component::ROUTING); + router_chat_get_upstream_model( + router_service, + client_request, // Pass the original request - router_chat will convert it + &traceparent, + &request_path, + &request_id, + ) + .await + } + .instrument(routing_span) .await { Ok(result) => result, @@ -311,9 +334,6 @@ async fn llm_chat_inner( alias_resolved_model.clone() }; - // Record the routed model in span - tracing::Span::current().record("model.routing_resolved", resolved_model.as_str()); - let span_name = if model_from_request == resolved_model { format!("POST {} {}", request_path, resolved_model) } else { @@ -345,12 +365,18 @@ async fn llm_chat_inner( // remove content-length header if it exists request_headers.remove(header::CONTENT_LENGTH); + // Inject current LLM span's trace context so upstream spans are children of plano(llm) + global::get_text_map_propagator(|propagator| { + let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); + propagator.inject_context(&cx, &mut HeaderMapInjector(&mut request_headers)); + }); + // Capture start time right before sending request to upstream let request_start_time = std::time::Instant::now(); let _request_start_system_time = std::time::SystemTime::now(); let llm_response = match reqwest::Client::new() - .post(full_qualified_llm_provider_url) + .post(&full_qualified_llm_provider_url) .headers(request_headers) .body(client_request_bytes_for_upstream) .send() @@ -382,7 +408,7 @@ async fn llm_chat_inner( operation_component::LLM, span_name, request_start_time, - Some(messages_for_signals), + messages_for_signals, ); // === v1/responses state management: Wrap with ResponsesStateProcessor === diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index a83bf4cb..a68776da 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -9,7 +9,6 @@ use hermesllm::{ProviderRequest, ProviderRequestType}; use hyper::header::HeaderMap; use opentelemetry::global; use opentelemetry::propagation::Injector; -use opentelemetry::trace::get_active_span; use tracing::{debug, info, instrument, warn}; use crate::handlers::jsonrpc::{ @@ -620,13 +619,8 @@ impl PipelineProcessor { } /// Send request to terminal agent and return the raw response for streaming - #[instrument( - skip(self, messages, original_request, terminal_agent, request_headers), - fields( - agent_id = %terminal_agent.id, - message_count = messages.len() - ) - )] + /// Note: The caller is responsible for creating the plano(agent) span that wraps + /// both this call and the subsequent response consumption. pub async fn invoke_agent( &self, messages: &[Message], @@ -634,18 +628,11 @@ impl PipelineProcessor { terminal_agent: &Agent, request_headers: &HeaderMap, ) -> Result { - // Set service name for agent invocation span - set_service_name(operation_component::AGENT); - // let mut request = original_request.clone(); original_request.set_messages(messages); let request_url = "/v1/chat/completions"; - get_active_span(|span| { - span.update_name(format!("{} {}", terminal_agent.id, request_url)); - }); - let request_body = ProviderRequestType::to_bytes(&original_request).unwrap(); // let request_body = serde_json::to_string(&request)?; debug!("sending request to terminal agent {}", terminal_agent.id); diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs index 422dd8da..e2561a8f 100644 --- a/crates/brightstaff/src/handlers/response_handler.rs +++ b/crates/brightstaff/src/handlers/response_handler.rs @@ -9,7 +9,7 @@ use hyper::{Response, StatusCode}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::{info, warn}; +use tracing::{info, warn, Instrument}; /// Errors that can occur during response handling #[derive(Debug, thiserror::Error)] @@ -69,10 +69,14 @@ impl ResponseHandler { response } - /// Create a streaming response from a reqwest response + /// Create a streaming response from a reqwest response. + /// The spawned streaming task is instrumented with both `agent_span` and `orchestrator_span` + /// so their durations reflect the actual time spent streaming to the client. pub async fn create_streaming_response( &self, llm_response: reqwest::Response, + agent_span: tracing::Span, + orchestrator_span: tracing::Span, ) -> Result>, ResponseError> { // Copy headers from the original response let response_headers = llm_response.headers(); @@ -89,25 +93,30 @@ impl ResponseHandler { // Create channel for async streaming let (tx, rx) = mpsc::channel::(16); - // Spawn task to stream data - tokio::spawn(async move { - let mut byte_stream = llm_response.bytes_stream(); + // Spawn streaming task instrumented with both spans (nested) so both + // remain entered for the full streaming duration. + tokio::spawn( + async move { + let mut byte_stream = llm_response.bytes_stream(); - while let Some(item) = byte_stream.next().await { - let chunk = match item { - Ok(chunk) => chunk, - Err(err) => { - warn!(error = ?err, "error receiving chunk"); + while let Some(item) = byte_stream.next().await { + let chunk = match item { + Ok(chunk) => chunk, + Err(err) => { + warn!(error = ?err, "error receiving chunk"); + break; + } + }; + + if tx.send(chunk).await.is_err() { + warn!("receiver dropped"); break; } - }; - - if tx.send(chunk).await.is_err() { - warn!("receiver dropped"); - break; } } - }); + .instrument(agent_span) + .instrument(orchestrator_span), + ); let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk))); let stream_body = BoxBody::new(StreamBody::new(stream)); @@ -248,7 +257,13 @@ mod tests { let llm_response = client.get(&(server.url() + "/test")).send().await.unwrap(); let handler = ResponseHandler::new(); - let result = handler.create_streaming_response(llm_response).await; + let result = handler + .create_streaming_response( + llm_response, + tracing::Span::current(), + tracing::Span::current(), + ) + .await; mock.assert_async().await; assert!(result.is_ok()); diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index 370a4f63..01c67a94 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tracing::{debug, info, warn}; use crate::router::llm_router::RouterService; +use crate::tracing::routing; pub struct RoutingResult { pub model_name: String, @@ -114,8 +115,7 @@ pub async fn router_chat_get_upstream_model( ); // Capture start time for routing span - let _routing_start_time = std::time::Instant::now(); - let _routing_start_system_time = std::time::SystemTime::now(); + let routing_start_time = std::time::Instant::now(); // Attempt to determine route using the router service let routing_result = router_service @@ -127,12 +127,20 @@ pub async fn router_chat_get_upstream_model( ) .await; + let determination_ms = routing_start_time.elapsed().as_millis() as i64; + let current_span = tracing::Span::current(); + current_span.record(routing::ROUTE_DETERMINATION_MS, determination_ms); + match routing_result { Ok(route) => match route { - Some((_, model_name)) => Ok(RoutingResult { model_name }), + Some((_, model_name)) => { + current_span.record("route.selected_model", model_name.as_str()); + Ok(RoutingResult { model_name }) + } None => { // No route determined, return sentinel value "none" // This signals to llm.rs to use the original validated request model + current_span.record("route.selected_model", "none"); info!("no route determined, using default model"); Ok(RoutingResult { @@ -140,9 +148,12 @@ pub async fn router_chat_get_upstream_model( }) } }, - Err(err) => Err(RoutingError::internal_error(format!( - "Failed to determine route: {}", - err - ))), + Err(err) => { + current_span.record("route.selected_model", "unknown"); + Err(RoutingError::internal_error(format!( + "Failed to determine route: {}", + err + ))) + } } } diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index 5d5ced8a..bff87ad4 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -12,7 +12,7 @@ use tracing::{info, warn, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER}; -use crate::tracing::{set_service_name, signals as signal_constants}; +use crate::tracing::{llm, set_service_name, signals as signal_constants}; use hermesllm::apis::openai::Message; /// Trait for processing streaming chunks @@ -92,6 +92,18 @@ impl StreamProcessor for ObservableStreamProcessor { } fn on_complete(&mut self) { + // Record time-to-first-token as an OTel span attribute + event (streaming only) + if let Some(ttft) = self.time_to_first_token { + let span = tracing::Span::current(); + let otel_context = span.context(); + let otel_span = otel_context.span(); + otel_span.set_attribute(KeyValue::new(llm::TIME_TO_FIRST_TOKEN_MS, ttft as i64)); + otel_span.add_event( + llm::TIME_TO_FIRST_TOKEN_MS, + vec![KeyValue::new(llm::TIME_TO_FIRST_TOKEN_MS, ttft as i64)], + ); + } + // Analyze signals if messages are available and record as span attributes if let Some(ref messages) = self.messages { let analyzer: Box = Box::new(TextBasedSignalAnalyzer::new()); diff --git a/crates/brightstaff/src/router/plano_orchestrator.rs b/crates/brightstaff/src/router/plano_orchestrator.rs index bd26f0f7..bca6a5a0 100644 --- a/crates/brightstaff/src/router/plano_orchestrator.rs +++ b/crates/brightstaff/src/router/plano_orchestrator.rs @@ -2,13 +2,11 @@ use std::{collections::HashMap, sync::Arc}; use common::{ configuration::{AgentUsagePreference, OrchestrationPreference}, - consts::{ - ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER, - TRACE_PARENT_HEADER, - }, + consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER}, }; use hermesllm::apis::openai::{ChatCompletionsResponse, Message}; use hyper::header; +use opentelemetry::{global, propagation::Injector}; use thiserror::Error; use tracing::{debug, info, warn}; @@ -16,6 +14,19 @@ use crate::router::orchestrator_model_v1::{self}; use super::orchestrator_model::OrchestratorModel; +/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap +struct HeaderMapInjector<'a>(&'a mut header::HeaderMap); + +impl<'a> Injector for HeaderMapInjector<'a> { + fn set(&mut self, key: &str, value: String) { + if let Ok(header_name) = header::HeaderName::from_bytes(key.as_bytes()) { + if let Ok(header_value) = header::HeaderValue::from_str(&value) { + self.0.insert(header_name, header_value); + } + } + } +} + pub struct OrchestratorService { orchestrator_url: String, client: reqwest::Client, @@ -57,7 +68,6 @@ impl OrchestratorService { pub async fn determine_orchestration( &self, messages: &[Message], - trace_parent: Option, usage_preferences: Option>, request_id: Option, ) -> Result>> { @@ -96,12 +106,15 @@ impl OrchestratorService { header::HeaderValue::from_str(PLANO_ORCHESTRATOR_MODEL_NAME).unwrap(), ); - if let Some(trace_parent) = trace_parent { - orchestration_request_headers.insert( - header::HeaderName::from_static(TRACE_PARENT_HEADER), - header::HeaderValue::from_str(&trace_parent).unwrap(), + // Inject OpenTelemetry trace context from current span + global::get_text_map_propagator(|propagator| { + let cx = + tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); + propagator.inject_context( + &cx, + &mut HeaderMapInjector(&mut orchestration_request_headers), ); - } + }); if let Some(request_id) = request_id { orchestration_request_headers.insert( diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 930f6ba0..7332170c 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -6,13 +6,12 @@ pub use constants::{ }; pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY}; -use opentelemetry::trace::TraceContextExt; +use opentelemetry::trace::get_active_span; use opentelemetry::KeyValue; -use tracing_opentelemetry::OpenTelemetrySpanExt; -/// Sets the service name override on the current tracing span. +/// Sets the service name override on the current active OpenTelemetry span. /// -/// This function adds the `service.name.override` attribute to the underlying +/// This function adds the `service.name.override` attribute to the active /// OpenTelemetry span, which allows observability backends to filter and group /// spans by their logical service (e.g., `plano(llm)`, `plano(filter)`). /// @@ -27,28 +26,10 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; /// set_service_name(operation_component::LLM); /// ``` pub fn set_service_name(service_name: &str) { - let span = tracing::Span::current(); - let otel_context = span.context(); - let otel_span = otel_context.span(); - otel_span.set_attribute(KeyValue::new( - SERVICE_NAME_OVERRIDE_KEY, - service_name.to_string(), - )); -} - -/// Sets the service name override on the given tracing span. -/// -/// Use this when you have a specific span reference and want to set -/// the service name override attribute on it. -/// -/// # Arguments -/// * `span` - The tracing span to set the service name on -/// * `service_name` - The service name to use (e.g., `operation_component::LLM`) -pub fn set_service_name_on_span(span: &tracing::Span, service_name: &str) { - let otel_context = span.context(); - let otel_span = otel_context.span(); - otel_span.set_attribute(KeyValue::new( - SERVICE_NAME_OVERRIDE_KEY, - service_name.to_string(), - )); + get_active_span(|span| { + span.set_attribute(KeyValue::new( + SERVICE_NAME_OVERRIDE_KEY, + service_name.to_string(), + )); + }); } diff --git a/crates/brightstaff/src/tracing/service_name_exporter.rs b/crates/brightstaff/src/tracing/service_name_exporter.rs index dc045c47..d0bd8199 100644 --- a/crates/brightstaff/src/tracing/service_name_exporter.rs +++ b/crates/brightstaff/src/tracing/service_name_exporter.rs @@ -47,12 +47,23 @@ const ALL_SERVICE_NAMES: &[&str] = &[ operation_component::INBOUND, operation_component::ROUTING, operation_component::ORCHESTRATOR, - operation_component::HANDOFF, operation_component::AGENT_FILTER, operation_component::AGENT, operation_component::LLM, ]; +/// Span attribute keys to remove before export. +const FILTERED_ATTR_KEYS: &[&str] = &[ + "busy_ns", + "idle_ns", + "thread.id", + "thread.name", + "code.file.path", + "code.line.number", + "code.module.name", + "target", +]; + /// A SpanExporter that supports per-span `service.name` overrides. /// /// Internally it holds one OTLP exporter per known service name. Each exporter @@ -114,6 +125,11 @@ impl SpanExporter for ServiceNameOverrideExporter { let mut spans_by_service: HashMap> = HashMap::new(); for span in batch { + let mut span = span; + + span.attributes + .retain(|kv| !FILTERED_ATTR_KEYS.contains(&kv.key.as_str())); + let service_name = span .attributes .iter()