add custom trace attributes

This commit is contained in:
Musa 2026-01-27 12:17:42 -08:00
parent 4a6cea3545
commit f61d72052c
10 changed files with 240 additions and 4 deletions

View file

@ -382,6 +382,27 @@ properties:
type: integer
trace_arch_internal:
type: boolean
custom_attributes:
type: array
items:
type: object
properties:
key:
type: string
type:
type: string
enum:
- str
- bool
- float
- int
header:
type: string
additionalProperties: false
required:
- key
- type
- header
additionalProperties: false
mode:
type: string

View file

@ -2,6 +2,7 @@ use std::sync::Arc;
use std::time::{Instant, SystemTime};
use bytes::Bytes;
use common::configuration::Tracing;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
use hermesllm::apis::OpenAIMessage;
@ -18,7 +19,9 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{http, operation_component, OperationNameBuilder};
use crate::tracing::{
extract_custom_trace_attributes, http, operation_component, OperationNameBuilder,
};
/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
@ -42,6 +45,7 @@ pub async fn agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(
request,
@ -49,6 +53,7 @@ pub async fn agent_chat(
agents_list,
listeners,
trace_collector,
tracing_config,
)
.await
{
@ -127,6 +132,7 @@ async fn handle_agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
@ -176,6 +182,13 @@ async fn handle_agent_chat(
headers
};
let custom_attrs = extract_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.custom_attributes.as_deref()),
);
let chat_request_bytes = request.collect().await?.to_bytes();
@ -269,6 +282,9 @@ async fn handle_agent_chat(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
);
for (key, value) in &custom_attrs {
selection_span_builder = selection_span_builder.with_attribute(key, value);
}
if !trace_id.is_empty() {
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
@ -359,6 +375,9 @@ async fn handle_agent_chat(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
);
for (key, value) in &custom_attrs {
span_builder = span_builder.with_attribute(key, value);
}
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id.clone());

View file

@ -1,5 +1,5 @@
use bytes::Bytes;
use common::configuration::ModelAlias;
use common::configuration::{ModelAlias, Tracing};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
@ -26,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::tracing::operation_component;
use crate::tracing::{extract_custom_trace_attributes, operation_component};
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
@ -34,6 +34,8 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed()
}
// ! we reached the limit of the number of arguments for a function
#[allow(clippy::too_many_arguments)]
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
router_service: Arc<RouterService>,
@ -41,10 +43,18 @@ pub async fn llm_chat(
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
trace_collector: Arc<TraceCollector>,
tracing_config: Arc<Option<Tracing>>, // ! right here
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
let request_headers = request.headers().clone();
let custom_attrs = extract_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.custom_attributes.as_deref()),
);
let request_id: String = match request_headers
.get(REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok())
@ -253,6 +263,7 @@ pub async fn llm_chat(
&traceparent,
&request_path,
&request_id,
&custom_attrs,
)
.await
{
@ -337,6 +348,7 @@ pub async fn llm_chat(
user_message_preview,
temperature,
&llm_providers,
&custom_attrs,
)
.await;
@ -422,7 +434,8 @@ async fn build_llm_span(
tool_names: Option<Vec<String>>,
user_message_preview: Option<String>,
temperature: Option<f32>,
llm_providers: &Arc<RwLock<LlmProviders>>,
llm_providers: &Arc<RwLock<Vec<LlmProvider>>>,
custom_attrs: &HashMap<String, String>,
) -> common::traces::Span {
use crate::tracing::{http, llm, OperationNameBuilder};
use common::traces::{parse_traceparent, SpanBuilder, SpanKind};
@ -488,6 +501,10 @@ async fn build_llm_span(
span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview);
}
for (key, value) in custom_attrs {
span_builder = span_builder.with_attribute(key, value);
}
span_builder.build()
}

View file

@ -40,6 +40,7 @@ pub async fn router_chat_get_upstream_model(
traceparent: &str,
request_path: &str,
request_id: &str,
custom_attrs: &HashMap<String, String>,
) -> Result<RoutingResult, RoutingError> {
// Clone metadata for routing before converting (which consumes client_request)
let routing_metadata = client_request.metadata().clone();
@ -139,6 +140,9 @@ pub async fn router_chat_get_upstream_model(
// Record successful routing span
let mut attrs: HashMap<String, String> = HashMap::new();
attrs.insert("route.selected_model".to_string(), model_name.clone());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
@ -160,6 +164,9 @@ pub async fn router_chat_get_upstream_model(
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "none".to_string());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,
@ -179,6 +186,9 @@ pub async fn router_chat_get_upstream_model(
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "unknown".to_string());
attrs.insert("error.message".to_string(), err.to_string());
for (key, value) in custom_attrs {
attrs.entry(key.clone()).or_insert_with(|| value.clone());
}
record_routing_span(
trace_collector,
traceparent,

View file

@ -112,6 +112,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
));
let model_aliases = Arc::new(arch_config.model_aliases.clone());
let tracing_config = Arc::new(arch_config.tracing.clone());
// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in arch_config.yaml
@ -176,6 +177,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -187,6 +189,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let state_storage = state_storage.clone();
async move {
@ -207,6 +210,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
agents_list,
listeners,
trace_collector,
tracing_config,
)
.with_context(parent_cx)
.await;
@ -225,6 +229,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
model_aliases,
llm_providers,
trace_collector,
tracing_config,
state_storage,
)
.with_context(parent_cx)

View file

@ -0,0 +1,119 @@
use std::collections::HashMap;
use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
use hyper::header::{HeaderMap, HeaderName};
pub fn extract_custom_trace_attributes(
headers: &HeaderMap,
custom_attributes: Option<&[CustomTraceAttribute]>,
) -> HashMap<String, String> {
let mut attributes = HashMap::new();
let Some(custom_attributes) = custom_attributes else {
return attributes;
};
for attribute in custom_attributes {
// Normalize/validate the configured header name; skip invalid names.
let header_name = match HeaderName::from_bytes(attribute.header.as_bytes()) {
Ok(name) => name,
Err(_) => continue,
};
// Extract header value as UTF-8 text; skip missing or invalid values.
let raw_value = match headers
.get(header_name)
.and_then(|value| value.to_str().ok())
{
Some(value) => value.trim(),
None => continue,
};
// Parse the header value according to the configured type.
let parsed_value = match attribute.value_type {
CustomTraceAttributeType::Str => Some(raw_value.to_string()),
CustomTraceAttributeType::Bool => raw_value.parse::<bool>().ok().map(|v| v.to_string()),
CustomTraceAttributeType::Float => raw_value.parse::<f64>().ok().map(|v| v.to_string()),
CustomTraceAttributeType::Int => raw_value.parse::<i64>().ok().map(|v| v.to_string()),
};
// Only include attributes that successfully parsed.
if let Some(value) = parsed_value {
attributes.insert(attribute.key.clone(), value);
}
}
attributes
}
#[cfg(test)]
mod tests {
use super::extract_custom_trace_attributes;
use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
use hyper::header::{HeaderMap, HeaderValue};
#[test]
fn extracts_and_parses_custom_headers() {
let mut headers = HeaderMap::new();
headers.insert("x-workspace-id", HeaderValue::from_static("ws_123"));
headers.insert("x-tenant-id", HeaderValue::from_static("ten_456"));
headers.insert("x-user-id", HeaderValue::from_static("usr_789"));
headers.insert("x-admin-level", HeaderValue::from_static("3"));
headers.insert("x-is-internal", HeaderValue::from_static("true"));
headers.insert("x-budget", HeaderValue::from_static("42.5"));
headers.insert("x-bad-int", HeaderValue::from_static("nope"));
let custom_attributes = vec![
CustomTraceAttribute {
key: "workspace.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-workspace-id".to_string(),
},
CustomTraceAttribute {
key: "tenant.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-tenant-id".to_string(),
},
CustomTraceAttribute {
key: "user.id".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-user-id".to_string(),
},
CustomTraceAttribute {
key: "admin.level".to_string(),
value_type: CustomTraceAttributeType::Int,
header: "x-admin-level".to_string(),
},
CustomTraceAttribute {
key: "is.internal".to_string(),
value_type: CustomTraceAttributeType::Bool,
header: "x-is-internal".to_string(),
},
CustomTraceAttribute {
key: "budget.value".to_string(),
value_type: CustomTraceAttributeType::Float,
header: "x-budget".to_string(),
},
CustomTraceAttribute {
key: "bad.int".to_string(),
value_type: CustomTraceAttributeType::Int,
header: "x-bad-int".to_string(),
},
CustomTraceAttribute {
key: "missing.header".to_string(),
value_type: CustomTraceAttributeType::Str,
header: "x-missing".to_string(),
},
];
let attrs = extract_custom_trace_attributes(&headers, Some(&custom_attributes));
assert_eq!(attrs.get("workspace.id"), Some(&"ws_123".to_string()));
assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
assert_eq!(attrs.get("is.internal"), Some(&"true".to_string()));
assert_eq!(attrs.get("budget.value"), Some(&"42.5".to_string()));
assert!(!attrs.contains_key("bad.int"));
assert!(!attrs.contains_key("missing.header"));
}
}

View file

@ -1,5 +1,7 @@
mod constants;
mod custom_attributes;
pub use constants::{
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
};
pub use custom_attributes::extract_custom_trace_attributes;

View file

@ -90,6 +90,24 @@ pub struct Overrides {
pub struct Tracing {
pub sampling_rate: Option<f64>,
pub trace_arch_internal: Option<bool>,
pub custom_attributes: Option<Vec<CustomTraceAttribute>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomTraceAttribute {
pub key: String,
#[serde(rename = "type")]
pub value_type: CustomTraceAttributeType,
pub header: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CustomTraceAttributeType {
Str,
Bool,
Float,
Int,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]

View file

@ -55,3 +55,22 @@ listeners:
tracing:
random_sampling: 100
custom_attributes:
- header: x-workspace-id
key: workspace.id
type: str
- header: x-tenant-id
key: tenant.id
type: str
- header: x-user-id
key: user.id
type: str
- header: x-admin-level
key: admin.level
type: int
- header: x-is-internal
key: is.internal
type: bool
- header: x-budget
key: budget.value
type: float

View file

@ -3,6 +3,12 @@
### Travel Agent Chat Completion Request
POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1
Content-Type: application/json
X-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3
X-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a
X-User-Id: usr_19df7e6751b846f9ba026776e3c12abe
X-Admin-Level: 3
X-Is-Internal: true
X-Budget: 42.5
{
"model": "gpt-4o",