refactor: update tracing configuration to use span attributes and adjust related handlers

This commit is contained in:
Musa 2026-02-06 11:53:19 -08:00
parent 8713a80638
commit 20e9953a8e
No known key found for this signature in database
7 changed files with 122 additions and 65 deletions

View file

@ -382,10 +382,19 @@ properties:
type: integer
trace_arch_internal:
type: boolean
span_attribute_header_prefixes:
type: array
items:
type: string
span_attributes:
type: object
properties:
header_prefixes:
type: array
items:
type: string
static:
type: object
additionalProperties:
type: string
additionalProperties: false
additionalProperties: false
mode:
type: string
enum:

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::{Instant, SystemTime};
use bytes::Bytes;
use common::configuration::Tracing;
use common::configuration::SpanAttributes;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
use hermesllm::apis::OpenAIMessage;
@ -46,7 +46,7 @@ pub async fn agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
span_attributes: Arc<Option<SpanAttributes>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(
request,
@ -54,7 +54,7 @@ pub async fn agent_chat(
agents_list,
listeners,
trace_collector,
tracing_config,
span_attributes,
)
.await
{
@ -133,7 +133,7 @@ async fn handle_agent_chat(
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
tracing_config: Arc<Option<Tracing>>,
span_attributes: Arc<Option<SpanAttributes>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
@ -185,10 +185,7 @@ async fn handle_agent_chat(
};
let custom_attrs = collect_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()),
span_attributes.as_ref().as_ref(),
);
let chat_request_bytes = request.collect().await?.to_bytes();
@ -264,26 +261,26 @@ async fn handle_agent_chat(
let mut selection_span_builder = append_span_attributes(
SpanBuilder::new(&selection_operation_name)
.with_span_id(selection_span_id)
.with_kind(SpanKind::Internal)
.with_start_time(selection_start_time)
.with_end_time(selection_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/select")
.with_attribute("selection.listener", listener.name.clone())
.with_attribute("selection.agent_count", selected_agents.len().to_string())
.with_attribute(
"selection.agents",
selected_agents
.iter()
.map(|a| a.id.as_str())
.collect::<Vec<_>>()
.join(","),
)
.with_attribute(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
),
.with_span_id(selection_span_id)
.with_kind(SpanKind::Internal)
.with_start_time(selection_start_time)
.with_end_time(selection_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/select")
.with_attribute("selection.listener", listener.name.clone())
.with_attribute("selection.agent_count", selected_agents.len().to_string())
.with_attribute(
"selection.agents",
selected_agents
.iter()
.map(|a| a.id.as_str())
.collect::<Vec<_>>()
.join(","),
)
.with_attribute(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
),
&custom_attrs,
);
@ -362,21 +359,21 @@ async fn handle_agent_chat(
let mut span_builder = append_span_attributes(
SpanBuilder::new(&operation_name)
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", agent_name.clone())
.with_attribute(
"agent.sequence",
format!("{}/{}", agent_index + 1, agent_count),
)
.with_attribute(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
),
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", agent_name.clone())
.with_attribute(
"agent.sequence",
format!("{}/{}", agent_index + 1, agent_count),
)
.with_attribute(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
),
&custom_attrs,
);

View file

@ -1,5 +1,5 @@
use bytes::Bytes;
use common::configuration::{ModelAlias, Tracing};
use common::configuration::{ModelAlias, SpanAttributes};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
@ -34,7 +34,6 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed()
}
// ! we reached the limit of the number of arguments for a function
#[allow(clippy::too_many_arguments)]
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
@ -43,17 +42,14 @@ pub async fn llm_chat(
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<LlmProviders>>,
trace_collector: Arc<TraceCollector>,
tracing_config: Arc<Option<Tracing>>, // ! right here
span_attributes: Arc<Option<SpanAttributes>>,
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
let request_headers = request.headers().clone();
let custom_attrs = collect_custom_trace_attributes(
&request_headers,
tracing_config
.as_ref()
.as_ref()
.and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()),
span_attributes.as_ref().as_ref(),
);
let request_id: String = match request_headers
.get(REQUEST_ID_HEADER)

View file

@ -115,12 +115,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
));
let model_aliases = Arc::new(plano_config.model_aliases.clone());
let tracing_config = Arc::new(plano_config.tracing.clone());
let span_attributes = Arc::new(
plano_config
.tracing
.as_ref()
.and_then(|tracing| tracing.span_attributes.clone()),
);
// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in plano_config.yaml
// Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED
let tracing_enabled = if tracing_config.is_some() {
let tracing_enabled = if plano_config.tracing.is_some() {
info!("Tracing configuration found in plano_config.yaml");
Some(true)
} else {
@ -180,7 +185,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -192,7 +197,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let tracing_config = tracing_config.clone();
let span_attributes = span_attributes.clone();
let state_storage = state_storage.clone();
async move {
@ -213,7 +218,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
agents_list,
listeners,
trace_collector,
tracing_config,
span_attributes,
)
.with_context(parent_cx)
.await;
@ -232,7 +237,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
model_aliases,
llm_providers,
trace_collector,
tracing_config,
span_attributes,
state_storage,
)
.with_context(parent_cx)

View file

@ -1,5 +1,6 @@
use std::collections::HashMap;
use common::configuration::SpanAttributes;
use common::traces::SpanBuilder;
use hyper::header::HeaderMap;
@ -47,9 +48,24 @@ pub fn extract_custom_trace_attributes(
pub fn collect_custom_trace_attributes(
headers: &HeaderMap,
span_attribute_header_prefixes: Option<&[String]>,
span_attributes: Option<&SpanAttributes>,
) -> HashMap<String, String> {
extract_custom_trace_attributes(headers, span_attribute_header_prefixes)
let mut attributes = HashMap::new();
let Some(span_attributes) = span_attributes else {
return attributes;
};
if let Some(static_attributes) = span_attributes.static_attributes.as_ref() {
for (key, value) in static_attributes {
attributes.insert(key.clone(), value.clone());
}
}
attributes.extend(extract_custom_trace_attributes(
headers,
span_attributes.header_prefixes.as_deref(),
));
attributes
}
pub fn append_span_attributes(
@ -83,4 +99,30 @@ mod tests {
assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
assert!(!attrs.contains_key("other.id"));
}
#[test]
fn returns_empty_when_prefixes_missing_or_empty() {
let mut headers = HeaderMap::new();
headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
let attrs_none = extract_custom_trace_attributes(&headers, None);
assert!(attrs_none.is_empty());
let empty_prefixes: Vec<String> = Vec::new();
let attrs_empty = extract_custom_trace_attributes(&headers, Some(&empty_prefixes));
assert!(attrs_empty.is_empty());
}
#[test]
fn supports_multiple_prefixes() {
let mut headers = HeaderMap::new();
headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
headers.insert("x-tenant-user-id", HeaderValue::from_static("usr_789"));
let prefixes = vec!["x-katanemo-".to_string(), "x-tenant-".to_string()];
let attrs = extract_custom_trace_attributes(&headers, Some(&prefixes));
assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
}
}

View file

@ -90,7 +90,14 @@ pub struct Overrides {
pub struct Tracing {
pub sampling_rate: Option<f64>,
pub trace_arch_internal: Option<bool>,
pub span_attribute_header_prefixes: Option<Vec<String>>,
pub span_attributes: Option<SpanAttributes>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SpanAttributes {
pub header_prefixes: Option<Vec<String>>,
#[serde(rename = "static")]
pub static_attributes: Option<HashMap<String, String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]

View file

@ -55,5 +55,6 @@ listeners:
tracing:
random_sampling: 100
span_attribute_header_prefixes:
- x-katanemo-
span_attributes:
header_prefixes:
- x-katanemo-