diff --git a/config/plano_config_schema.yaml b/config/plano_config_schema.yaml index 0f3cefb7..8fd98e2c 100644 --- a/config/plano_config_schema.yaml +++ b/config/plano_config_schema.yaml @@ -382,8 +382,27 @@ properties: type: integer trace_arch_internal: type: boolean - opentracing_grpc_endpoint: - type: string + custom_attributes: + type: array + items: + type: object + properties: + key: + type: string + type: + type: string + enum: + - str + - bool + - float + - int + header: + type: string + additionalProperties: false + required: + - key + - type + - header additionalProperties: false mode: type: string diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index adfdce02..32d9e91a 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,7 +1,10 @@ use std::sync::Arc; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use bytes::Bytes; +use common::configuration::Tracing; +use common::consts::TRACE_PARENT_HEADER; +use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; use hermesllm::apis::OpenAIMessage; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::providers::request::ProviderRequest; @@ -9,15 +12,17 @@ use hermesllm::ProviderRequestType; use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::{Request, Response}; -use opentelemetry::trace::get_active_span; use serde::ser::Error as SerError; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{debug, info, warn}; use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; use super::response_handler::ResponseHandler; use crate::router::plano_orchestrator::OrchestratorService; -use crate::tracing::{operation_component, set_service_name}; +use crate::tracing::{ + append_span_attributes, collect_custom_trace_attributes, http, operation_component, + OperationNameBuilder, +}; /// Main errors for agent chat completions #[derive(Debug, thiserror::Error)] @@ -40,122 +45,95 @@ pub async fn agent_chat( _: String, agents_list: Arc>>>, listeners: Arc>>, + trace_collector: Arc, + tracing_config: Arc>, ) -> Result>, hyper::Error> { - // Extract request_id from headers or generate a new one - let request_id: String = match request - .headers() - .get(common::consts::REQUEST_ID_HEADER) - .and_then(|h| h.to_str().ok()) - .map(|s| s.to_string()) + match handle_agent_chat( + request, + orchestrator_service, + agents_list, + listeners, + trace_collector, + tracing_config, + ) + .await { - Some(id) => id, - None => uuid::Uuid::new_v4().to_string(), - }; + Ok(response) => Ok(response), + Err(err) => { + // Check if this is a client error from the pipeline that should be cascaded + if let AgentFilterChainError::Pipeline(PipelineError::ClientError { + agent, + status, + body, + }) = &err + { + warn!( + "Client error from agent '{}' (HTTP {}): {}", + agent, status, body + ); - // Create a span with request_id that will be included in all log lines - let request_span = info_span!( - "(orchestrator)", - component = "orchestrator", - request_id = %request_id, - http.method = %request.method(), - http.path = %request.uri().path() - ); - - // Execute the handler inside the span - async { - // Set service name for orchestrator operations - set_service_name(operation_component::ORCHESTRATOR); - - match handle_agent_chat_inner( - request, - orchestrator_service, - agents_list, - listeners, - request_id, - ) - .await - { - Ok(response) => Ok(response), - Err(err) => { - // Check if this is a client error from the pipeline that should be cascaded - if let AgentFilterChainError::Pipeline(PipelineError::ClientError { - agent, - status, - body, - }) = &err - { - warn!( - agent = %agent, - status = %status, - body = %body, - "client error from agent" - ); - - // Create error response with the original status code and body - let error_json = serde_json::json!({ - "error": "ClientError", - "agent": agent, - "status": status, - "agent_response": body - }); - - let json_string = error_json.to_string(); - let mut response = - Response::new(ResponseHandler::create_full_body(json_string)); - *response.status_mut() = hyper::StatusCode::from_u16(*status) - .unwrap_or(hyper::StatusCode::BAD_REQUEST); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - "application/json".parse().unwrap(), - ); - return Ok(response); - } - - // Print detailed error information with full error chain for other errors - let mut error_chain = Vec::new(); - let mut current_error: &dyn std::error::Error = &err; - - // Collect the full error chain - loop { - error_chain.push(current_error.to_string()); - match current_error.source() { - Some(source) => current_error = source, - None => break, - } - } - - // Log the complete error chain - warn!(error_chain = ?error_chain, "agent chat error chain"); - warn!(root_error = ?err, "root error"); - - // Create structured error response as JSON + // Create error response with the original status code and body let error_json = serde_json::json!({ - "error": { - "type": "AgentFilterChainError", - "message": err.to_string(), - "error_chain": error_chain, - "debug_info": format!("{:?}", err) - } + "error": "ClientError", + "agent": agent, + "status": status, + "agent_response": body }); - // Log the error for debugging - info!(error = %error_json, "structured error info"); - - // Return JSON error response - Ok(ResponseHandler::create_json_error_response(&error_json)) + let json_string = error_json.to_string(); + let mut response = Response::new(ResponseHandler::create_full_body(json_string)); + *response.status_mut() = + hyper::StatusCode::from_u16(*status).unwrap_or(hyper::StatusCode::BAD_REQUEST); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + "application/json".parse().unwrap(), + ); + return Ok(response); } + + // Print detailed error information with full error chain for other errors + let mut error_chain = Vec::new(); + let mut current_error: &dyn std::error::Error = &err; + + // Collect the full error chain + loop { + error_chain.push(current_error.to_string()); + match current_error.source() { + Some(source) => current_error = source, + None => break, + } + } + + // Log the complete error chain + warn!("Agent chat error chain: {:#?}", error_chain); + warn!("Root error: {:?}", err); + + // Create structured error response as JSON + let error_json = serde_json::json!({ + "error": { + "type": "AgentFilterChainError", + "message": err.to_string(), + "error_chain": error_chain, + "debug_info": format!("{:?}", err) + } + }); + + // Log the error for debugging + info!("Structured error info: {}", error_json); + + // Return JSON error response + Ok(ResponseHandler::create_json_error_response(&error_json)) } } - .instrument(request_span) - .await } -async fn handle_agent_chat_inner( +async fn handle_agent_chat( request: Request, orchestrator_service: Arc, agents_list: Arc>>>, listeners: Arc>>, - request_id: String, + trace_collector: Arc, + tracing_config: Arc>, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -169,18 +147,14 @@ async fn handle_agent_chat_inner( .and_then(|name| name.to_str().ok()); // Find the appropriate listener - let listener: common::configuration::Listener = { + let listener = { let listeners = listeners.read().await; agent_selector .find_listener(listener_name, &listeners) .await? }; - get_active_span(|span| { - span.update_name(listener.name.to_string()); - }); - - info!(listener = %listener.name, "handling request"); + info!("Handling request for listener: {}", listener.name); // Parse request body let request_path = request @@ -195,8 +169,12 @@ async fn handle_agent_chat_inner( let mut headers = request.headers().clone(); headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER); - // Set the request_id in headers if not already present if !headers.contains_key(common::consts::REQUEST_ID_HEADER) { + let request_id = uuid::Uuid::new_v4().to_string(); + info!( + "Request id not found in headers, generated new request id: {}", + request_id + ); headers.insert( common::consts::REQUEST_ID_HEADER, hyper::header::HeaderValue::from_str(&request_id).unwrap(), @@ -205,12 +183,19 @@ async fn handle_agent_chat_inner( headers }; + let custom_attrs = collect_custom_trace_attributes( + &request_headers, + tracing_config + .as_ref() + .as_ref() + .and_then(|tracing| tracing.custom_attributes.as_deref()), + ); let chat_request_bytes = request.collect().await?.to_bytes(); debug!( - body = %String::from_utf8_lossy(&chat_request_bytes), - "received request body" + "Received request body (raw utf8): {}", + String::from_utf8_lossy(&chat_request_bytes) ); // Determine the API type from the endpoint @@ -224,7 +209,7 @@ async fn handle_agent_chat_inner( let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) { Ok(request) => request, Err(err) => { - warn!("failed to parse request as ProviderRequestType: {}", err); + warn!("Failed to parse request as ProviderRequestType: {}", err); let err_msg = format!("Failed to parse request: {}", err); return Err(AgentFilterChainError::RequestParsing( serde_json::Error::custom(err_msg), @@ -234,6 +219,12 @@ async fn handle_agent_chat_inner( let message: Vec = client_request.get_messages(); + // Extract trace parent for routing + let traceparent = request_headers + .iter() + .find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER) + .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + let request_id = request_headers .get(common::consts::REQUEST_ID_HEADER) .and_then(|val| val.to_str().ok()) @@ -246,58 +237,90 @@ async fn handle_agent_chat_inner( agent_selector.create_agent_map(agents) }; + // Parse trace parent to get trace_id and parent_span_id + let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent { + parse_traceparent(tp) + } else { + (String::new(), None) + }; + // Select appropriate agents using arch orchestrator llm model - let selection_start = Instant::now(); + let selection_span_id = generate_random_span_id(); + let selection_start_time = SystemTime::now(); + let selection_start_instant = Instant::now(); + let selected_agents = agent_selector - .select_agents(&message, &listener, request_id.clone()) + .select_agents(&message, &listener, traceparent.clone(), request_id.clone()) .await?; - // Record selection attributes on the current orchestrator span - let selection_elapsed_ms = selection_start.elapsed().as_secs_f64() * 1000.0; - get_active_span(|span| { - span.set_attribute(opentelemetry::KeyValue::new( - "selection.listener", - listener.name.clone(), - )); - span.set_attribute(opentelemetry::KeyValue::new( - "selection.agent_count", - selected_agents.len() as i64, - )); - span.set_attribute(opentelemetry::KeyValue::new( + // Record agent selection span + let selection_end_time = SystemTime::now(); + let selection_elapsed = selection_start_instant.elapsed(); + let selection_operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path("/agents/select") + .with_target(&listener.name) + .build(); + + let mut selection_span_builder = append_span_attributes( + SpanBuilder::new(&selection_operation_name) + .with_span_id(selection_span_id) + .with_kind(SpanKind::Internal) + .with_start_time(selection_start_time) + .with_end_time(selection_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, "/agents/select") + .with_attribute("selection.listener", listener.name.clone()) + .with_attribute("selection.agent_count", selected_agents.len().to_string()) + .with_attribute( "selection.agents", selected_agents .iter() .map(|a| a.id.as_str()) .collect::>() .join(","), - )); - span.set_attribute(opentelemetry::KeyValue::new( - "selection.determination_ms", - format!("{:.2}", selection_elapsed_ms), - )); - }); - - info!( - count = selected_agents.len(), - "selected agents for execution" + ) + .with_attribute( + "duration_ms", + format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), + ), + &custom_attrs, ); + if !trace_id.is_empty() { + selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); + } + if let Some(parent_id) = parent_span_id.clone() { + selection_span_builder = selection_span_builder.with_parent_span_id(parent_id); + } + + let selection_span = selection_span_builder.build(); + trace_collector.record_span(operation_component::ORCHESTRATOR, selection_span); + + info!("Selected {} agent(s) for execution", selected_agents.len()); + // Execute agents sequentially, passing output from one to the next let mut current_messages = message.clone(); let agent_count = selected_agents.len(); for (agent_index, selected_agent) in selected_agents.iter().enumerate() { - // Get agent name - let agent_name = selected_agent.id.clone(); let is_last_agent = agent_index == agent_count - 1; debug!( - agent_index = agent_index + 1, - total = agent_count, - agent = %agent_name, - "processing agent" + "Processing agent {}/{}: {}", + agent_index + 1, + agent_count, + selected_agent.id ); + // Record the start time for agent span + let agent_start_time = SystemTime::now(); + let agent_start_instant = Instant::now(); + let span_id = generate_random_span_id(); + + // Get agent name + let agent_name = selected_agent.id.clone(); + // Process the filter chain let chat_history = pipeline_processor .process_filter_chain( @@ -305,71 +328,91 @@ async fn handle_agent_chat_inner( selected_agent, &agent_map, &request_headers, + Some(&trace_collector), + trace_id.clone(), + span_id.clone(), ) .await?; // Get agent details and invoke let agent = agent_map.get(&agent_name).unwrap(); - debug!(agent = %agent_name, "invoking agent"); + debug!("Invoking agent: {}", agent_name); - let agent_span = info_span!( - "agent", - agent_id = %agent_name, - message_count = chat_history.len(), + let llm_response = pipeline_processor + .invoke_agent( + &chat_history, + client_request.clone(), + agent, + &request_headers, + trace_id.clone(), + span_id.clone(), + ) + .await?; + + // Record agent span + let agent_end_time = SystemTime::now(); + let agent_elapsed = agent_start_instant.elapsed(); + let full_path = format!("/agents{}", request_path); + let operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path(&full_path) + .with_target(&agent_name) + .build(); + + let mut span_builder = append_span_attributes( + SpanBuilder::new(&operation_name) + .with_span_id(span_id) + .with_kind(SpanKind::Internal) + .with_start_time(agent_start_time) + .with_end_time(agent_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, full_path) + .with_attribute("agent.name", agent_name.clone()) + .with_attribute( + "agent.sequence", + format!("{}/{}", agent_index + 1, agent_count), + ) + .with_attribute( + "duration_ms", + format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), + ), + &custom_attrs, ); - let llm_response = async { - set_service_name(operation_component::AGENT); - get_active_span(|span| { - span.update_name(format!("{} /v1/chat/completions", agent_name)); - }); - - pipeline_processor - .invoke_agent( - &chat_history, - client_request.clone(), - agent, - &request_headers, - ) - .await + if !trace_id.is_empty() { + span_builder = span_builder.with_trace_id(trace_id.clone()); } - .instrument(agent_span.clone()) - .await?; + if let Some(parent_id) = parent_span_id.clone() { + span_builder = span_builder.with_parent_span_id(parent_id); + } + + let span = span_builder.build(); + trace_collector.record_span(operation_component::AGENT, span); // If this is the last agent, return the streaming response if is_last_agent { info!( - agent = %agent_name, - "completed agent chain, returning response" + "Completed agent chain, returning response from last agent: {}", + agent_name ); - // Capture the orchestrator span (parent of the agent span) so it - // stays open for the full streaming duration alongside the agent span. - let orchestrator_span = tracing::Span::current(); - return async { - response_handler - .create_streaming_response( - llm_response, - tracing::Span::current(), // agent span (inner) - orchestrator_span, // orchestrator span (outer) - ) - .await - .map_err(AgentFilterChainError::from) - } - .instrument(agent_span) - .await; + return response_handler + .create_streaming_response(llm_response) + .await + .map_err(AgentFilterChainError::from); } // For intermediate agents, collect the full response and pass to next agent - debug!(agent = %agent_name, "collecting response from intermediate agent"); - let response_text = async { response_handler.collect_full_response(llm_response).await } - .instrument(agent_span) - .await?; + debug!( + "Collecting response from intermediate agent: {}", + agent_name + ); + let response_text = response_handler.collect_full_response(llm_response).await?; info!( - agent = %agent_name, - response_len = response_text.len(), - "agent completed, passing response to next agent" + "Agent {} completed, passing {} character response to next agent", + agent_name, + response_text.len() ); // remove last message and add new one at the end diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 10a68c1a..e3abcf50 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -1,9 +1,10 @@ use bytes::Bytes; -use common::configuration::ModelAlias; +use common::configuration::{ModelAlias, Tracing}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; use common::llm_providers::LlmProviders; +use common::traces::TraceCollector; use hermesllm::apis::openai_responses::InputParam; use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; use hermesllm::{ProviderRequest, ProviderRequestType}; @@ -11,13 +12,10 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use hyper::header::{self}; use hyper::{Request, Response, StatusCode}; -use opentelemetry::global; -use opentelemetry::trace::get_active_span; -use opentelemetry_http::HeaderInjector; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{debug, info, warn}; use crate::handlers::router_chat::router_chat_get_upstream_model; use crate::handlers::utils::{ @@ -28,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor; use crate::state::{ extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError, }; -use crate::tracing::{llm as tracing_llm, operation_component, set_service_name}; +use crate::tracing::{collect_custom_trace_attributes, operation_component}; fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()) @@ -36,69 +34,43 @@ fn full>(chunk: T) -> BoxBody { .boxed() } +// ! we reached the limit of the number of arguments for a function +#[allow(clippy::too_many_arguments)] pub async fn llm_chat( request: Request, router_service: Arc, full_qualified_llm_provider_url: String, model_aliases: Arc>>, llm_providers: Arc>, + trace_collector: Arc, + tracing_config: Arc>, // ! right here state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); + let custom_attrs = collect_custom_trace_attributes( + &request_headers, + tracing_config + .as_ref() + .as_ref() + .and_then(|tracing| tracing.custom_attributes.as_deref()), + ); let request_id: String = match request_headers .get(REQUEST_ID_HEADER) .and_then(|h| h.to_str().ok()) .map(|s| s.to_string()) { Some(id) => id, - None => uuid::Uuid::new_v4().to_string(), + None => { + let generated_id = uuid::Uuid::new_v4().to_string(); + warn!( + "[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID", + generated_id + ); + generated_id + } }; - // Create a span with request_id that will be included in all log lines - let request_span = info_span!( - "llm", - component = "llm", - request_id = %request_id, - http.method = %request.method(), - http.path = %request_path, - llm.model = tracing::field::Empty, - llm.tools = tracing::field::Empty, - llm.user_message_preview = tracing::field::Empty, - llm.temperature = tracing::field::Empty, - ); - - // Execute the rest of the handler inside the span - llm_chat_inner( - request, - router_service, - full_qualified_llm_provider_url, - model_aliases, - llm_providers, - state_storage, - request_id, - request_path, - request_headers, - ) - .instrument(request_span) - .await -} - -#[allow(clippy::too_many_arguments)] -async fn llm_chat_inner( - request: Request, - router_service: Arc, - full_qualified_llm_provider_url: String, - model_aliases: Arc>>, - llm_providers: Arc>, - state_storage: Option>, - request_id: String, - request_path: String, - mut request_headers: hyper::HeaderMap, -) -> Result>, hyper::Error> { - // Set service name for LLM operations - set_service_name(operation_component::LLM); - // Extract or generate traceparent - this establishes the trace context for all spans let traceparent: String = match request_headers .get(TRACE_PARENT_HEADER) @@ -111,18 +83,20 @@ async fn llm_chat_inner( let trace_id = Uuid::new_v4().to_string().replace("-", ""); let generated_tp = format!("00-{}-0000000000000000-01", trace_id); warn!( - generated_traceparent = %generated_tp, - "TRACE_PARENT header missing, generated new traceparent" + "[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}", + request_id, generated_tp ); generated_tp } }; + let mut request_headers = request_headers; let chat_request_bytes = request.collect().await?.to_bytes(); debug!( - body = %String::from_utf8_lossy(&chat_request_bytes), - "request body received" + "[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}", + request_id, + String::from_utf8_lossy(&chat_request_bytes) ); let mut client_request = match ProviderRequestType::try_from(( @@ -132,10 +106,13 @@ async fn llm_chat_inner( Ok(request) => request, Err(err) => { warn!( - error = %err, - "failed to parse request as ProviderRequestType" + "[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}", + request_id, err + ); + let err_msg = format!( + "[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}", + request_id, err ); - let err_msg = format!("Failed to parse request: {}", err); let mut bad_request = Response::new(full(err_msg)); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -155,21 +132,16 @@ async fn llm_chat_inner( let model_from_request = client_request.model().to_string(); let temperature = client_request.get_temperature(); let is_streaming_request = client_request.is_streaming(); - let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases); + let resolved_model = resolve_model_alias(&model_from_request, &model_aliases); // Validate that the requested model exists in configuration // This matches the validation in llm_gateway routing.rs - if llm_providers - .read() - .await - .get(&alias_resolved_model) - .is_none() - { + if llm_providers.read().await.get(&resolved_model).is_none() { let err_msg = format!( "Model '{}' not found in configured providers", - alias_resolved_model + resolved_model ); - warn!(model = %alias_resolved_model, "model not found in configured providers"); + warn!("[PLANO_REQ_ID:{}] | FAILURE | {}", request_id, err_msg); let mut bad_request = Response::new(full(err_msg)); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -177,10 +149,10 @@ async fn llm_chat_inner( // Handle provider/model slug format (e.g., "openai/gpt-4") // Extract just the model name for upstream (providers don't understand the slug) - let model_name_only = if let Some((_, model)) = alias_resolved_model.split_once('/') { + let model_name_only = if let Some((_, model)) = resolved_model.split_once('/') { model.to_string() } else { - alias_resolved_model.clone() + resolved_model.clone() }; // Extract tool names and user message preview for span attributes @@ -188,30 +160,18 @@ async fn llm_chat_inner( let user_message_preview = client_request .get_recent_user_message() .map(|msg| truncate_message(&msg, 50)); - let span = tracing::Span::current(); - if let Some(temp) = temperature { - span.record(tracing_llm::TEMPERATURE, tracing::field::display(temp)); - } - if let Some(tools) = &tool_names { - let formatted_tools = tools - .iter() - .map(|name| format!("{}(...)", name)) - .collect::>() - .join("\n"); - span.record(tracing_llm::TOOLS, formatted_tools.as_str()); - } - if let Some(preview) = &user_message_preview { - span.record(tracing_llm::USER_MESSAGE_PREVIEW, preview.as_str()); - } // Extract messages for signal analysis (clone before moving client_request) - let messages_for_signals = Some(client_request.get_messages()); + let messages_for_signals = client_request.get_messages(); // Set the model to just the model name (without provider prefix) // This ensures upstream receives "gpt-4" not "openai/gpt-4" client_request.set_model(model_name_only.clone()); - if client_request.remove_metadata_key("plano_preference_config") { - debug!("removed plano_preference_config from metadata"); + if client_request.remove_metadata_key("archgw_preference_config") { + debug!( + "[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata", + request_id + ); } // === v1/responses state management: Determine upstream API and combine input if needed === @@ -230,9 +190,9 @@ async fn llm_chat_inner( // Get the upstream path and check if it's ResponsesAPI let upstream_path = get_upstream_path( &llm_providers, - &alias_resolved_model, + &resolved_model, &request_path, - &alias_resolved_model, + &resolved_model, is_streaming_request, ) .await; @@ -259,17 +219,14 @@ async fn llm_chat_inner( // Update both the request and original_input_items responses_req.input = InputParam::Items(combined_input.clone()); original_input_items = combined_input; - info!( - items = original_input_items.len(), - "updated request with conversation history" - ); + info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len()); } Err(StateStorageError::NotFound(_)) => { // Return 409 Conflict when previous_response_id not found - warn!(previous_response_id = %prev_resp_id, "previous response_id not found"); + warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id); let err_msg = format!( - "Conversation state not found for previous_response_id: {}", - prev_resp_id + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}", + request_id, prev_resp_id ); let mut conflict_response = Response::new(full(err_msg)); *conflict_response.status_mut() = StatusCode::CONFLICT; @@ -278,9 +235,8 @@ async fn llm_chat_inner( Err(e) => { // Log warning but continue on other storage errors warn!( - previous_response_id = %prev_resp_id, - error = %e, - "failed to retrieve conversation state" + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}", + request_id, prev_resp_id, e ); // Restore original_input_items since we passed ownership original_input_items = extract_input_items(&responses_req.input); @@ -288,7 +244,10 @@ async fn llm_chat_inner( } } } else { - debug!("upstream supports ResponsesAPI natively"); + debug!( + "[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.", + request_id + ); } } } @@ -297,29 +256,15 @@ async fn llm_chat_inner( let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap(); // Determine routing using the dedicated router_chat module - // This gets its own span for latency and error tracking - let routing_span = info_span!( - "routing", - component = "routing", - http.method = "POST", - http.target = %request_path, - model.requested = %model_from_request, - model.alias_resolved = %alias_resolved_model, - route.selected_model = tracing::field::Empty, - routing.determination_ms = tracing::field::Empty, - ); - let routing_result = match async { - set_service_name(operation_component::ROUTING); - router_chat_get_upstream_model( - router_service, - client_request, // Pass the original request - router_chat will convert it - &traceparent, - &request_path, - &request_id, - ) - .await - } - .instrument(routing_span) + let routing_result = match router_chat_get_upstream_model( + router_service, + client_request, // Pass the original request - router_chat will convert it + trace_collector.clone(), + &traceparent, + &request_path, + &request_id, + &custom_attrs, + ) .await { Ok(result) => result, @@ -333,37 +278,22 @@ async fn llm_chat_inner( // Determine final model to use // Router returns "none" as a sentinel value when it doesn't select a specific model let router_selected_model = routing_result.model_name; - let resolved_model = if router_selected_model != "none" { + let model_name = if router_selected_model != "none" { // Router selected a specific model via routing preferences router_selected_model } else { // Router returned "none" sentinel, use validated resolved_model from request - alias_resolved_model.clone() + resolved_model.clone() }; - tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str()); - - let span_name = if model_from_request == resolved_model { - format!("POST {} {}", request_path, resolved_model) - } else { - format!( - "POST {} {} -> {}", - request_path, model_from_request, resolved_model - ) - }; - get_active_span(|span| { - span.update_name(span_name.clone()); - }); debug!( - url = %full_qualified_llm_provider_url, - provider_hint = %resolved_model, - upstream_model = %model_name_only, - "Routing to upstream" + "[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Provider Hint: {}, Model for upstream: {}", + request_id, full_qualified_llm_provider_url, model_name, model_name_only ); request_headers.insert( ARCH_PROVIDER_HINT_HEADER, - header::HeaderValue::from_str(&resolved_model).unwrap(), + header::HeaderValue::from_str(&model_name).unwrap(), ); request_headers.insert( @@ -373,18 +303,12 @@ async fn llm_chat_inner( // remove content-length header if it exists request_headers.remove(header::CONTENT_LENGTH); - // Inject current LLM span's trace context so upstream spans are children of plano(llm) - global::get_text_map_propagator(|propagator| { - let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); - propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers)); - }); - // Capture start time right before sending request to upstream let request_start_time = std::time::Instant::now(); - let _request_start_system_time = std::time::SystemTime::now(); + let request_start_system_time = std::time::SystemTime::now(); let llm_response = match reqwest::Client::new() - .post(&full_qualified_llm_provider_url) + .post(full_qualified_llm_provider_url) .headers(request_headers) .body(client_request_bytes_for_upstream) .send() @@ -411,12 +335,30 @@ async fn llm_chat_inner( // Build LLM span with actual status code using constants let byte_stream = llm_response.bytes_stream(); + // Build the LLM span (will be finalized after streaming completes) + let llm_span = build_llm_span( + &traceparent, + &request_path, + &resolved_model, + &model_name, + upstream_status.as_u16(), + is_streaming_request, + request_start_system_time, + tool_names, + user_message_preview, + temperature, + &llm_providers, + &custom_attrs, + ) + .await; + // Create base processor for metrics and tracing let base_processor = ObservableStreamProcessor::new( + trace_collector, operation_component::LLM, - span_name, + llm_span, request_start_time, - messages_for_signals, + Some(messages_for_signals), ); // === v1/responses state management: Wrap with ResponsesStateProcessor === @@ -437,8 +379,8 @@ async fn llm_chat_inner( base_processor, state_store, original_input_items, - alias_resolved_model.clone(), resolved_model.clone(), + model_name.clone(), is_streaming_request, false, // Not OpenAI upstream since should_manage_state is true content_encoding, @@ -479,6 +421,93 @@ fn resolve_model_alias( model_from_request.to_string() } +/// Builds the LLM span with all required and optional attributes. +#[allow(clippy::too_many_arguments)] +async fn build_llm_span( + traceparent: &str, + request_path: &str, + resolved_model: &str, + model_name: &str, + status_code: u16, + is_streaming: bool, + start_time: std::time::SystemTime, + tool_names: Option>, + user_message_preview: Option, + temperature: Option, + llm_providers: &Arc>>, + custom_attrs: &HashMap, +) -> common::traces::Span { + use crate::tracing::{http, llm, OperationNameBuilder}; + use common::traces::{parse_traceparent, SpanBuilder, SpanKind}; + + // Calculate the upstream path based on provider configuration + let upstream_path = get_upstream_path( + llm_providers, + model_name, + request_path, + resolved_model, + is_streaming, + ) + .await; + + // Build operation name showing path transformation if different + let operation_name = if request_path != upstream_path { + OperationNameBuilder::new() + .with_method("POST") + .with_path(format!("{} >> {}", request_path, upstream_path)) + .with_target(resolved_model) + .build() + } else { + OperationNameBuilder::new() + .with_method("POST") + .with_path(request_path) + .with_target(resolved_model) + .build() + }; + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + let mut span_builder = SpanBuilder::new(&operation_name) + .with_trace_id(&trace_id) + .with_kind(SpanKind::Client) + .with_start_time(start_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::STATUS_CODE, status_code.to_string()) + .with_attribute(http::TARGET, request_path.to_string()) + .with_attribute(http::UPSTREAM_TARGET, upstream_path) + .with_attribute(llm::MODEL_NAME, resolved_model.to_string()) + .with_attribute(llm::IS_STREAMING, is_streaming.to_string()); + + // Only set parent span ID if it exists (not a root span) + if let Some(parent) = parent_span_id { + span_builder = span_builder.with_parent_span_id(&parent); + } + + // Add optional attributes + if let Some(temp) = temperature { + span_builder = span_builder.with_attribute(llm::TEMPERATURE, temp.to_string()); + } + + if let Some(tools) = tool_names { + let formatted_tools = tools + .iter() + .map(|name| format!("{}(...)", name)) + .collect::>() + .join("\n"); + span_builder = span_builder.with_attribute(llm::TOOLS, formatted_tools); + } + + if let Some(preview) = user_message_preview { + span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); + } + + for (key, value) in custom_attrs { + span_builder = span_builder.with_attribute(key, value); + } + + span_builder.build() +} + /// Calculates the upstream path for the provider based on the model name. /// Looks up provider configuration, gets the ProviderId and base_url_path_prefix, /// then uses target_endpoint_for_provider to calculate the correct upstream path. diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index d71734fa..210e6bf6 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -1,12 +1,14 @@ use common::configuration::ModelUsagePreference; +use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector}; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; use hermesllm::{ProviderRequest, ProviderRequestType}; use hyper::StatusCode; +use std::collections::HashMap; use std::sync::Arc; use tracing::{debug, info, warn}; use crate::router::llm_router::RouterService; -use crate::tracing::routing; +use crate::tracing::{http, operation_component, routing, OperationNameBuilder}; pub struct RoutingResult { pub model_name: String, @@ -34,9 +36,11 @@ impl RoutingError { pub async fn router_chat_get_upstream_model( router_service: Arc, client_request: ProviderRequestType, + trace_collector: Arc, traceparent: &str, request_path: &str, request_id: &str, + custom_attrs: &HashMap, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); @@ -53,14 +57,14 @@ pub async fn router_chat_get_upstream_model( | ProviderRequestType::BedrockConverseStream(_) | ProviderRequestType::ResponsesAPIRequest(_), ) => { - warn!("unexpected: got non-ChatCompletions request after converting to OpenAI format"); + warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); return Err(RoutingError::internal_error( "Request conversion failed".to_string(), )); } Err(err) => { warn!( - "failed to convert request to ChatCompletionsRequest: {}", + "Failed to convert request to ChatCompletionsRequest: {}", err ); return Err(RoutingError::internal_error(format!( @@ -71,8 +75,9 @@ pub async fn router_chat_get_upstream_model( }; debug!( - request = %serde_json::to_string(&chat_request).unwrap(), - "router request" + "[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}", + request_id, + &serde_json::to_string(&chat_request).unwrap() ); // Extract usage preferences from metadata @@ -108,14 +113,16 @@ pub async fn router_chat_get_upstream_model( }; info!( - has_usage_preferences = usage_preferences.is_some(), - path = %request_path, - latest_message = %latest_message_for_log, - "processing router request" + "[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", + request_id, + usage_preferences.is_some(), + request_path, + latest_message_for_log ); // Capture start time for routing span let routing_start_time = std::time::Instant::now(); + let routing_start_system_time = std::time::SystemTime::now(); // Attempt to determine route using the router service let routing_result = router_service @@ -127,21 +134,47 @@ pub async fn router_chat_get_upstream_model( ) .await; - let determination_ms = routing_start_time.elapsed().as_millis() as i64; - let current_span = tracing::Span::current(); - current_span.record(routing::ROUTE_DETERMINATION_MS, determination_ms); - match routing_result { Ok(route) => match route { Some((_, model_name)) => { - current_span.record("route.selected_model", model_name.as_str()); + // Record successful routing span + let mut attrs: HashMap = HashMap::new(); + attrs.insert("route.selected_model".to_string(), model_name.clone()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + Ok(RoutingResult { model_name }) } None => { // No route determined, return sentinel value "none" // This signals to llm.rs to use the original validated request model - current_span.record("route.selected_model", "none"); - info!("no route determined, using default model"); + info!( + "[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, returning sentinel 'none'", + request_id + ); + + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), "none".to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; Ok(RoutingResult { model_name: "none".to_string(), @@ -149,7 +182,22 @@ pub async fn router_chat_get_upstream_model( } }, Err(err) => { - current_span.record("route.selected_model", "unknown"); + // Record failed routing span + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), "unknown".to_string()); + attrs.insert("error.message".to_string(), err.to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + Err(RoutingError::internal_error(format!( "Failed to determine route: {}", err @@ -157,3 +205,53 @@ pub async fn router_chat_get_upstream_model( } } } + +/// Helper function to record a routing span with the given attributes. +/// Reduces code duplication across different routing outcomes. +async fn record_routing_span( + trace_collector: Arc, + traceparent: &str, + start_time: std::time::Instant, + start_system_time: std::time::SystemTime, + attrs: HashMap, +) { + // The routing always uses OpenAI Chat Completions format internally, + // so we log that as the actual API being used for routing + let routing_api_path = "/v1/chat/completions"; + + let routing_operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path(routing_api_path) + .with_target("Arch-Router-1.5B") + .build(); + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + // Build the routing span directly using constants + let mut span_builder = SpanBuilder::new(&routing_operation_name) + .with_trace_id(&trace_id) + .with_kind(SpanKind::Client) + .with_start_time(start_system_time) + .with_end_time(std::time::SystemTime::now()) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, routing_api_path.to_string()) + .with_attribute( + routing::ROUTE_DETERMINATION_MS, + start_time.elapsed().as_millis().to_string(), + ); + + // Only set parent span ID if it exists (not a root span) + if let Some(parent) = parent_span_id { + span_builder = span_builder.with_parent_span_id(&parent); + } + + // Add all custom attributes + for (key, value) in attrs { + span_builder = span_builder.with_attribute(key, value); + } + + let span = span_builder.build(); + + // Record the span directly to the collector + trace_collector.record_span(operation_component::ROUTING, span); +} diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index fff69b00..5e8af979 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -14,6 +14,7 @@ use common::consts::{ CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH, PLANO_ORCHESTRATOR_MODEL_NAME, }; use common::llm_providers::LlmProviders; +use common::traces::TraceCollector; use http_body_util::{combinators::BoxBody, BodyExt, Empty}; use hyper::body::Incoming; use hyper::server::conn::http1; @@ -114,11 +115,22 @@ async fn main() -> Result<(), Box> { )); let model_aliases = Arc::new(plano_config.model_aliases.clone()); + let tracing_config = Arc::new(plano_config.tracing.clone()); // Initialize trace collector and start background flusher // Tracing is enabled if the tracing config is present in plano_config.yaml // Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED - // OpenTelemetry automatic instrumentation is configured in utils/tracing.rs + let tracing_enabled = if tracing_config.is_some() { + info!("Tracing configuration found in plano_config.yaml"); + Some(true) + } else { + info!( + "No tracing configuration in plano_config.yaml, will check OTEL_TRACING_ENABLED env var" + ); + None + }; + let trace_collector = Arc::new(TraceCollector::new(tracing_enabled)); + let _flusher_handle = trace_collector.clone().start_background_flusher(); // Initialize conversation state storage for v1/responses // Configurable via plano_config.yaml state_storage section @@ -128,10 +140,7 @@ async fn main() -> Result<(), Box> { if let Some(storage_config) = &plano_config.state_storage { let storage: Arc = match storage_config.storage_type { common::configuration::StateStorageType::Memory => { - info!( - storage_type = "memory", - "initialized conversation state storage" - ); + info!("Initialized conversation state storage: Memory"); Arc::new(MemoryConversationalStorage::new()) } common::configuration::StateStorageType::Postgres => { @@ -140,11 +149,8 @@ async fn main() -> Result<(), Box> { .as_ref() .expect("connection_string is required for postgres state_storage"); - debug!(connection_string = %connection_string, "postgres connection"); - info!( - storage_type = "postgres", - "initializing conversation state storage" - ); + debug!("Postgres connection string (full): {}", connection_string); + info!("Initializing conversation state storage: Postgres"); Arc::new( PostgreSQLConversationStorage::new(connection_string.clone()) .await @@ -154,7 +160,7 @@ async fn main() -> Result<(), Box> { }; Some(storage) } else { - info!("no state_storage configured, conversation state management disabled"); + info!("No state_storage configured - conversation state management disabled"); None }; @@ -173,6 +179,8 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); + let tracing_config = tracing_config.clone(); let state_storage = state_storage.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -183,6 +191,8 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); + let tracing_config = tracing_config.clone(); let state_storage = state_storage.clone(); async move { @@ -202,6 +212,8 @@ async fn main() -> Result<(), Box> { fully_qualified_url, agents_list, listeners, + trace_collector, + tracing_config, ) .with_context(parent_cx) .await; @@ -219,6 +231,8 @@ async fn main() -> Result<(), Box> { fully_qualified_url, model_aliases, llm_providers, + trace_collector, + tracing_config, state_storage, ) .with_context(parent_cx) @@ -259,7 +273,7 @@ async fn main() -> Result<(), Box> { Ok(response) } _ => { - debug!(method = %req.method(), path = %req.uri().path(), "no route found"); + debug!("No route for {} {}", req.method(), req.uri().path()); let mut not_found = Response::new(empty()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) @@ -269,13 +283,13 @@ async fn main() -> Result<(), Box> { }); tokio::task::spawn(async move { - debug!(peer = ?peer_addr, "accepted connection"); + debug!("Accepted connection from {:?}", peer_addr); if let Err(err) = http1::Builder::new() // .serve_connection(io, service_fn(chat_completion)) .serve_connection(io, service) .await { - warn!(error = ?err, "error serving connection"); + warn!("Error serving connection: {:?}", err); } }); } diff --git a/crates/brightstaff/src/tracing/custom_attributes.rs b/crates/brightstaff/src/tracing/custom_attributes.rs new file mode 100644 index 00000000..9f6d7c01 --- /dev/null +++ b/crates/brightstaff/src/tracing/custom_attributes.rs @@ -0,0 +1,137 @@ +use std::collections::HashMap; + +use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType}; +use common::traces::SpanBuilder; +use hyper::header::{HeaderMap, HeaderName}; + +pub fn extract_custom_trace_attributes( + headers: &HeaderMap, + custom_attributes: Option<&[CustomTraceAttribute]>, +) -> HashMap { + let mut attributes = HashMap::new(); + let Some(custom_attributes) = custom_attributes else { + return attributes; + }; + + for attribute in custom_attributes { + // Normalize/validate the configured header name; skip invalid names. + let header_name = match HeaderName::from_bytes(attribute.header.as_bytes()) { + Ok(name) => name, + Err(_) => continue, + }; + + // Extract header value as UTF-8 text; skip missing or invalid values. + let raw_value = match headers + .get(header_name) + .and_then(|value| value.to_str().ok()) + { + Some(value) => value.trim(), + None => continue, + }; + + // Parse the header value according to the configured type. + let parsed_value = match attribute.value_type { + CustomTraceAttributeType::Str => Some(raw_value.to_string()), + CustomTraceAttributeType::Bool => raw_value.parse::().ok().map(|v| v.to_string()), + CustomTraceAttributeType::Float => raw_value.parse::().ok().map(|v| v.to_string()), + CustomTraceAttributeType::Int => raw_value.parse::().ok().map(|v| v.to_string()), + }; + + // Only include attributes that successfully parsed. + if let Some(value) = parsed_value { + attributes.insert(attribute.key.clone(), value); + } + } + + attributes +} + +pub fn collect_custom_trace_attributes( + headers: &HeaderMap, + custom_attributes: Option<&[CustomTraceAttribute]>, +) -> HashMap { + extract_custom_trace_attributes(headers, custom_attributes) +} + +pub fn append_span_attributes( + mut span_builder: SpanBuilder, + attributes: &HashMap, +) -> SpanBuilder { + for (key, value) in attributes { + span_builder = span_builder.with_attribute(key, value); + } + span_builder +} + +#[cfg(test)] +mod tests { + use super::extract_custom_trace_attributes; + use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType}; + use hyper::header::{HeaderMap, HeaderValue}; + + #[test] + fn extracts_and_parses_custom_headers() { + let mut headers = HeaderMap::new(); + headers.insert("x-workspace-id", HeaderValue::from_static("ws_123")); + headers.insert("x-tenant-id", HeaderValue::from_static("ten_456")); + headers.insert("x-user-id", HeaderValue::from_static("usr_789")); + headers.insert("x-admin-level", HeaderValue::from_static("3")); + headers.insert("x-is-internal", HeaderValue::from_static("true")); + headers.insert("x-budget", HeaderValue::from_static("42.5")); + headers.insert("x-bad-int", HeaderValue::from_static("nope")); + + let custom_attributes = vec![ + CustomTraceAttribute { + key: "workspace.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-workspace-id".to_string(), + }, + CustomTraceAttribute { + key: "tenant.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-tenant-id".to_string(), + }, + CustomTraceAttribute { + key: "user.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-user-id".to_string(), + }, + CustomTraceAttribute { + key: "admin.level".to_string(), + value_type: CustomTraceAttributeType::Int, + header: "x-admin-level".to_string(), + }, + CustomTraceAttribute { + key: "is.internal".to_string(), + value_type: CustomTraceAttributeType::Bool, + header: "x-is-internal".to_string(), + }, + CustomTraceAttribute { + key: "budget.value".to_string(), + value_type: CustomTraceAttributeType::Float, + header: "x-budget".to_string(), + }, + CustomTraceAttribute { + key: "bad.int".to_string(), + value_type: CustomTraceAttributeType::Int, + header: "x-bad-int".to_string(), + }, + CustomTraceAttribute { + key: "missing.header".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-missing".to_string(), + }, + ]; + + let attrs = extract_custom_trace_attributes(&headers, Some(&custom_attributes)); + + assert_eq!(attrs.get("workspace.id"), Some(&"ws_123".to_string())); + assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string())); + assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string())); + assert_eq!(attrs.get("admin.level"), Some(&"3".to_string())); + assert_eq!(attrs.get("is.internal"), Some(&"true".to_string())); + assert_eq!(attrs.get("budget.value"), Some(&"42.5".to_string())); + assert!(!attrs.contains_key("bad.int")); + assert!(!attrs.contains_key("missing.header")); + } +} diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 7332170c..f0d18ea4 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -1,35 +1,9 @@ mod constants; -mod service_name_exporter; +mod custom_attributes; pub use constants::{ error, http, llm, operation_component, routing, signals, OperationNameBuilder, }; -pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY}; - -use opentelemetry::trace::get_active_span; -use opentelemetry::KeyValue; - -/// Sets the service name override on the current active OpenTelemetry span. -/// -/// This function adds the `service.name.override` attribute to the active -/// OpenTelemetry span, which allows observability backends to filter and group -/// spans by their logical service (e.g., `plano(llm)`, `plano(filter)`). -/// -/// # Arguments -/// * `service_name` - The service name to use (e.g., `operation_component::LLM`) -/// -/// # Example -/// ```rust,ignore -/// use brightstaff::tracing::{set_service_name, operation_component}; -/// -/// // Inside a traced function: -/// set_service_name(operation_component::LLM); -/// ``` -pub fn set_service_name(service_name: &str) { - get_active_span(|span| { - span.set_attribute(KeyValue::new( - SERVICE_NAME_OVERRIDE_KEY, - service_name.to_string(), - )); - }); -} +pub use custom_attributes::{ + append_span_attributes, collect_custom_trace_attributes, extract_custom_trace_attributes, +}; diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 0a683b8b..b6888ac6 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -90,8 +90,24 @@ pub struct Overrides { pub struct Tracing { pub sampling_rate: Option, pub trace_arch_internal: Option, - pub random_sampling: Option, - pub opentracing_grpc_endpoint: Option, + pub custom_attributes: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CustomTraceAttribute { + pub key: String, + #[serde(rename = "type")] + pub value_type: CustomTraceAttributeType, + pub header: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum CustomTraceAttributeType { + Str, + Bool, + Float, + Int, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] diff --git a/demos/agent_orchestration/travel_agents/config.yaml b/demos/agent_orchestration/travel_agents/config.yaml index 2cb24d71..4bfc6ea5 100644 --- a/demos/agent_orchestration/travel_agents/config.yaml +++ b/demos/agent_orchestration/travel_agents/config.yaml @@ -55,3 +55,22 @@ listeners: tracing: random_sampling: 100 + custom_attributes: + - header: x-workspace-id + key: workspace.id + type: str + - header: x-tenant-id + key: tenant.id + type: str + - header: x-user-id + key: user.id + type: str + - header: x-admin-level + key: admin.level + type: int + - header: x-is-internal + key: is.internal + type: bool + - header: x-budget + key: budget.value + type: float diff --git a/demos/agent_orchestration/travel_agents/test.rest b/demos/agent_orchestration/travel_agents/test.rest index f3ecaf66..0d188104 100644 --- a/demos/agent_orchestration/travel_agents/test.rest +++ b/demos/agent_orchestration/travel_agents/test.rest @@ -3,6 +3,12 @@ ### Travel Agent Chat Completion Request POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1 Content-Type: application/json +X-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3 +X-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a +X-User-Id: usr_19df7e6751b846f9ba026776e3c12abe +X-Admin-Level: 3 +X-Is-Internal: true +X-Budget: 42.5 { "model": "gpt-4o",