mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
refactor: update tracing configuration to use span attributes and adjust related handlers
This commit is contained in:
parent
0e95212416
commit
d015b40a2b
7 changed files with 155 additions and 76 deletions
|
|
@ -382,10 +382,19 @@ properties:
|
|||
type: integer
|
||||
trace_arch_internal:
|
||||
type: boolean
|
||||
span_attribute_header_prefixes:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
span_attributes:
|
||||
type: object
|
||||
properties:
|
||||
header_prefixes:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
static:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: string
|
||||
additionalProperties: false
|
||||
additionalProperties: false
|
||||
mode:
|
||||
type: string
|
||||
enum:
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use bytes::Bytes;
|
||||
use common::configuration::Tracing;
|
||||
use common::configuration::SpanAttributes;
|
||||
use common::consts::TRACE_PARENT_HEADER;
|
||||
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
|
||||
use hermesllm::apis::OpenAIMessage;
|
||||
|
|
@ -45,7 +46,7 @@ pub async fn agent_chat(
|
|||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
trace_collector: Arc<common::traces::TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
match handle_agent_chat(
|
||||
request,
|
||||
|
|
@ -53,7 +54,7 @@ pub async fn agent_chat(
|
|||
agents_list,
|
||||
listeners,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
span_attributes,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
@ -132,7 +133,7 @@ async fn handle_agent_chat(
|
|||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
trace_collector: Arc<common::traces::TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>,
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||
// Initialize services
|
||||
let agent_selector = AgentSelector::new(orchestrator_service);
|
||||
|
|
@ -182,13 +183,26 @@ async fn handle_agent_chat(
|
|||
|
||||
headers
|
||||
};
|
||||
let custom_attrs = extract_custom_trace_attributes(
|
||||
&request_headers,
|
||||
tracing_config
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()),
|
||||
);
|
||||
let mut header_prefixes: Option<&[String]> = None;
|
||||
let mut static_attributes: Option<&HashMap<String, String>> = None;
|
||||
if let Some(attrs) = span_attributes.as_ref() {
|
||||
header_prefixes = attrs.header_prefixes.as_deref();
|
||||
static_attributes = attrs.static_attributes.as_ref();
|
||||
}
|
||||
let mut custom_attrs = HashMap::new();
|
||||
if let Some(static_attributes) = static_attributes {
|
||||
for (key, value) in static_attributes {
|
||||
custom_attrs.insert(key.clone(), value.clone());
|
||||
}
|
||||
}
|
||||
if let Some(prefixes) = header_prefixes {
|
||||
if !prefixes.is_empty() {
|
||||
custom_attrs.extend(extract_custom_trace_attributes(
|
||||
&request_headers,
|
||||
Some(prefixes),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let chat_request_bytes = request.collect().await?.to_bytes();
|
||||
|
||||
|
|
@ -243,6 +257,13 @@ async fn handle_agent_chat(
|
|||
(String::new(), None)
|
||||
};
|
||||
|
||||
let apply_custom_attrs = |mut builder: SpanBuilder| {
|
||||
for (key, value) in &custom_attrs {
|
||||
builder = builder.with_attribute(key, value);
|
||||
}
|
||||
builder
|
||||
};
|
||||
|
||||
// Select appropriate agents using arch orchestrator llm model
|
||||
let selection_span_id = generate_random_span_id();
|
||||
let selection_start_time = SystemTime::now();
|
||||
|
|
@ -261,30 +282,29 @@ async fn handle_agent_chat(
|
|||
.with_target(&listener.name)
|
||||
.build();
|
||||
|
||||
let mut selection_span_builder = SpanBuilder::new(&selection_operation_name)
|
||||
.with_span_id(selection_span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(selection_start_time)
|
||||
.with_end_time(selection_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, "/agents/select")
|
||||
.with_attribute("selection.listener", listener.name.clone())
|
||||
.with_attribute("selection.agent_count", selected_agents.len().to_string())
|
||||
.with_attribute(
|
||||
"selection.agents",
|
||||
selected_agents
|
||||
.iter()
|
||||
.map(|a| a.id.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
for (key, value) in &custom_attrs {
|
||||
selection_span_builder = selection_span_builder.with_attribute(key, value);
|
||||
}
|
||||
let mut selection_span_builder = apply_custom_attrs(
|
||||
SpanBuilder::new(&selection_operation_name)
|
||||
.with_span_id(selection_span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(selection_start_time)
|
||||
.with_end_time(selection_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, "/agents/select")
|
||||
.with_attribute("selection.listener", listener.name.clone())
|
||||
.with_attribute("selection.agent_count", selected_agents.len().to_string())
|
||||
.with_attribute(
|
||||
"selection.agents",
|
||||
selected_agents
|
||||
.iter()
|
||||
.map(|a| a.id.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
|
||||
),
|
||||
);
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
|
||||
|
|
@ -359,25 +379,24 @@ async fn handle_agent_chat(
|
|||
.with_target(&agent_name)
|
||||
.build();
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(agent_start_time)
|
||||
.with_end_time(agent_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, full_path)
|
||||
.with_attribute("agent.name", agent_name.clone())
|
||||
.with_attribute(
|
||||
"agent.sequence",
|
||||
format!("{}/{}", agent_index + 1, agent_count),
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
|
||||
);
|
||||
for (key, value) in &custom_attrs {
|
||||
span_builder = span_builder.with_attribute(key, value);
|
||||
}
|
||||
let mut span_builder = apply_custom_attrs(
|
||||
SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(agent_start_time)
|
||||
.with_end_time(agent_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, full_path)
|
||||
.with_attribute("agent.name", agent_name.clone())
|
||||
.with_attribute(
|
||||
"agent.sequence",
|
||||
format!("{}/{}", agent_index + 1, agent_count),
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
|
||||
),
|
||||
);
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id.clone());
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::{ModelAlias, Tracing};
|
||||
use common::configuration::{ModelAlias, SpanAttributes};
|
||||
use common::consts::{
|
||||
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
|
||||
};
|
||||
|
|
@ -34,7 +34,6 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
// ! we reached the limit of the number of arguments for a function
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn llm_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
|
|
@ -43,18 +42,31 @@ pub async fn llm_chat(
|
|||
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
trace_collector: Arc<TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>, // ! right here
|
||||
span_attributes: Arc<Option<SpanAttributes>>,
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_path = request.uri().path().to_string();
|
||||
let request_headers = request.headers().clone();
|
||||
let custom_attrs = extract_custom_trace_attributes(
|
||||
&request_headers,
|
||||
tracing_config
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.span_attribute_header_prefixes.as_deref()),
|
||||
);
|
||||
let mut header_prefixes: Option<&[String]> = None;
|
||||
let mut static_attributes: Option<&HashMap<String, String>> = None;
|
||||
if let Some(attrs) = span_attributes.as_ref() {
|
||||
header_prefixes = attrs.header_prefixes.as_deref();
|
||||
static_attributes = attrs.static_attributes.as_ref();
|
||||
}
|
||||
let mut custom_attrs = HashMap::new();
|
||||
if let Some(static_attributes) = static_attributes {
|
||||
for (key, value) in static_attributes {
|
||||
custom_attrs.insert(key.clone(), value.clone());
|
||||
}
|
||||
}
|
||||
if let Some(prefixes) = header_prefixes {
|
||||
if !prefixes.is_empty() {
|
||||
custom_attrs.extend(extract_custom_trace_attributes(
|
||||
&request_headers,
|
||||
Some(prefixes),
|
||||
));
|
||||
}
|
||||
}
|
||||
let request_id: String = match request_headers
|
||||
.get(REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
|
|
|
|||
|
|
@ -112,7 +112,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
));
|
||||
|
||||
let model_aliases = Arc::new(arch_config.model_aliases.clone());
|
||||
let tracing_config = Arc::new(arch_config.tracing.clone());
|
||||
let span_attributes = Arc::new(
|
||||
arch_config
|
||||
.tracing
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.span_attributes.clone()),
|
||||
);
|
||||
|
||||
// Initialize trace collector and start background flusher
|
||||
// Tracing is enabled if the tracing config is present in arch_config.yaml
|
||||
|
|
@ -177,7 +182,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let agents_list = combined_agents_filters_list.clone();
|
||||
let listeners = listeners.clone();
|
||||
let trace_collector = trace_collector.clone();
|
||||
let tracing_config = tracing_config.clone();
|
||||
let span_attributes = span_attributes.clone();
|
||||
let state_storage = state_storage.clone();
|
||||
let service = service_fn(move |req| {
|
||||
let router_service = Arc::clone(&router_service);
|
||||
|
|
@ -189,7 +194,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let agents_list = agents_list.clone();
|
||||
let listeners = listeners.clone();
|
||||
let trace_collector = trace_collector.clone();
|
||||
let tracing_config = tracing_config.clone();
|
||||
let span_attributes = span_attributes.clone();
|
||||
let state_storage = state_storage.clone();
|
||||
|
||||
async move {
|
||||
|
|
@ -210,7 +215,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
agents_list,
|
||||
listeners,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
span_attributes,
|
||||
)
|
||||
.with_context(parent_cx)
|
||||
.await;
|
||||
|
|
@ -229,7 +234,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
model_aliases,
|
||||
llm_providers,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
span_attributes,
|
||||
state_storage,
|
||||
)
|
||||
.with_context(parent_cx)
|
||||
|
|
|
|||
|
|
@ -65,4 +65,30 @@ mod tests {
|
|||
assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
|
||||
assert!(!attrs.contains_key("other.id"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn returns_empty_when_prefixes_missing_or_empty() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
|
||||
|
||||
let attrs_none = extract_custom_trace_attributes(&headers, None);
|
||||
assert!(attrs_none.is_empty());
|
||||
|
||||
let empty_prefixes: Vec<String> = Vec::new();
|
||||
let attrs_empty = extract_custom_trace_attributes(&headers, Some(&empty_prefixes));
|
||||
assert!(attrs_empty.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn supports_multiple_prefixes() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-katanemo-tenant-id", HeaderValue::from_static("ten_456"));
|
||||
headers.insert("x-tenant-user-id", HeaderValue::from_static("usr_789"));
|
||||
|
||||
let prefixes = vec!["x-katanemo-".to_string(), "x-tenant-".to_string()];
|
||||
let attrs = extract_custom_trace_attributes(&headers, Some(&prefixes));
|
||||
|
||||
assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
|
||||
assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,7 +90,14 @@ pub struct Overrides {
|
|||
pub struct Tracing {
|
||||
pub sampling_rate: Option<f64>,
|
||||
pub trace_arch_internal: Option<bool>,
|
||||
pub span_attribute_header_prefixes: Option<Vec<String>>,
|
||||
pub span_attributes: Option<SpanAttributes>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
pub struct SpanAttributes {
|
||||
pub header_prefixes: Option<Vec<String>>,
|
||||
#[serde(rename = "static")]
|
||||
pub static_attributes: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
|
||||
|
|
|
|||
|
|
@ -55,5 +55,6 @@ listeners:
|
|||
|
||||
tracing:
|
||||
random_sampling: 100
|
||||
span_attribute_header_prefixes:
|
||||
- x-katanemo-
|
||||
span_attributes:
|
||||
header_prefixes:
|
||||
- x-katanemo-
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue