diff --git a/config/arch_config_schema.yaml b/config/arch_config_schema.yaml index 003bb9b4..8fd98e2c 100644 --- a/config/arch_config_schema.yaml +++ b/config/arch_config_schema.yaml @@ -382,6 +382,27 @@ properties: type: integer trace_arch_internal: type: boolean + custom_attributes: + type: array + items: + type: object + properties: + key: + type: string + type: + type: string + enum: + - str + - bool + - float + - int + header: + type: string + additionalProperties: false + required: + - key + - type + - header additionalProperties: false mode: type: string diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 5ced34c0..da836ae7 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::time::{Instant, SystemTime}; use bytes::Bytes; +use common::configuration::Tracing; use common::consts::TRACE_PARENT_HEADER; use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; use hermesllm::apis::OpenAIMessage; @@ -18,7 +19,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; use super::response_handler::ResponseHandler; use crate::router::plano_orchestrator::OrchestratorService; -use crate::tracing::{http, operation_component, OperationNameBuilder}; +use crate::tracing::{ + extract_custom_trace_attributes, http, operation_component, OperationNameBuilder, +}; /// Main errors for agent chat completions #[derive(Debug, thiserror::Error)] @@ -42,6 +45,7 @@ pub async fn agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, + tracing_config: Arc>, ) -> Result>, hyper::Error> { match handle_agent_chat( request, @@ -49,6 +53,7 @@ pub async fn agent_chat( agents_list, listeners, trace_collector, + tracing_config, ) .await { @@ -127,6 +132,7 @@ async fn handle_agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, + tracing_config: Arc>, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -176,6 +182,13 @@ async fn handle_agent_chat( headers }; + let custom_attrs = extract_custom_trace_attributes( + &request_headers, + tracing_config + .as_ref() + .as_ref() + .and_then(|tracing| tracing.custom_attributes.as_deref()), + ); let chat_request_bytes = request.collect().await?.to_bytes(); @@ -269,6 +282,9 @@ async fn handle_agent_chat( "duration_ms", format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), ); + for (key, value) in &custom_attrs { + selection_span_builder = selection_span_builder.with_attribute(key, value); + } if !trace_id.is_empty() { selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); @@ -359,6 +375,9 @@ async fn handle_agent_chat( "duration_ms", format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), ); + for (key, value) in &custom_attrs { + span_builder = span_builder.with_attribute(key, value); + } if !trace_id.is_empty() { span_builder = span_builder.with_trace_id(trace_id.clone()); diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index e1fe5a93..910d10fb 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use common::configuration::ModelAlias; +use common::configuration::{ModelAlias, Tracing}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; @@ -26,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor; use crate::state::{ extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError, }; -use crate::tracing::operation_component; +use crate::tracing::{extract_custom_trace_attributes, operation_component}; fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()) @@ -34,6 +34,8 @@ fn full>(chunk: T) -> BoxBody { .boxed() } +// ! we reached the limit of the number of arguments for a function +#[allow(clippy::too_many_arguments)] pub async fn llm_chat( request: Request, router_service: Arc, @@ -41,10 +43,18 @@ pub async fn llm_chat( model_aliases: Arc>>, llm_providers: Arc>, trace_collector: Arc, + tracing_config: Arc>, // ! right here state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); + let custom_attrs = extract_custom_trace_attributes( + &request_headers, + tracing_config + .as_ref() + .as_ref() + .and_then(|tracing| tracing.custom_attributes.as_deref()), + ); let request_id: String = match request_headers .get(REQUEST_ID_HEADER) .and_then(|h| h.to_str().ok()) @@ -253,6 +263,7 @@ pub async fn llm_chat( &traceparent, &request_path, &request_id, + &custom_attrs, ) .await { @@ -337,6 +348,7 @@ pub async fn llm_chat( user_message_preview, temperature, &llm_providers, + &custom_attrs, ) .await; @@ -422,7 +434,8 @@ async fn build_llm_span( tool_names: Option>, user_message_preview: Option, temperature: Option, - llm_providers: &Arc>, + llm_providers: &Arc>>, + custom_attrs: &HashMap, ) -> common::traces::Span { use crate::tracing::{http, llm, OperationNameBuilder}; use common::traces::{parse_traceparent, SpanBuilder, SpanKind}; @@ -488,6 +501,10 @@ async fn build_llm_span( span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); } + for (key, value) in custom_attrs { + span_builder = span_builder.with_attribute(key, value); + } + span_builder.build() } diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index c3a517e0..29cd0f87 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -40,6 +40,7 @@ pub async fn router_chat_get_upstream_model( traceparent: &str, request_path: &str, request_id: &str, + custom_attrs: &HashMap, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); @@ -139,6 +140,9 @@ pub async fn router_chat_get_upstream_model( // Record successful routing span let mut attrs: HashMap = HashMap::new(); attrs.insert("route.selected_model".to_string(), model_name.clone()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, @@ -160,6 +164,9 @@ pub async fn router_chat_get_upstream_model( let mut attrs = HashMap::new(); attrs.insert("route.selected_model".to_string(), "none".to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, @@ -179,6 +186,9 @@ pub async fn router_chat_get_upstream_model( let mut attrs = HashMap::new(); attrs.insert("route.selected_model".to_string(), "unknown".to_string()); attrs.insert("error.message".to_string(), err.to_string()); + for (key, value) in custom_attrs { + attrs.entry(key.clone()).or_insert_with(|| value.clone()); + } record_routing_span( trace_collector, traceparent, diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index b8fa8832..1a896f8f 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -112,6 +112,7 @@ async fn main() -> Result<(), Box> { )); let model_aliases = Arc::new(arch_config.model_aliases.clone()); + let tracing_config = Arc::new(arch_config.tracing.clone()); // Initialize trace collector and start background flusher // Tracing is enabled if the tracing config is present in arch_config.yaml @@ -176,6 +177,7 @@ async fn main() -> Result<(), Box> { let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); + let tracing_config = tracing_config.clone(); let state_storage = state_storage.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -187,6 +189,7 @@ async fn main() -> Result<(), Box> { let agents_list = agents_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); + let tracing_config = tracing_config.clone(); let state_storage = state_storage.clone(); async move { @@ -207,6 +210,7 @@ async fn main() -> Result<(), Box> { agents_list, listeners, trace_collector, + tracing_config, ) .with_context(parent_cx) .await; @@ -225,6 +229,7 @@ async fn main() -> Result<(), Box> { model_aliases, llm_providers, trace_collector, + tracing_config, state_storage, ) .with_context(parent_cx) diff --git a/crates/brightstaff/src/tracing/custom_attributes.rs b/crates/brightstaff/src/tracing/custom_attributes.rs new file mode 100644 index 00000000..1e1b1f08 --- /dev/null +++ b/crates/brightstaff/src/tracing/custom_attributes.rs @@ -0,0 +1,119 @@ +use std::collections::HashMap; + +use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType}; +use hyper::header::{HeaderMap, HeaderName}; + +pub fn extract_custom_trace_attributes( + headers: &HeaderMap, + custom_attributes: Option<&[CustomTraceAttribute]>, +) -> HashMap { + let mut attributes = HashMap::new(); + let Some(custom_attributes) = custom_attributes else { + return attributes; + }; + + for attribute in custom_attributes { + // Normalize/validate the configured header name; skip invalid names. + let header_name = match HeaderName::from_bytes(attribute.header.as_bytes()) { + Ok(name) => name, + Err(_) => continue, + }; + + // Extract header value as UTF-8 text; skip missing or invalid values. + let raw_value = match headers + .get(header_name) + .and_then(|value| value.to_str().ok()) + { + Some(value) => value.trim(), + None => continue, + }; + + // Parse the header value according to the configured type. + let parsed_value = match attribute.value_type { + CustomTraceAttributeType::Str => Some(raw_value.to_string()), + CustomTraceAttributeType::Bool => raw_value.parse::().ok().map(|v| v.to_string()), + CustomTraceAttributeType::Float => raw_value.parse::().ok().map(|v| v.to_string()), + CustomTraceAttributeType::Int => raw_value.parse::().ok().map(|v| v.to_string()), + }; + + // Only include attributes that successfully parsed. + if let Some(value) = parsed_value { + attributes.insert(attribute.key.clone(), value); + } + } + + attributes +} + +#[cfg(test)] +mod tests { + use super::extract_custom_trace_attributes; + use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType}; + use hyper::header::{HeaderMap, HeaderValue}; + + #[test] + fn extracts_and_parses_custom_headers() { + let mut headers = HeaderMap::new(); + headers.insert("x-workspace-id", HeaderValue::from_static("ws_123")); + headers.insert("x-tenant-id", HeaderValue::from_static("ten_456")); + headers.insert("x-user-id", HeaderValue::from_static("usr_789")); + headers.insert("x-admin-level", HeaderValue::from_static("3")); + headers.insert("x-is-internal", HeaderValue::from_static("true")); + headers.insert("x-budget", HeaderValue::from_static("42.5")); + headers.insert("x-bad-int", HeaderValue::from_static("nope")); + + let custom_attributes = vec![ + CustomTraceAttribute { + key: "workspace.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-workspace-id".to_string(), + }, + CustomTraceAttribute { + key: "tenant.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-tenant-id".to_string(), + }, + CustomTraceAttribute { + key: "user.id".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-user-id".to_string(), + }, + CustomTraceAttribute { + key: "admin.level".to_string(), + value_type: CustomTraceAttributeType::Int, + header: "x-admin-level".to_string(), + }, + CustomTraceAttribute { + key: "is.internal".to_string(), + value_type: CustomTraceAttributeType::Bool, + header: "x-is-internal".to_string(), + }, + CustomTraceAttribute { + key: "budget.value".to_string(), + value_type: CustomTraceAttributeType::Float, + header: "x-budget".to_string(), + }, + CustomTraceAttribute { + key: "bad.int".to_string(), + value_type: CustomTraceAttributeType::Int, + header: "x-bad-int".to_string(), + }, + CustomTraceAttribute { + key: "missing.header".to_string(), + value_type: CustomTraceAttributeType::Str, + header: "x-missing".to_string(), + }, + ]; + + let attrs = extract_custom_trace_attributes(&headers, Some(&custom_attributes)); + + assert_eq!(attrs.get("workspace.id"), Some(&"ws_123".to_string())); + assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string())); + assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string())); + assert_eq!(attrs.get("admin.level"), Some(&"3".to_string())); + assert_eq!(attrs.get("is.internal"), Some(&"true".to_string())); + assert_eq!(attrs.get("budget.value"), Some(&"42.5".to_string())); + assert!(!attrs.contains_key("bad.int")); + assert!(!attrs.contains_key("missing.header")); + } +} diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 09ec6f2a..e3834e2b 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -1,5 +1,7 @@ mod constants; +mod custom_attributes; pub use constants::{ error, http, llm, operation_component, routing, signals, OperationNameBuilder, }; +pub use custom_attributes::extract_custom_trace_attributes; diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index c600ed5d..fec1d66d 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -90,6 +90,24 @@ pub struct Overrides { pub struct Tracing { pub sampling_rate: Option, pub trace_arch_internal: Option, + pub custom_attributes: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CustomTraceAttribute { + pub key: String, + #[serde(rename = "type")] + pub value_type: CustomTraceAttributeType, + pub header: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum CustomTraceAttributeType { + Str, + Bool, + Float, + Int, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] diff --git a/demos/use_cases/travel_agents/config.yaml b/demos/use_cases/travel_agents/config.yaml index 2cb24d71..4bfc6ea5 100644 --- a/demos/use_cases/travel_agents/config.yaml +++ b/demos/use_cases/travel_agents/config.yaml @@ -55,3 +55,22 @@ listeners: tracing: random_sampling: 100 + custom_attributes: + - header: x-workspace-id + key: workspace.id + type: str + - header: x-tenant-id + key: tenant.id + type: str + - header: x-user-id + key: user.id + type: str + - header: x-admin-level + key: admin.level + type: int + - header: x-is-internal + key: is.internal + type: bool + - header: x-budget + key: budget.value + type: float diff --git a/demos/use_cases/travel_agents/test.rest b/demos/use_cases/travel_agents/test.rest index f3ecaf66..0d188104 100644 --- a/demos/use_cases/travel_agents/test.rest +++ b/demos/use_cases/travel_agents/test.rest @@ -3,6 +3,12 @@ ### Travel Agent Chat Completion Request POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1 Content-Type: application/json +X-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3 +X-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a +X-User-Id: usr_19df7e6751b846f9ba026776e3c12abe +X-Admin-Level: 3 +X-Is-Internal: true +X-Budget: 42.5 { "model": "gpt-4o",