From 4d52acf60c757b003542a9c034abde36d73a005b Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 17 Dec 2025 16:26:17 -0800 Subject: [PATCH] fix span --- arch/envoy.template.yaml | 32 +-- .../src/handlers/agent_chat_completions.rs | 29 +- .../src/handlers/integration_tests.rs | 2 + .../src/handlers/pipeline_processor.rs | 269 ++++++++++-------- crates/brightstaff/src/tracing/constants.rs | 2 +- crates/common/src/traces/mod.rs | 2 +- crates/common/src/traces/span_builder.rs | 11 +- 7 files changed, 198 insertions(+), 149 deletions(-) diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index 3d618faf..4ead29c2 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -214,21 +214,21 @@ static_resources: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - generate_request_id: true - tracing: - provider: - name: envoy.tracers.opentelemetry - typed_config: - "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig - grpc_service: - envoy_grpc: - cluster_name: opentelemetry_collector - timeout: 0.250s - service_name: tools - random_sampling: - value: {{ arch_tracing.random_sampling }} - {% endif %} + # {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} + # generate_request_id: true + # tracing: + # provider: + # name: envoy.tracers.opentelemetry + # typed_config: + # "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + # grpc_service: + # envoy_grpc: + # cluster_name: opentelemetry_collector + # timeout: 0.250s + # service_name: tools + # random_sampling: + # value: {{ arch_tracing.random_sampling }} + # {% endif %} stat_prefix: outbound_api_traffic codec_type: AUTO scheme_header_transformation: @@ -299,7 +299,7 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: arch_gateway + service_name: plano(inbound) random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index be82e952..c6675b06 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -3,7 +3,7 @@ use std::time::{Instant, SystemTime}; use bytes::Bytes; use common::consts::TRACE_PARENT_HEADER; -use common::traces::{SpanBuilder, SpanKind, parse_traceparent}; +use common::traces::{SpanBuilder, SpanKind, parse_traceparent, generate_random_span_id}; use hermesllm::apis::OpenAIMessage; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::providers::request::ProviderRequest; @@ -208,6 +208,13 @@ async fn handle_agent_chat( agent_selector.create_agent_map(agents) }; + // Parse trace parent to get trace_id and parent_span_id + let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent { + parse_traceparent(tp) + } else { + (String::new(), None) + }; + // Select appropriate agent using arch router llm model let selected_agent = agent_selector .select_agent(&message, &listener, trace_parent.clone()) @@ -218,6 +225,14 @@ async fn handle_agent_chat( // Record the start time for agent span let agent_start_time = SystemTime::now(); let agent_start_instant = Instant::now(); + // let (span_id, trace_id) = trace_collector.start_span( + // trace_parent.clone(), + // operation_component::AGENT, + // &format!("/agents{}", request_path), + // &selected_agent.id, + // ); + + let span_id = generate_random_span_id(); // Process the filter chain let chat_history = pipeline_processor @@ -227,6 +242,8 @@ async fn handle_agent_chat( &agent_map, &request_headers, Some(&trace_collector), + trace_id.clone(), + span_id.clone(), ) .await?; @@ -243,6 +260,8 @@ async fn handle_agent_chat( client_request, terminal_agent, &request_headers, + trace_id.clone(), + span_id.clone(), ) .await?; @@ -260,14 +279,8 @@ async fn handle_agent_chat( .with_target(&terminal_agent_name) .build(); - // Parse trace parent to get trace_id and parent_span_id - let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent { - parse_traceparent(tp) - } else { - (String::new(), None) - }; - let mut span_builder = SpanBuilder::new(&operation_name) + .with_span_id(span_id) .with_kind(SpanKind::Internal) .with_start_time(agent_start_time) .with_end_time(agent_end_time) diff --git a/crates/brightstaff/src/handlers/integration_tests.rs b/crates/brightstaff/src/handlers/integration_tests.rs index e5bbf488..42686796 100644 --- a/crates/brightstaff/src/handlers/integration_tests.rs +++ b/crates/brightstaff/src/handlers/integration_tests.rs @@ -117,6 +117,8 @@ mod integration_tests { &agent_map, &headers, None, + String::new(), + String::new(), ) .await; diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index ad2c3504..88e8238e 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -1,19 +1,23 @@ use std::collections::HashMap; use common::configuration::{Agent, AgentFilterChain}; -use common::consts::{ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER}; -use common::traces::{SpanBuilder, SpanKind}; +use common::consts::{ + ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER, TRACE_PARENT_HEADER, +}; +use common::traces::{SpanBuilder, SpanKind, generate_random_span_id}; +use hermesllm::apis::openai::Message; use hermesllm::{ProviderRequest, ProviderRequestType}; -use hermesllm::apis::openai::{Message}; use hyper::header::HeaderMap; -use opentelemetry::trace::TraceContextExt; -use tracing::{debug, info, warn}; use std::time::{Instant, SystemTime}; +use tracing::{debug, info, warn}; use crate::tracing::operation_component::{self}; -use crate::tracing::{OperationNameBuilder, http}; +use crate::tracing::{http, OperationNameBuilder}; -use crate::handlers::jsonrpc::{JSON_RPC_VERSION, JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD}; +use crate::handlers::jsonrpc::{ + JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION, + MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD, +}; use uuid::Uuid; /// Errors that can occur during pipeline processing @@ -77,27 +81,6 @@ impl PipelineProcessor { } } - /// Extract trace context from current OpenTelemetry context - fn extract_trace_context(&self) -> (String, Option) { - let current_cx = opentelemetry::Context::current(); - let span_ref = current_cx.span(); - let span_context = span_ref.span_context(); - - let trace_id = if span_context.is_valid() { - format!("{:032x}", span_context.trace_id()) - } else { - String::new() // SpanBuilder will generate one - }; - - let parent_span_id = if span_context.is_valid() { - Some(format!("{:016x}", span_context.span_id())) - } else { - None - }; - - (trace_id, parent_span_id) - } - /// Record a span for filter execution fn record_filter_span( &self, @@ -107,8 +90,11 @@ impl PipelineProcessor { start_time: SystemTime, end_time: SystemTime, elapsed: std::time::Duration, - ) { - let (trace_id, parent_span_id) = self.extract_trace_context(); + trace_id: String, + parent_span_id: String, + span_id: String, + ) -> String { + // let (trace_id, parent_span_id) = self.extract_trace_context(); // Build operation name: POST /agents/* {filter_name} // Using generic path since we don't have access to specific endpoint here @@ -119,6 +105,7 @@ impl PipelineProcessor { .build(); let mut span_builder = SpanBuilder::new(&operation_name) + .with_span_id(span_id.clone()) .with_kind(SpanKind::Client) .with_start_time(start_time) .with_end_time(end_time) @@ -126,18 +113,22 @@ impl PipelineProcessor { .with_attribute(http::TARGET, "/agents/*") .with_attribute("filter.name", agent_name.to_string()) .with_attribute("filter.tool_name", tool_name.to_string()) - .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); + .with_attribute( + "duration_ms", + format!("{:.2}", elapsed.as_secs_f64() * 1000.0), + ); if !trace_id.is_empty() { span_builder = span_builder.with_trace_id(trace_id); } - if let Some(parent_id) = parent_span_id { - span_builder = span_builder.with_parent_span_id(parent_id); + if !parent_span_id.is_empty() { + span_builder = span_builder.with_parent_span_id(parent_span_id); } let span = span_builder.build(); // Use plano(filter) as service name for filter execution spans collector.record_span(operation_component::AGENT_FILTER, span); + span_id.clone() } /// Record a span for MCP protocol interactions @@ -150,8 +141,11 @@ impl PipelineProcessor { end_time: SystemTime, elapsed: std::time::Duration, additional_attrs: Option>, + trace_id: String, + parent_span_id: String, + span_id: Option, ) { - let (trace_id, parent_span_id) = self.extract_trace_context(); + // let (trace_id, parent_span_id) = self.extract_trace_context(); // Build operation name: POST /mcp {agent_id} let operation_name = OperationNameBuilder::new() @@ -162,6 +156,7 @@ impl PipelineProcessor { .build(); let mut span_builder = SpanBuilder::new(&operation_name) + .with_span_id(span_id.unwrap_or_else(|| generate_random_span_id())) .with_kind(SpanKind::Client) .with_start_time(start_time) .with_end_time(end_time) @@ -169,7 +164,10 @@ impl PipelineProcessor { .with_attribute(http::TARGET, &format!("/mcp ({})", operation.to_string())) .with_attribute("mcp.operation", operation.to_string()) .with_attribute("mcp.agent_id", agent_id.to_string()) - .with_attribute("duration_ms", format!("{:.2}", elapsed.as_secs_f64() * 1000.0)); + .with_attribute( + "duration_ms", + format!("{:.2}", elapsed.as_secs_f64() * 1000.0), + ); if let Some(attrs) = additional_attrs { for (key, value) in attrs { @@ -180,8 +178,8 @@ impl PipelineProcessor { if !trace_id.is_empty() { span_builder = span_builder.with_trace_id(trace_id); } - if let Some(parent_id) = parent_span_id { - span_builder = span_builder.with_parent_span_id(parent_id); + if !parent_span_id.is_empty() { + span_builder = span_builder.with_parent_span_id(parent_span_id); } let span = span_builder.build(); @@ -197,6 +195,8 @@ impl PipelineProcessor { agent_map: &HashMap, request_headers: &HeaderMap, trace_collector: Option<&std::sync::Arc>, + trace_id: String, + parent_span_id: String, ) -> Result, PipelineError> { let mut chat_history_updated = chat_history.to_vec(); @@ -220,8 +220,18 @@ impl PipelineProcessor { let start_time = SystemTime::now(); let start_instant = Instant::now(); + // Generate filter span ID before execution so MCP spans can use it as parent + let filter_span_id = generate_random_span_id(); + chat_history_updated = self - .execute_filter(&chat_history_updated, agent, request_headers, trace_collector) + .execute_filter( + &chat_history_updated, + agent, + request_headers, + trace_collector, + trace_id.clone(), + filter_span_id.clone(), + ) .await?; let end_time = SystemTime::now(); @@ -243,6 +253,9 @@ impl PipelineProcessor { start_time, end_time, elapsed, + trace_id.clone(), + parent_span_id.clone(), + filter_span_id, ); } } @@ -256,10 +269,19 @@ impl PipelineProcessor { request_headers: &HeaderMap, agent_id: &str, session_id: Option<&str>, + trace_id: String, + parent_span_id: String, ) -> Result { + let trace_parent = format!("00-{}-{}-01", trace_id, parent_span_id); let mut headers = request_headers.clone(); headers.remove(hyper::header::CONTENT_LENGTH); + headers.remove(TRACE_PARENT_HEADER); + headers.insert( + TRACE_PARENT_HEADER, + hyper::header::HeaderValue::from_str(&trace_parent).unwrap(), + ); + headers.insert( ARCH_UPSTREAM_HOST_HEADER, hyper::header::HeaderValue::from_str(agent_id) @@ -292,7 +314,11 @@ impl PipelineProcessor { } /// Parse SSE formatted response and extract JSON-RPC data - fn parse_sse_response(&self, response_bytes: &[u8], agent_id: &str) -> Result { + fn parse_sse_response( + &self, + response_bytes: &[u8], + agent_id: &str, + ) -> Result { let response_str = String::from_utf8_lossy(response_bytes); let lines: Vec<&str> = response_str.lines().collect(); @@ -342,7 +368,10 @@ impl PipelineProcessor { ) -> Result { let request_body = serde_json::to_string(json_rpc_request)?; - debug!("Sending MCP request to agent {}: {}", agent_id, request_body); + debug!( + "Sending MCP request to agent {}: {}", + agent_id, request_body + ); let response = self .client @@ -362,10 +391,7 @@ impl PipelineProcessor { messages: &[Message], ) -> Result { let mut arguments = HashMap::new(); - arguments.insert( - "messages".to_string(), - serde_json::to_value(messages)?, - ); + arguments.insert("messages".to_string(), serde_json::to_value(messages)?); let mut params = HashMap::new(); params.insert("name".to_string(), serde_json::to_value(tool_name)?); @@ -386,35 +412,52 @@ impl PipelineProcessor { agent: &Agent, request_headers: &HeaderMap, trace_collector: Option<&std::sync::Arc>, + trace_id: String, + filter_span_id: String, ) -> Result, PipelineError> { // Get or create MCP session let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) { session_id.clone() } else { - let session_id = self.get_new_session_id(&agent.id, trace_collector).await; + let session_id = self + .get_new_session_id( + &agent.id, + trace_id.clone(), + filter_span_id.clone(), + ) + .await; self.agent_id_session_map .insert(agent.id.clone(), session_id.clone()); session_id }; - info!("Using MCP session ID {} for agent {}", mcp_session_id, agent.id); + info!( + "Using MCP session ID {} for agent {}", + mcp_session_id, agent.id + ); // Build JSON-RPC request let tool_name = agent.tool.as_deref().unwrap_or(&agent.id); let json_rpc_request = self.build_tool_call_request(tool_name, messages)?; + // Generate span ID for this MCP tool call (child of filter span) + let mcp_span_id = generate_random_span_id(); + // Build headers - let agent_headers = self.build_mcp_headers( - request_headers, - &agent.id, - Some(&mcp_session_id), - )?; + let agent_headers = + self.build_mcp_headers(request_headers, &agent.id, Some(&mcp_session_id), trace_id.clone(), mcp_span_id.clone())?; // Send request with tracing let start_time = SystemTime::now(); let start_instant = Instant::now(); - let response = self.send_mcp_request(&json_rpc_request, agent_headers, &agent.id).await?; + let response = self + .send_mcp_request( + &json_rpc_request, + agent_headers, + &agent.id, + ) + .await?; let http_status = response.status(); let response_bytes = response.bytes().await?; @@ -437,6 +480,9 @@ impl PipelineProcessor { end_time, elapsed, Some(attrs), + trace_id.clone(), + filter_span_id.clone(), + Some(mcp_span_id), ); } @@ -458,7 +504,11 @@ impl PipelineProcessor { }); } - info!("Response from agent {}: {}", agent.id, String::from_utf8_lossy(&response_bytes)); + info!( + "Response from agent {}: {}", + agent.id, + String::from_utf8_lossy(&response_bytes) + ); // Parse SSE response let data_chunk = self.parse_sse_response(&response_bytes, &agent.id)?; @@ -468,7 +518,11 @@ impl PipelineProcessor { .ok_or_else(|| PipelineError::NoResultInResponse(agent.id.clone()))?; // Check if error field is set in response result - if response_result.get("isError").and_then(|v| v.as_bool()).unwrap_or(false) { + if response_result + .get("isError") + .and_then(|v| v.as_bool()) + .unwrap_or(false) + { let error_message = response_result .get("content") .and_then(|v| v.as_array()) @@ -532,7 +586,8 @@ impl PipelineProcessor { &self, agent_id: &str, session_id: &str, - trace_collector: Option<&std::sync::Arc>, + trace_id: String, + parent_span_id: String, ) -> Result<(), PipelineError> { let initialized_notification = JsonRpcNotification { jsonrpc: JSON_RPC_VERSION.to_string(), @@ -543,10 +598,7 @@ impl PipelineProcessor { let notification_body = serde_json::to_string(&initialized_notification)?; debug!("Sending initialized notification for agent {}", agent_id); - let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id))?; - - let start_time = SystemTime::now(); - let start_instant = Instant::now(); + let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, Some(session_id), trace_id.clone(), parent_span_id.clone())?; let response = self .client @@ -556,28 +608,10 @@ impl PipelineProcessor { .send() .await?; - let end_time = SystemTime::now(); - let elapsed = start_instant.elapsed(); - - info!("Initialized notification response status: {}", response.status()); - - // Record MCP notification span - if let Some(collector) = trace_collector { - let mut attrs = HashMap::new(); - attrs.insert("mcp.method", "notifications/initialized".to_string()); - attrs.insert("mcp.session_id", session_id.to_string()); - attrs.insert("http.status_code", response.status().as_u16().to_string()); - - self.record_mcp_span( - collector, - "notification", - agent_id, - start_time, - end_time, - elapsed, - Some(attrs), - ); - } + info!( + "Initialized notification response status: {}", + response.status() + ); Ok(()) } @@ -585,15 +619,14 @@ impl PipelineProcessor { async fn get_new_session_id( &self, agent_id: &str, - trace_collector: Option<&std::sync::Arc>, + trace_id: String, + parent_span_id: String, ) -> String { info!("Initializing MCP session for agent {}", agent_id); - let start_time = SystemTime::now(); - let start_instant = Instant::now(); - let initialize_request = self.build_initialize_request(); - let headers = self.build_mcp_headers(&HeaderMap::new(), agent_id, None) + let headers = self + .build_mcp_headers(&HeaderMap::new(), agent_id, None, trace_id.clone(), parent_span_id.clone()) .expect("Failed to build headers for initialization"); let response = self @@ -610,33 +643,20 @@ impl PipelineProcessor { .expect("No mcp-session-id in response") .to_string(); - info!("Created new MCP session for agent {}: {}", agent_id, session_id); - - let end_time = SystemTime::now(); - let elapsed = start_instant.elapsed(); - - // Record MCP session initialization span - if let Some(collector) = trace_collector { - let mut attrs = HashMap::new(); - attrs.insert("mcp.method", "initialize".to_string()); - attrs.insert("mcp.session_id", session_id.clone()); - attrs.insert("mcp.protocol_version", "2024-11-05".to_string()); - - self.record_mcp_span( - collector, - "session_init", - agent_id, - start_time, - end_time, - elapsed, - Some(attrs), - ); - } + info!( + "Created new MCP session for agent {}: {}", + agent_id, session_id + ); // Send initialized notification - self.send_initialized_notification(agent_id, &session_id, trace_collector) - .await - .expect("Failed to send initialized notification"); + self.send_initialized_notification( + agent_id, + &session_id, + trace_id.clone(), + parent_span_id.clone(), + ) + .await + .expect("Failed to send initialized notification"); session_id } @@ -648,6 +668,8 @@ impl PipelineProcessor { mut original_request: ProviderRequestType, terminal_agent: &Agent, request_headers: &HeaderMap, + trace_id: String, + agent_span_id: String, ) -> Result { // let mut request = original_request.clone(); original_request.set_messages(messages); @@ -658,6 +680,17 @@ impl PipelineProcessor { let mut agent_headers = request_headers.clone(); agent_headers.remove(hyper::header::CONTENT_LENGTH); + + // Set traceparent header to make the egress span a child of the agent span + if !trace_id.is_empty() && !agent_span_id.is_empty() { + let trace_parent = format!("00-{}-{}-01", trace_id, agent_span_id); + agent_headers.remove(TRACE_PARENT_HEADER); + agent_headers.insert( + TRACE_PARENT_HEADER, + hyper::header::HeaderValue::from_str(&trace_parent).unwrap(), + ); + } + agent_headers.insert( ARCH_UPSTREAM_HOST_HEADER, hyper::header::HeaderValue::from_str(&terminal_agent.id) @@ -718,13 +751,7 @@ mod tests { let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]); let result = processor - .process_filter_chain( - &messages, - &pipeline, - &agent_map, - &request_headers, - None, - ) + .process_filter_chain(&messages, &pipeline, &agent_map, &request_headers, None, String::new(), String::new()) .await; assert!(result.is_err()); @@ -758,7 +785,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers, None) + .execute_filter(&messages, &agent, &request_headers, None, "trace-123".to_string(), "span-123".to_string()) .await; match result { @@ -797,7 +824,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers, None) + .execute_filter(&messages, &agent, &request_headers, None, "trace-456".to_string(), "span-456".to_string()) .await; match result { @@ -849,7 +876,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_filter(&messages, &agent, &request_headers, None) + .execute_filter(&messages, &agent, &request_headers, None, "trace-789".to_string(), "span-789".to_string()) .await; match result { diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs index 6a5dbc2a..d3eafb2e 100644 --- a/crates/brightstaff/src/tracing/constants.rs +++ b/crates/brightstaff/src/tracing/constants.rs @@ -157,7 +157,7 @@ pub mod operation_component { pub const HANDOFF: &str = "plano(handoff)"; /// Agent filter execution - pub const AGENT_FILTER: &str = "plano(agent filter)"; + pub const AGENT_FILTER: &str = "plano(filter)"; /// Agent execution pub const AGENT: &str = "plano(agent)"; diff --git a/crates/common/src/traces/mod.rs b/crates/common/src/traces/mod.rs index c0d042fa..c4197995 100644 --- a/crates/common/src/traces/mod.rs +++ b/crates/common/src/traces/mod.rs @@ -18,7 +18,7 @@ pub use shapes::{ }; // Re-export new utilities -pub use span_builder::{SpanBuilder, SpanKind}; +pub use span_builder::{SpanBuilder, SpanKind, generate_random_span_id}; pub use resource_span_builder::ResourceSpanBuilder; pub use constants::*; diff --git a/crates/common/src/traces/span_builder.rs b/crates/common/src/traces/span_builder.rs index 187c1678..e07cfab9 100644 --- a/crates/common/src/traces/span_builder.rs +++ b/crates/common/src/traces/span_builder.rs @@ -37,6 +37,7 @@ pub struct SpanBuilder { end_time: Option, kind: SpanKind, attributes: HashMap, + span_id: Option, } impl SpanBuilder { @@ -53,6 +54,7 @@ impl SpanBuilder { end_time: None, kind: SpanKind::Internal, attributes: HashMap::new(), + span_id: None, } } @@ -62,6 +64,11 @@ impl SpanBuilder { self } + pub fn with_span_id(mut self, span_id: impl Into) -> Self { + self.span_id = Some(span_id.into()); + self + } + /// Set the parent span ID to link this span to its parent pub fn with_parent_span_id(mut self, parent_span_id: impl Into) -> Self { self.parent_span_id = Some(parent_span_id.into()); @@ -125,7 +132,7 @@ impl SpanBuilder { // Build span directly without going through Span::new() Span { trace_id, - span_id: generate_random_span_id(), + span_id: self.span_id.unwrap_or_else(|| generate_random_span_id()), parent_span_id: self.parent_span_id, name: self.name, start_time_unix_nano: format!("{}", start_nanos), @@ -145,7 +152,7 @@ fn system_time_to_nanos(time: SystemTime) -> u128 { } /// Generate a random span ID (16 hex characters = 8 bytes) -fn generate_random_span_id() -> String { +pub fn generate_random_span_id() -> String { use rand::RngCore; let mut rng = rand::thread_rng(); let mut random_bytes = [0u8; 8];