diff --git a/config/arch_config_schema.yaml b/config/arch_config_schema.yaml index 5f93c5ca..55310891 100644 --- a/config/arch_config_schema.yaml +++ b/config/arch_config_schema.yaml @@ -382,10 +382,19 @@ properties: type: integer trace_arch_internal: type: boolean - span_attribute_header_prefixes: - type: array - items: - type: string + span_attributes: + type: object + properties: + header_prefixes: + type: array + items: + type: string + static: + type: object + additionalProperties: + type: string + additionalProperties: false + additionalProperties: false mode: type: string enum: diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index e8ee5e13..6784d0db 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,8 +1,9 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, SystemTime}; use bytes::Bytes; -use common::configuration::Tracing; +use common::configuration::SpanAttributes; use common::consts::TRACE_PARENT_HEADER; use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; use hermesllm::apis::OpenAIMessage; @@ -45,7 +46,7 @@ pub async fn agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, - tracing_config: Arc>, + span_attributes: Arc>, ) -> Result>, hyper::Error> { match handle_agent_chat( request, @@ -53,7 +54,7 @@ pub async fn agent_chat( agents_list, listeners, trace_collector, - tracing_config, + span_attributes, ) .await { @@ -132,7 +133,7 @@ async fn handle_agent_chat( agents_list: Arc>>>, listeners: Arc>>, trace_collector: Arc, - tracing_config: Arc>, + span_attributes: Arc>, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -182,13 +183,26 @@ async fn handle_agent_chat( headers }; - let custom_attrs = extract_custom_trace_attributes( - &request_headers, - tracing_config - .as_ref() - .as_ref() - .and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()), - ); + let mut header_prefixes: Option<&[String]> = None; + let mut static_attributes: Option<&HashMap> = None; + if let Some(attrs) = span_attributes.as_ref() { + header_prefixes = attrs.header_prefixes.as_deref(); + static_attributes = attrs.static_attributes.as_ref(); + } + let mut custom_attrs = HashMap::new(); + if let Some(static_attributes) = static_attributes { + for (key, value) in static_attributes { + custom_attrs.insert(key.clone(), value.clone()); + } + } + if let Some(prefixes) = header_prefixes { + if !prefixes.is_empty() { + custom_attrs.extend(extract_custom_trace_attributes( + &request_headers, + Some(prefixes), + )); + } + } let chat_request_bytes = request.collect().await?.to_bytes(); @@ -243,6 +257,13 @@ async fn handle_agent_chat( (String::new(), None) }; + let apply_custom_attrs = |mut builder: SpanBuilder| { + for (key, value) in &custom_attrs { + builder = builder.with_attribute(key, value); + } + builder + }; + // Select appropriate agents using arch orchestrator llm model let selection_span_id = generate_random_span_id(); let selection_start_time = SystemTime::now(); @@ -261,30 +282,29 @@ async fn handle_agent_chat( .with_target(&listener.name) .build(); - let mut selection_span_builder = SpanBuilder::new(&selection_operation_name) - .with_span_id(selection_span_id) - .with_kind(SpanKind::Internal) - .with_start_time(selection_start_time) - .with_end_time(selection_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, "/agents/select") - .with_attribute("selection.listener", listener.name.clone()) - .with_attribute("selection.agent_count", selected_agents.len().to_string()) - .with_attribute( - "selection.agents", - selected_agents - .iter() - .map(|a| a.id.as_str()) - .collect::>() - .join(","), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), - ); - for (key, value) in &custom_attrs { - selection_span_builder = selection_span_builder.with_attribute(key, value); - } + let mut selection_span_builder = apply_custom_attrs( + SpanBuilder::new(&selection_operation_name) + .with_span_id(selection_span_id) + .with_kind(SpanKind::Internal) + .with_start_time(selection_start_time) + .with_end_time(selection_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, "/agents/select") + .with_attribute("selection.listener", listener.name.clone()) + .with_attribute("selection.agent_count", selected_agents.len().to_string()) + .with_attribute( + "selection.agents", + selected_agents + .iter() + .map(|a| a.id.as_str()) + .collect::>() + .join(","), + ) + .with_attribute( + "duration_ms", + format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), + ), + ); if !trace_id.is_empty() { selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); @@ -359,25 +379,24 @@ async fn handle_agent_chat( .with_target(&agent_name) .build(); - let mut span_builder = SpanBuilder::new(&operation_name) - .with_span_id(span_id) - .with_kind(SpanKind::Internal) - .with_start_time(agent_start_time) - .with_end_time(agent_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, full_path) - .with_attribute("agent.name", agent_name.clone()) - .with_attribute( - "agent.sequence", - format!("{}/{}", agent_index + 1, agent_count), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), - ); - for (key, value) in &custom_attrs { - span_builder = span_builder.with_attribute(key, value); - } + let mut span_builder = apply_custom_attrs( + SpanBuilder::new(&operation_name) + .with_span_id(span_id) + .with_kind(SpanKind::Internal) + .with_start_time(agent_start_time) + .with_end_time(agent_end_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, full_path) + .with_attribute("agent.name", agent_name.clone()) + .with_attribute( + "agent.sequence", + format!("{}/{}", agent_index + 1, agent_count), + ) + .with_attribute( + "duration_ms", + format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), + ), + ); if !trace_id.is_empty() { span_builder = span_builder.with_trace_id(trace_id.clone()); diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 7c6d449b..c86d6ded 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use common::configuration::{ModelAlias, Tracing}; +use common::configuration::{ModelAlias, SpanAttributes}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; @@ -34,7 +34,6 @@ fn full>(chunk: T) -> BoxBody { .boxed() } -// ! we reached the limit of the number of arguments for a function #[allow(clippy::too_many_arguments)] pub async fn llm_chat( request: Request, @@ -43,18 +42,31 @@ pub async fn llm_chat( model_aliases: Arc>>, llm_providers: Arc>, trace_collector: Arc, - tracing_config: Arc>, // ! right here + span_attributes: Arc>, state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); - let custom_attrs = extract_custom_trace_attributes( - &request_headers, - tracing_config - .as_ref() - .as_ref() - .and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()), - ); + let mut header_prefixes: Option<&[String]> = None; + let mut static_attributes: Option<&HashMap> = None; + if let Some(attrs) = span_attributes.as_ref() { + header_prefixes = attrs.header_prefixes.as_deref(); + static_attributes = attrs.static_attributes.as_ref(); + } + let mut custom_attrs = HashMap::new(); + if let Some(static_attributes) = static_attributes { + for (key, value) in static_attributes { + custom_attrs.insert(key.clone(), value.clone()); + } + } + if let Some(prefixes) = header_prefixes { + if !prefixes.is_empty() { + custom_attrs.extend(extract_custom_trace_attributes( + &request_headers, + Some(prefixes), + )); + } + } let request_id: String = match request_headers .get(REQUEST_ID_HEADER) .and_then(|h| h.to_str().ok()) diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 1a896f8f..3da38317 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -112,7 +112,12 @@ async fn main() -> Result<(), Box> { )); let model_aliases = Arc::new(arch_config.model_aliases.clone()); - let tracing_config = Arc::new(arch_config.tracing.clone()); + let span_attributes = Arc::new( + arch_config + .tracing + .as_ref() + .and_then(|tracing| tracing.span_attributes.clone()), + ); // Initialize trace collector and start background flusher // Tracing is enabled if the tracing config is present in arch_config.yaml @@ -177,7 +182,7 @@ async fn main() -> Result<(), Box> { let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); - let tracing_config = tracing_config.clone(); + let span_attributes = span_attributes.clone(); let state_storage = state_storage.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -189,7 +194,7 @@ async fn main() -> Result<(), Box> { let agents_list = agents_list.clone(); let listeners = listeners.clone(); let trace_collector = trace_collector.clone(); - let tracing_config = tracing_config.clone(); + let span_attributes = span_attributes.clone(); let state_storage = state_storage.clone(); async move { @@ -210,7 +215,7 @@ async fn main() -> Result<(), Box> { agents_list, listeners, trace_collector, - tracing_config, + span_attributes, ) .with_context(parent_cx) .await; @@ -229,7 +234,7 @@ async fn main() -> Result<(), Box> { model_aliases, llm_providers, trace_collector, - tracing_config, + span_attributes, state_storage, ) .with_context(parent_cx) diff --git a/crates/brightstaff/src/tracing/custom_attributes.rs b/crates/brightstaff/src/tracing/custom_attributes.rs index 7047a5bb..3aa96430 100644 --- a/crates/brightstaff/src/tracing/custom_attributes.rs +++ b/crates/brightstaff/src/tracing/custom_attributes.rs @@ -65,4 +65,30 @@ mod tests { assert_eq!(attrs.get("admin.level"), Some(&"3".to_string())); assert!(!attrs.contains_key("other.id")); } + + #[test] + fn returns_empty_when_prefixes_missing_or_empty() { + let mut headers = HeaderMap::new(); + headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456")); + + let attrs_none = extract_custom_trace_attributes(&headers, None); + assert!(attrs_none.is_empty()); + + let empty_prefixes: Vec = Vec::new(); + let attrs_empty = extract_custom_trace_attributes(&headers, Some(&empty_prefixes)); + assert!(attrs_empty.is_empty()); + } + + #[test] + fn supports_multiple_prefixes() { + let mut headers = HeaderMap::new(); + headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456")); + headers.insert("x-tenant-user-id", HeaderValue::from_static("usr_789")); + + let prefixes = vec!["x-katanemo-".to_string(), "x-tenant-".to_string()]; + let attrs = extract_custom_trace_attributes(&headers, Some(&prefixes)); + + assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string())); + assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string())); + } } diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 031c24c0..9ac013b0 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -90,7 +90,14 @@ pub struct Overrides { pub struct Tracing { pub sampling_rate: Option, pub trace_arch_internal: Option, - pub span_attribute_header_prefixes: Option>, + pub span_attributes: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct SpanAttributes { + pub header_prefixes: Option>, + #[serde(rename = "static")] + pub static_attributes: Option>, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] diff --git a/demos/use_cases/travel_agents/config.yaml b/demos/use_cases/travel_agents/config.yaml index cb2a632c..ce9b8f44 100644 --- a/demos/use_cases/travel_agents/config.yaml +++ b/demos/use_cases/travel_agents/config.yaml @@ -55,5 +55,6 @@ listeners: tracing: random_sampling: 100 - span_attribute_header_prefixes: - - x-katanemo- + span_attributes: + header_prefixes: + - x-katanemo-