mirror of
https://github.com/katanemo/plano.git
synced 2026-05-21 13:55:15 +02:00
fix: restore span attributes, OTEL_SERVICE_NAME, config path, and error status
This commit is contained in:
parent
9748cdd857
commit
a0513fe191
7 changed files with 55 additions and 11 deletions
|
|
@ -1,7 +1,7 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use common::configuration::{Agent, Listener, ModelAlias};
|
use common::configuration::{Agent, Listener, ModelAlias, SpanAttributes};
|
||||||
use common::llm_providers::LlmProviders;
|
use common::llm_providers::LlmProviders;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
|
|
@ -22,6 +22,7 @@ pub struct AppState {
|
||||||
pub listeners: Arc<RwLock<Vec<Listener>>>,
|
pub listeners: Arc<RwLock<Vec<Listener>>>,
|
||||||
pub state_storage: Option<Arc<dyn StateStorage>>,
|
pub state_storage: Option<Arc<dyn StateStorage>>,
|
||||||
pub llm_provider_url: String,
|
pub llm_provider_url: String,
|
||||||
|
pub span_attributes: Arc<Option<SpanAttributes>>,
|
||||||
/// Shared HTTP client for upstream LLM requests (connection pooling / keep-alive).
|
/// Shared HTTP client for upstream LLM requests (connection pooling / keep-alive).
|
||||||
pub http_client: reqwest::Client,
|
pub http_client: reqwest::Client,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,7 @@ use crate::app_state::AppState;
|
||||||
use crate::handlers::errors::build_error_chain_response;
|
use crate::handlers::errors::build_error_chain_response;
|
||||||
use crate::handlers::request::extract_request_id;
|
use crate::handlers::request::extract_request_id;
|
||||||
use crate::handlers::response::ResponseHandler;
|
use crate::handlers::response::ResponseHandler;
|
||||||
use crate::tracing::{operation_component, set_service_name};
|
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||||
|
|
||||||
/// Main errors for agent chat completions
|
/// Main errors for agent chat completions
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
|
@ -41,6 +41,8 @@ pub async fn agent_chat(
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
let request_id = extract_request_id(&request);
|
let request_id = extract_request_id(&request);
|
||||||
|
let custom_attrs =
|
||||||
|
collect_custom_trace_attributes(request.headers(), state.span_attributes.as_ref().as_ref());
|
||||||
|
|
||||||
// Create a span with request_id that will be included in all log lines
|
// Create a span with request_id that will be included in all log lines
|
||||||
let request_span = info_span!(
|
let request_span = info_span!(
|
||||||
|
|
@ -56,7 +58,7 @@ pub async fn agent_chat(
|
||||||
// Set service name for orchestrator operations
|
// Set service name for orchestrator operations
|
||||||
set_service_name(operation_component::ORCHESTRATOR);
|
set_service_name(operation_component::ORCHESTRATOR);
|
||||||
|
|
||||||
match handle_agent_chat_inner(request, state, request_id).await {
|
match handle_agent_chat_inner(request, state, request_id, custom_attrs).await {
|
||||||
Ok(response) => Ok(response),
|
Ok(response) => Ok(response),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Check if this is a client error from the pipeline that should be cascaded
|
// Check if this is a client error from the pipeline that should be cascaded
|
||||||
|
|
@ -84,7 +86,7 @@ pub async fn agent_chat(
|
||||||
let mut response =
|
let mut response =
|
||||||
Response::new(ResponseHandler::create_full_body(json_string));
|
Response::new(ResponseHandler::create_full_body(json_string));
|
||||||
*response.status_mut() = hyper::StatusCode::from_u16(*status)
|
*response.status_mut() = hyper::StatusCode::from_u16(*status)
|
||||||
.unwrap_or(hyper::StatusCode::BAD_REQUEST);
|
.unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR);
|
||||||
response.headers_mut().insert(
|
response.headers_mut().insert(
|
||||||
hyper::header::CONTENT_TYPE,
|
hyper::header::CONTENT_TYPE,
|
||||||
"application/json".parse().unwrap(),
|
"application/json".parse().unwrap(),
|
||||||
|
|
@ -104,6 +106,7 @@ async fn handle_agent_chat_inner(
|
||||||
request: Request<hyper::body::Incoming>,
|
request: Request<hyper::body::Incoming>,
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
request_id: String,
|
request_id: String,
|
||||||
|
custom_attrs: std::collections::HashMap<String, String>,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||||
// Initialize services
|
// Initialize services
|
||||||
let agent_selector = AgentSelector::new(Arc::clone(&state.orchestrator_service));
|
let agent_selector = AgentSelector::new(Arc::clone(&state.orchestrator_service));
|
||||||
|
|
@ -126,6 +129,9 @@ async fn handle_agent_chat_inner(
|
||||||
|
|
||||||
get_active_span(|span| {
|
get_active_span(|span| {
|
||||||
span.update_name(listener.name.to_string());
|
span.update_name(listener.name.to_string());
|
||||||
|
for (key, value) in &custom_attrs {
|
||||||
|
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
info!(listener = %listener.name, "handling request");
|
info!(listener = %listener.name, "handling request");
|
||||||
|
|
@ -276,6 +282,12 @@ async fn handle_agent_chat_inner(
|
||||||
set_service_name(operation_component::AGENT);
|
set_service_name(operation_component::AGENT);
|
||||||
get_active_span(|span| {
|
get_active_span(|span| {
|
||||||
span.update_name(format!("{} /v1/chat/completions", agent_name));
|
span.update_name(format!("{} /v1/chat/completions", agent_name));
|
||||||
|
for (key, value) in &custom_attrs {
|
||||||
|
span.set_attribute(opentelemetry::KeyValue::new(
|
||||||
|
key.clone(),
|
||||||
|
value.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
pipeline_processor
|
pipeline_processor
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,9 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||||
use crate::state::{
|
use crate::state::{
|
||||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||||
};
|
};
|
||||||
use crate::tracing::{llm as tracing_llm, operation_component, set_service_name};
|
use crate::tracing::{
|
||||||
|
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
|
||||||
|
};
|
||||||
use router::router_chat_get_upstream_model;
|
use router::router_chat_get_upstream_model;
|
||||||
|
|
||||||
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||||
|
|
@ -45,6 +47,8 @@ pub async fn llm_chat(
|
||||||
let request_path = request.uri().path().to_string();
|
let request_path = request.uri().path().to_string();
|
||||||
let request_headers = request.headers().clone();
|
let request_headers = request.headers().clone();
|
||||||
let request_id = extract_request_id(&request);
|
let request_id = extract_request_id(&request);
|
||||||
|
let custom_attrs =
|
||||||
|
collect_custom_trace_attributes(&request_headers, state.span_attributes.as_ref().as_ref());
|
||||||
|
|
||||||
// Create a span with request_id that will be included in all log lines
|
// Create a span with request_id that will be included in all log lines
|
||||||
let request_span = info_span!(
|
let request_span = info_span!(
|
||||||
|
|
@ -60,7 +64,14 @@ pub async fn llm_chat(
|
||||||
);
|
);
|
||||||
|
|
||||||
// Execute the rest of the handler inside the span
|
// Execute the rest of the handler inside the span
|
||||||
llm_chat_inner(request, state, request_id, request_path, request_headers)
|
llm_chat_inner(
|
||||||
|
request,
|
||||||
|
state,
|
||||||
|
custom_attrs,
|
||||||
|
request_id,
|
||||||
|
request_path,
|
||||||
|
request_headers,
|
||||||
|
)
|
||||||
.instrument(request_span)
|
.instrument(request_span)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
@ -68,12 +79,18 @@ pub async fn llm_chat(
|
||||||
async fn llm_chat_inner(
|
async fn llm_chat_inner(
|
||||||
request: Request<hyper::body::Incoming>,
|
request: Request<hyper::body::Incoming>,
|
||||||
state: Arc<AppState>,
|
state: Arc<AppState>,
|
||||||
|
custom_attrs: HashMap<String, String>,
|
||||||
request_id: String,
|
request_id: String,
|
||||||
request_path: String,
|
request_path: String,
|
||||||
mut request_headers: hyper::HeaderMap,
|
mut request_headers: hyper::HeaderMap,
|
||||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||||
// Set service name for LLM operations
|
// Set service name for LLM operations
|
||||||
set_service_name(operation_component::LLM);
|
set_service_name(operation_component::LLM);
|
||||||
|
get_active_span(|span| {
|
||||||
|
for (key, value) in &custom_attrs {
|
||||||
|
span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let traceparent = extract_or_generate_traceparent(&request_headers);
|
let traceparent = extract_or_generate_traceparent(&request_headers);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -72,11 +72,11 @@ fn cors_preflight() -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Err
|
||||||
/// Load and parse the YAML configuration file.
|
/// Load and parse the YAML configuration file.
|
||||||
///
|
///
|
||||||
/// The path is read from `PLANO_CONFIG_PATH_RENDERED` (env) or falls back to
|
/// The path is read from `PLANO_CONFIG_PATH_RENDERED` (env) or falls back to
|
||||||
/// `./arch_config_rendered.yaml`.
|
/// `./plano_config_rendered.yaml`.
|
||||||
fn load_config() -> Result<Configuration, Box<dyn std::error::Error + Send + Sync>> {
|
fn load_config() -> Result<Configuration, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
let path = env::var("PLANO_CONFIG_PATH_RENDERED")
|
let path = env::var("PLANO_CONFIG_PATH_RENDERED")
|
||||||
.unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string());
|
.unwrap_or_else(|_| "./plano_config_rendered.yaml".to_string());
|
||||||
eprintln!("loading arch_config.yaml from {}", path);
|
eprintln!("loading plano_config.yaml from {}", path);
|
||||||
|
|
||||||
let contents = fs::read_to_string(&path).map_err(|e| format!("failed to read {path}: {e}"))?;
|
let contents = fs::read_to_string(&path).map_err(|e| format!("failed to read {path}: {e}"))?;
|
||||||
|
|
||||||
|
|
@ -136,6 +136,13 @@ async fn init_app_state(
|
||||||
|
|
||||||
let state_storage = init_state_storage(config).await?;
|
let state_storage = init_state_storage(config).await?;
|
||||||
|
|
||||||
|
let span_attributes = Arc::new(
|
||||||
|
config
|
||||||
|
.tracing
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|tracing| tracing.span_attributes.clone()),
|
||||||
|
);
|
||||||
|
|
||||||
Ok(AppState {
|
Ok(AppState {
|
||||||
router_service,
|
router_service,
|
||||||
orchestrator_service,
|
orchestrator_service,
|
||||||
|
|
@ -145,6 +152,7 @@ async fn init_app_state(
|
||||||
listeners: Arc::new(RwLock::new(config.listeners.clone())),
|
listeners: Arc::new(RwLock::new(config.listeners.clone())),
|
||||||
state_storage,
|
state_storage,
|
||||||
llm_provider_url,
|
llm_provider_url,
|
||||||
|
span_attributes,
|
||||||
http_client: reqwest::Client::new(),
|
http_client: reqwest::Client::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,7 @@ pub fn collect_custom_trace_attributes(
|
||||||
attributes
|
attributes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn append_span_attributes(
|
pub fn append_span_attributes(
|
||||||
mut span_builder: SpanBuilder,
|
mut span_builder: SpanBuilder,
|
||||||
attributes: &HashMap<String, String>,
|
attributes: &HashMap<String, String>,
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,9 @@ pub fn init_tracer(tracing_config: Option<&Tracing>) -> &'static SdkTracerProvid
|
||||||
// Create OTLP exporter to send spans to collector.
|
// Create OTLP exporter to send spans to collector.
|
||||||
// Use `if let` to destructure the endpoint, avoiding an unwrap.
|
// Use `if let` to destructure the endpoint, avoiding an unwrap.
|
||||||
if let Some(endpoint) = otel_endpoint.as_deref().filter(|_| tracing_enabled) {
|
if let Some(endpoint) = otel_endpoint.as_deref().filter(|_| tracing_enabled) {
|
||||||
|
if std::env::var("OTEL_SERVICE_NAME").is_err() {
|
||||||
|
std::env::set_var("OTEL_SERVICE_NAME", "plano");
|
||||||
|
}
|
||||||
// Create ServiceNameOverrideExporter to support per-span service names
|
// Create ServiceNameOverrideExporter to support per-span service names
|
||||||
// This allows spans to have different service names (e.g., plano(orchestrator),
|
// This allows spans to have different service names (e.g., plano(orchestrator),
|
||||||
// plano(filter), plano(llm)) by setting the "service.name.override" attribute
|
// plano(filter), plano(llm)) by setting the "service.name.override" attribute
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,12 @@
|
||||||
mod constants;
|
mod constants;
|
||||||
|
mod custom_attributes;
|
||||||
mod init;
|
mod init;
|
||||||
mod service_name_exporter;
|
mod service_name_exporter;
|
||||||
|
|
||||||
pub use constants::{
|
pub use constants::{
|
||||||
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
|
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
|
||||||
};
|
};
|
||||||
|
pub use custom_attributes::collect_custom_trace_attributes;
|
||||||
pub use init::init_tracer;
|
pub use init::init_tracer;
|
||||||
pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY};
|
pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY};
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue