mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
add custom trace attributes
This commit is contained in:
parent
d70f79b41c
commit
3a663aa780
10 changed files with 764 additions and 409 deletions
|
|
@ -382,8 +382,27 @@ properties:
|
|||
type: integer
|
||||
trace_arch_internal:
|
||||
type: boolean
|
||||
opentracing_grpc_endpoint:
|
||||
type: string
|
||||
custom_attributes:
|
||||
type: array
|
||||
items:
|
||||
type: object
|
||||
properties:
|
||||
key:
|
||||
type: string
|
||||
type:
|
||||
type: string
|
||||
enum:
|
||||
- str
|
||||
- bool
|
||||
- float
|
||||
- int
|
||||
header:
|
||||
type: string
|
||||
additionalProperties: false
|
||||
required:
|
||||
- key
|
||||
- type
|
||||
- header
|
||||
additionalProperties: false
|
||||
mode:
|
||||
type: string
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::{Instant, SystemTime};
|
||||
|
||||
use bytes::Bytes;
|
||||
use common::configuration::Tracing;
|
||||
use common::consts::TRACE_PARENT_HEADER;
|
||||
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
|
||||
use hermesllm::apis::OpenAIMessage;
|
||||
use hermesllm::clients::SupportedAPIsFromClient;
|
||||
use hermesllm::providers::request::ProviderRequest;
|
||||
|
|
@ -9,15 +12,17 @@ use hermesllm::ProviderRequestType;
|
|||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Request, Response};
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use serde::ser::Error as SerError;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use super::agent_selector::{AgentSelectionError, AgentSelector};
|
||||
use super::pipeline_processor::{PipelineError, PipelineProcessor};
|
||||
use super::response_handler::ResponseHandler;
|
||||
use crate::router::plano_orchestrator::OrchestratorService;
|
||||
use crate::tracing::{operation_component, set_service_name};
|
||||
use crate::tracing::{
|
||||
append_span_attributes, collect_custom_trace_attributes, http, operation_component,
|
||||
OperationNameBuilder,
|
||||
};
|
||||
|
||||
/// Main errors for agent chat completions
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
|
@ -40,122 +45,95 @@ pub async fn agent_chat(
|
|||
_: String,
|
||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
trace_collector: Arc<common::traces::TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
// Extract request_id from headers or generate a new one
|
||||
let request_id: String = match request
|
||||
.headers()
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
match handle_agent_chat(
|
||||
request,
|
||||
orchestrator_service,
|
||||
agents_list,
|
||||
listeners,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Some(id) => id,
|
||||
None => uuid::Uuid::new_v4().to_string(),
|
||||
};
|
||||
Ok(response) => Ok(response),
|
||||
Err(err) => {
|
||||
// Check if this is a client error from the pipeline that should be cascaded
|
||||
if let AgentFilterChainError::Pipeline(PipelineError::ClientError {
|
||||
agent,
|
||||
status,
|
||||
body,
|
||||
}) = &err
|
||||
{
|
||||
warn!(
|
||||
"Client error from agent '{}' (HTTP {}): {}",
|
||||
agent, status, body
|
||||
);
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
"(orchestrator)",
|
||||
component = "orchestrator",
|
||||
request_id = %request_id,
|
||||
http.method = %request.method(),
|
||||
http.path = %request.uri().path()
|
||||
);
|
||||
|
||||
// Execute the handler inside the span
|
||||
async {
|
||||
// Set service name for orchestrator operations
|
||||
set_service_name(operation_component::ORCHESTRATOR);
|
||||
|
||||
match handle_agent_chat_inner(
|
||||
request,
|
||||
orchestrator_service,
|
||||
agents_list,
|
||||
listeners,
|
||||
request_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(response) => Ok(response),
|
||||
Err(err) => {
|
||||
// Check if this is a client error from the pipeline that should be cascaded
|
||||
if let AgentFilterChainError::Pipeline(PipelineError::ClientError {
|
||||
agent,
|
||||
status,
|
||||
body,
|
||||
}) = &err
|
||||
{
|
||||
warn!(
|
||||
agent = %agent,
|
||||
status = %status,
|
||||
body = %body,
|
||||
"client error from agent"
|
||||
);
|
||||
|
||||
// Create error response with the original status code and body
|
||||
let error_json = serde_json::json!({
|
||||
"error": "ClientError",
|
||||
"agent": agent,
|
||||
"status": status,
|
||||
"agent_response": body
|
||||
});
|
||||
|
||||
let json_string = error_json.to_string();
|
||||
let mut response =
|
||||
Response::new(ResponseHandler::create_full_body(json_string));
|
||||
*response.status_mut() = hyper::StatusCode::from_u16(*status)
|
||||
.unwrap_or(hyper::StatusCode::BAD_REQUEST);
|
||||
response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
"application/json".parse().unwrap(),
|
||||
);
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Print detailed error information with full error chain for other errors
|
||||
let mut error_chain = Vec::new();
|
||||
let mut current_error: &dyn std::error::Error = &err;
|
||||
|
||||
// Collect the full error chain
|
||||
loop {
|
||||
error_chain.push(current_error.to_string());
|
||||
match current_error.source() {
|
||||
Some(source) => current_error = source,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Log the complete error chain
|
||||
warn!(error_chain = ?error_chain, "agent chat error chain");
|
||||
warn!(root_error = ?err, "root error");
|
||||
|
||||
// Create structured error response as JSON
|
||||
// Create error response with the original status code and body
|
||||
let error_json = serde_json::json!({
|
||||
"error": {
|
||||
"type": "AgentFilterChainError",
|
||||
"message": err.to_string(),
|
||||
"error_chain": error_chain,
|
||||
"debug_info": format!("{:?}", err)
|
||||
}
|
||||
"error": "ClientError",
|
||||
"agent": agent,
|
||||
"status": status,
|
||||
"agent_response": body
|
||||
});
|
||||
|
||||
// Log the error for debugging
|
||||
info!(error = %error_json, "structured error info");
|
||||
|
||||
// Return JSON error response
|
||||
Ok(ResponseHandler::create_json_error_response(&error_json))
|
||||
let json_string = error_json.to_string();
|
||||
let mut response = Response::new(ResponseHandler::create_full_body(json_string));
|
||||
*response.status_mut() =
|
||||
hyper::StatusCode::from_u16(*status).unwrap_or(hyper::StatusCode::BAD_REQUEST);
|
||||
response.headers_mut().insert(
|
||||
hyper::header::CONTENT_TYPE,
|
||||
"application/json".parse().unwrap(),
|
||||
);
|
||||
return Ok(response);
|
||||
}
|
||||
|
||||
// Print detailed error information with full error chain for other errors
|
||||
let mut error_chain = Vec::new();
|
||||
let mut current_error: &dyn std::error::Error = &err;
|
||||
|
||||
// Collect the full error chain
|
||||
loop {
|
||||
error_chain.push(current_error.to_string());
|
||||
match current_error.source() {
|
||||
Some(source) => current_error = source,
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Log the complete error chain
|
||||
warn!("Agent chat error chain: {:#?}", error_chain);
|
||||
warn!("Root error: {:?}", err);
|
||||
|
||||
// Create structured error response as JSON
|
||||
let error_json = serde_json::json!({
|
||||
"error": {
|
||||
"type": "AgentFilterChainError",
|
||||
"message": err.to_string(),
|
||||
"error_chain": error_chain,
|
||||
"debug_info": format!("{:?}", err)
|
||||
}
|
||||
});
|
||||
|
||||
// Log the error for debugging
|
||||
info!("Structured error info: {}", error_json);
|
||||
|
||||
// Return JSON error response
|
||||
Ok(ResponseHandler::create_json_error_response(&error_json))
|
||||
}
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_agent_chat_inner(
|
||||
async fn handle_agent_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
|
||||
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
|
||||
request_id: String,
|
||||
trace_collector: Arc<common::traces::TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
|
||||
// Initialize services
|
||||
let agent_selector = AgentSelector::new(orchestrator_service);
|
||||
|
|
@ -169,18 +147,14 @@ async fn handle_agent_chat_inner(
|
|||
.and_then(|name| name.to_str().ok());
|
||||
|
||||
// Find the appropriate listener
|
||||
let listener: common::configuration::Listener = {
|
||||
let listener = {
|
||||
let listeners = listeners.read().await;
|
||||
agent_selector
|
||||
.find_listener(listener_name, &listeners)
|
||||
.await?
|
||||
};
|
||||
|
||||
get_active_span(|span| {
|
||||
span.update_name(listener.name.to_string());
|
||||
});
|
||||
|
||||
info!(listener = %listener.name, "handling request");
|
||||
info!("Handling request for listener: {}", listener.name);
|
||||
|
||||
// Parse request body
|
||||
let request_path = request
|
||||
|
|
@ -195,8 +169,12 @@ async fn handle_agent_chat_inner(
|
|||
let mut headers = request.headers().clone();
|
||||
headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER);
|
||||
|
||||
// Set the request_id in headers if not already present
|
||||
if !headers.contains_key(common::consts::REQUEST_ID_HEADER) {
|
||||
let request_id = uuid::Uuid::new_v4().to_string();
|
||||
info!(
|
||||
"Request id not found in headers, generated new request id: {}",
|
||||
request_id
|
||||
);
|
||||
headers.insert(
|
||||
common::consts::REQUEST_ID_HEADER,
|
||||
hyper::header::HeaderValue::from_str(&request_id).unwrap(),
|
||||
|
|
@ -205,12 +183,19 @@ async fn handle_agent_chat_inner(
|
|||
|
||||
headers
|
||||
};
|
||||
let custom_attrs = collect_custom_trace_attributes(
|
||||
&request_headers,
|
||||
tracing_config
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.custom_attributes.as_deref()),
|
||||
);
|
||||
|
||||
let chat_request_bytes = request.collect().await?.to_bytes();
|
||||
|
||||
debug!(
|
||||
body = %String::from_utf8_lossy(&chat_request_bytes),
|
||||
"received request body"
|
||||
"Received request body (raw utf8): {}",
|
||||
String::from_utf8_lossy(&chat_request_bytes)
|
||||
);
|
||||
|
||||
// Determine the API type from the endpoint
|
||||
|
|
@ -224,7 +209,7 @@ async fn handle_agent_chat_inner(
|
|||
let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) {
|
||||
Ok(request) => request,
|
||||
Err(err) => {
|
||||
warn!("failed to parse request as ProviderRequestType: {}", err);
|
||||
warn!("Failed to parse request as ProviderRequestType: {}", err);
|
||||
let err_msg = format!("Failed to parse request: {}", err);
|
||||
return Err(AgentFilterChainError::RequestParsing(
|
||||
serde_json::Error::custom(err_msg),
|
||||
|
|
@ -234,6 +219,12 @@ async fn handle_agent_chat_inner(
|
|||
|
||||
let message: Vec<OpenAIMessage> = client_request.get_messages();
|
||||
|
||||
// Extract trace parent for routing
|
||||
let traceparent = request_headers
|
||||
.iter()
|
||||
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
|
||||
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
|
||||
|
||||
let request_id = request_headers
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|val| val.to_str().ok())
|
||||
|
|
@ -246,58 +237,90 @@ async fn handle_agent_chat_inner(
|
|||
agent_selector.create_agent_map(agents)
|
||||
};
|
||||
|
||||
// Parse trace parent to get trace_id and parent_span_id
|
||||
let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent {
|
||||
parse_traceparent(tp)
|
||||
} else {
|
||||
(String::new(), None)
|
||||
};
|
||||
|
||||
// Select appropriate agents using arch orchestrator llm model
|
||||
let selection_start = Instant::now();
|
||||
let selection_span_id = generate_random_span_id();
|
||||
let selection_start_time = SystemTime::now();
|
||||
let selection_start_instant = Instant::now();
|
||||
|
||||
let selected_agents = agent_selector
|
||||
.select_agents(&message, &listener, request_id.clone())
|
||||
.select_agents(&message, &listener, traceparent.clone(), request_id.clone())
|
||||
.await?;
|
||||
|
||||
// Record selection attributes on the current orchestrator span
|
||||
let selection_elapsed_ms = selection_start.elapsed().as_secs_f64() * 1000.0;
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
"selection.listener",
|
||||
listener.name.clone(),
|
||||
));
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
"selection.agent_count",
|
||||
selected_agents.len() as i64,
|
||||
));
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
// Record agent selection span
|
||||
let selection_end_time = SystemTime::now();
|
||||
let selection_elapsed = selection_start_instant.elapsed();
|
||||
let selection_operation_name = OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path("/agents/select")
|
||||
.with_target(&listener.name)
|
||||
.build();
|
||||
|
||||
let mut selection_span_builder = append_span_attributes(
|
||||
SpanBuilder::new(&selection_operation_name)
|
||||
.with_span_id(selection_span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(selection_start_time)
|
||||
.with_end_time(selection_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, "/agents/select")
|
||||
.with_attribute("selection.listener", listener.name.clone())
|
||||
.with_attribute("selection.agent_count", selected_agents.len().to_string())
|
||||
.with_attribute(
|
||||
"selection.agents",
|
||||
selected_agents
|
||||
.iter()
|
||||
.map(|a| a.id.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join(","),
|
||||
));
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
"selection.determination_ms",
|
||||
format!("{:.2}", selection_elapsed_ms),
|
||||
));
|
||||
});
|
||||
|
||||
info!(
|
||||
count = selected_agents.len(),
|
||||
"selected agents for execution"
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
|
||||
),
|
||||
&custom_attrs,
|
||||
);
|
||||
|
||||
if !trace_id.is_empty() {
|
||||
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
|
||||
}
|
||||
if let Some(parent_id) = parent_span_id.clone() {
|
||||
selection_span_builder = selection_span_builder.with_parent_span_id(parent_id);
|
||||
}
|
||||
|
||||
let selection_span = selection_span_builder.build();
|
||||
trace_collector.record_span(operation_component::ORCHESTRATOR, selection_span);
|
||||
|
||||
info!("Selected {} agent(s) for execution", selected_agents.len());
|
||||
|
||||
// Execute agents sequentially, passing output from one to the next
|
||||
let mut current_messages = message.clone();
|
||||
let agent_count = selected_agents.len();
|
||||
|
||||
for (agent_index, selected_agent) in selected_agents.iter().enumerate() {
|
||||
// Get agent name
|
||||
let agent_name = selected_agent.id.clone();
|
||||
let is_last_agent = agent_index == agent_count - 1;
|
||||
|
||||
debug!(
|
||||
agent_index = agent_index + 1,
|
||||
total = agent_count,
|
||||
agent = %agent_name,
|
||||
"processing agent"
|
||||
"Processing agent {}/{}: {}",
|
||||
agent_index + 1,
|
||||
agent_count,
|
||||
selected_agent.id
|
||||
);
|
||||
|
||||
// Record the start time for agent span
|
||||
let agent_start_time = SystemTime::now();
|
||||
let agent_start_instant = Instant::now();
|
||||
let span_id = generate_random_span_id();
|
||||
|
||||
// Get agent name
|
||||
let agent_name = selected_agent.id.clone();
|
||||
|
||||
// Process the filter chain
|
||||
let chat_history = pipeline_processor
|
||||
.process_filter_chain(
|
||||
|
|
@ -305,71 +328,91 @@ async fn handle_agent_chat_inner(
|
|||
selected_agent,
|
||||
&agent_map,
|
||||
&request_headers,
|
||||
Some(&trace_collector),
|
||||
trace_id.clone(),
|
||||
span_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Get agent details and invoke
|
||||
let agent = agent_map.get(&agent_name).unwrap();
|
||||
|
||||
debug!(agent = %agent_name, "invoking agent");
|
||||
debug!("Invoking agent: {}", agent_name);
|
||||
|
||||
let agent_span = info_span!(
|
||||
"agent",
|
||||
agent_id = %agent_name,
|
||||
message_count = chat_history.len(),
|
||||
let llm_response = pipeline_processor
|
||||
.invoke_agent(
|
||||
&chat_history,
|
||||
client_request.clone(),
|
||||
agent,
|
||||
&request_headers,
|
||||
trace_id.clone(),
|
||||
span_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Record agent span
|
||||
let agent_end_time = SystemTime::now();
|
||||
let agent_elapsed = agent_start_instant.elapsed();
|
||||
let full_path = format!("/agents{}", request_path);
|
||||
let operation_name = OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path(&full_path)
|
||||
.with_target(&agent_name)
|
||||
.build();
|
||||
|
||||
let mut span_builder = append_span_attributes(
|
||||
SpanBuilder::new(&operation_name)
|
||||
.with_span_id(span_id)
|
||||
.with_kind(SpanKind::Internal)
|
||||
.with_start_time(agent_start_time)
|
||||
.with_end_time(agent_end_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, full_path)
|
||||
.with_attribute("agent.name", agent_name.clone())
|
||||
.with_attribute(
|
||||
"agent.sequence",
|
||||
format!("{}/{}", agent_index + 1, agent_count),
|
||||
)
|
||||
.with_attribute(
|
||||
"duration_ms",
|
||||
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
|
||||
),
|
||||
&custom_attrs,
|
||||
);
|
||||
|
||||
let llm_response = async {
|
||||
set_service_name(operation_component::AGENT);
|
||||
get_active_span(|span| {
|
||||
span.update_name(format!("{} /v1/chat/completions", agent_name));
|
||||
});
|
||||
|
||||
pipeline_processor
|
||||
.invoke_agent(
|
||||
&chat_history,
|
||||
client_request.clone(),
|
||||
agent,
|
||||
&request_headers,
|
||||
)
|
||||
.await
|
||||
if !trace_id.is_empty() {
|
||||
span_builder = span_builder.with_trace_id(trace_id.clone());
|
||||
}
|
||||
.instrument(agent_span.clone())
|
||||
.await?;
|
||||
if let Some(parent_id) = parent_span_id.clone() {
|
||||
span_builder = span_builder.with_parent_span_id(parent_id);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
trace_collector.record_span(operation_component::AGENT, span);
|
||||
|
||||
// If this is the last agent, return the streaming response
|
||||
if is_last_agent {
|
||||
info!(
|
||||
agent = %agent_name,
|
||||
"completed agent chain, returning response"
|
||||
"Completed agent chain, returning response from last agent: {}",
|
||||
agent_name
|
||||
);
|
||||
// Capture the orchestrator span (parent of the agent span) so it
|
||||
// stays open for the full streaming duration alongside the agent span.
|
||||
let orchestrator_span = tracing::Span::current();
|
||||
return async {
|
||||
response_handler
|
||||
.create_streaming_response(
|
||||
llm_response,
|
||||
tracing::Span::current(), // agent span (inner)
|
||||
orchestrator_span, // orchestrator span (outer)
|
||||
)
|
||||
.await
|
||||
.map_err(AgentFilterChainError::from)
|
||||
}
|
||||
.instrument(agent_span)
|
||||
.await;
|
||||
return response_handler
|
||||
.create_streaming_response(llm_response)
|
||||
.await
|
||||
.map_err(AgentFilterChainError::from);
|
||||
}
|
||||
|
||||
// For intermediate agents, collect the full response and pass to next agent
|
||||
debug!(agent = %agent_name, "collecting response from intermediate agent");
|
||||
let response_text = async { response_handler.collect_full_response(llm_response).await }
|
||||
.instrument(agent_span)
|
||||
.await?;
|
||||
debug!(
|
||||
"Collecting response from intermediate agent: {}",
|
||||
agent_name
|
||||
);
|
||||
let response_text = response_handler.collect_full_response(llm_response).await?;
|
||||
|
||||
info!(
|
||||
agent = %agent_name,
|
||||
response_len = response_text.len(),
|
||||
"agent completed, passing response to next agent"
|
||||
"Agent {} completed, passing {} character response to next agent",
|
||||
agent_name,
|
||||
response_text.len()
|
||||
);
|
||||
|
||||
// remove last message and add new one at the end
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
use bytes::Bytes;
|
||||
use common::configuration::ModelAlias;
|
||||
use common::configuration::{ModelAlias, Tracing};
|
||||
use common::consts::{
|
||||
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
|
||||
};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use common::traces::TraceCollector;
|
||||
use hermesllm::apis::openai_responses::InputParam;
|
||||
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
|
|
@ -11,13 +12,10 @@ use http_body_util::combinators::BoxBody;
|
|||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::header::{self};
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use opentelemetry::global;
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use opentelemetry_http::HeaderInjector;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
||||
use crate::handlers::utils::{
|
||||
|
|
@ -28,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
|
|||
use crate::state::{
|
||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||
};
|
||||
use crate::tracing::{llm as tracing_llm, operation_component, set_service_name};
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component};
|
||||
|
||||
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||
Full::new(chunk.into())
|
||||
|
|
@ -36,69 +34,43 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
|||
.boxed()
|
||||
}
|
||||
|
||||
// ! we reached the limit of the number of arguments for a function
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn llm_chat(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
full_qualified_llm_provider_url: String,
|
||||
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
trace_collector: Arc<TraceCollector>,
|
||||
tracing_config: Arc<Option<Tracing>>, // ! right here
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
let request_path = request.uri().path().to_string();
|
||||
let request_headers = request.headers().clone();
|
||||
let custom_attrs = collect_custom_trace_attributes(
|
||||
&request_headers,
|
||||
tracing_config
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.and_then(|tracing| tracing.custom_attributes.as_deref()),
|
||||
);
|
||||
let request_id: String = match request_headers
|
||||
.get(REQUEST_ID_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(id) => id,
|
||||
None => uuid::Uuid::new_v4().to_string(),
|
||||
None => {
|
||||
let generated_id = uuid::Uuid::new_v4().to_string();
|
||||
warn!(
|
||||
"[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID",
|
||||
generated_id
|
||||
);
|
||||
generated_id
|
||||
}
|
||||
};
|
||||
|
||||
// Create a span with request_id that will be included in all log lines
|
||||
let request_span = info_span!(
|
||||
"llm",
|
||||
component = "llm",
|
||||
request_id = %request_id,
|
||||
http.method = %request.method(),
|
||||
http.path = %request_path,
|
||||
llm.model = tracing::field::Empty,
|
||||
llm.tools = tracing::field::Empty,
|
||||
llm.user_message_preview = tracing::field::Empty,
|
||||
llm.temperature = tracing::field::Empty,
|
||||
);
|
||||
|
||||
// Execute the rest of the handler inside the span
|
||||
llm_chat_inner(
|
||||
request,
|
||||
router_service,
|
||||
full_qualified_llm_provider_url,
|
||||
model_aliases,
|
||||
llm_providers,
|
||||
state_storage,
|
||||
request_id,
|
||||
request_path,
|
||||
request_headers,
|
||||
)
|
||||
.instrument(request_span)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn llm_chat_inner(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
full_qualified_llm_provider_url: String,
|
||||
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
||||
llm_providers: Arc<RwLock<LlmProviders>>,
|
||||
state_storage: Option<Arc<dyn StateStorage>>,
|
||||
request_id: String,
|
||||
request_path: String,
|
||||
mut request_headers: hyper::HeaderMap,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
// Set service name for LLM operations
|
||||
set_service_name(operation_component::LLM);
|
||||
|
||||
// Extract or generate traceparent - this establishes the trace context for all spans
|
||||
let traceparent: String = match request_headers
|
||||
.get(TRACE_PARENT_HEADER)
|
||||
|
|
@ -111,18 +83,20 @@ async fn llm_chat_inner(
|
|||
let trace_id = Uuid::new_v4().to_string().replace("-", "");
|
||||
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
||||
warn!(
|
||||
generated_traceparent = %generated_tp,
|
||||
"TRACE_PARENT header missing, generated new traceparent"
|
||||
"[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}",
|
||||
request_id, generated_tp
|
||||
);
|
||||
generated_tp
|
||||
}
|
||||
};
|
||||
|
||||
let mut request_headers = request_headers;
|
||||
let chat_request_bytes = request.collect().await?.to_bytes();
|
||||
|
||||
debug!(
|
||||
body = %String::from_utf8_lossy(&chat_request_bytes),
|
||||
"request body received"
|
||||
"[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}",
|
||||
request_id,
|
||||
String::from_utf8_lossy(&chat_request_bytes)
|
||||
);
|
||||
|
||||
let mut client_request = match ProviderRequestType::try_from((
|
||||
|
|
@ -132,10 +106,13 @@ async fn llm_chat_inner(
|
|||
Ok(request) => request,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
error = %err,
|
||||
"failed to parse request as ProviderRequestType"
|
||||
"[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}",
|
||||
request_id, err
|
||||
);
|
||||
let err_msg = format!(
|
||||
"[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}",
|
||||
request_id, err
|
||||
);
|
||||
let err_msg = format!("Failed to parse request: {}", err);
|
||||
let mut bad_request = Response::new(full(err_msg));
|
||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return Ok(bad_request);
|
||||
|
|
@ -155,21 +132,16 @@ async fn llm_chat_inner(
|
|||
let model_from_request = client_request.model().to_string();
|
||||
let temperature = client_request.get_temperature();
|
||||
let is_streaming_request = client_request.is_streaming();
|
||||
let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
|
||||
let resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
|
||||
|
||||
// Validate that the requested model exists in configuration
|
||||
// This matches the validation in llm_gateway routing.rs
|
||||
if llm_providers
|
||||
.read()
|
||||
.await
|
||||
.get(&alias_resolved_model)
|
||||
.is_none()
|
||||
{
|
||||
if llm_providers.read().await.get(&resolved_model).is_none() {
|
||||
let err_msg = format!(
|
||||
"Model '{}' not found in configured providers",
|
||||
alias_resolved_model
|
||||
resolved_model
|
||||
);
|
||||
warn!(model = %alias_resolved_model, "model not found in configured providers");
|
||||
warn!("[PLANO_REQ_ID:{}] | FAILURE | {}", request_id, err_msg);
|
||||
let mut bad_request = Response::new(full(err_msg));
|
||||
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
||||
return Ok(bad_request);
|
||||
|
|
@ -177,10 +149,10 @@ async fn llm_chat_inner(
|
|||
|
||||
// Handle provider/model slug format (e.g., "openai/gpt-4")
|
||||
// Extract just the model name for upstream (providers don't understand the slug)
|
||||
let model_name_only = if let Some((_, model)) = alias_resolved_model.split_once('/') {
|
||||
let model_name_only = if let Some((_, model)) = resolved_model.split_once('/') {
|
||||
model.to_string()
|
||||
} else {
|
||||
alias_resolved_model.clone()
|
||||
resolved_model.clone()
|
||||
};
|
||||
|
||||
// Extract tool names and user message preview for span attributes
|
||||
|
|
@ -188,30 +160,18 @@ async fn llm_chat_inner(
|
|||
let user_message_preview = client_request
|
||||
.get_recent_user_message()
|
||||
.map(|msg| truncate_message(&msg, 50));
|
||||
let span = tracing::Span::current();
|
||||
if let Some(temp) = temperature {
|
||||
span.record(tracing_llm::TEMPERATURE, tracing::field::display(temp));
|
||||
}
|
||||
if let Some(tools) = &tool_names {
|
||||
let formatted_tools = tools
|
||||
.iter()
|
||||
.map(|name| format!("{}(...)", name))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
span.record(tracing_llm::TOOLS, formatted_tools.as_str());
|
||||
}
|
||||
if let Some(preview) = &user_message_preview {
|
||||
span.record(tracing_llm::USER_MESSAGE_PREVIEW, preview.as_str());
|
||||
}
|
||||
|
||||
// Extract messages for signal analysis (clone before moving client_request)
|
||||
let messages_for_signals = Some(client_request.get_messages());
|
||||
let messages_for_signals = client_request.get_messages();
|
||||
|
||||
// Set the model to just the model name (without provider prefix)
|
||||
// This ensures upstream receives "gpt-4" not "openai/gpt-4"
|
||||
client_request.set_model(model_name_only.clone());
|
||||
if client_request.remove_metadata_key("plano_preference_config") {
|
||||
debug!("removed plano_preference_config from metadata");
|
||||
if client_request.remove_metadata_key("archgw_preference_config") {
|
||||
debug!(
|
||||
"[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata",
|
||||
request_id
|
||||
);
|
||||
}
|
||||
|
||||
// === v1/responses state management: Determine upstream API and combine input if needed ===
|
||||
|
|
@ -230,9 +190,9 @@ async fn llm_chat_inner(
|
|||
// Get the upstream path and check if it's ResponsesAPI
|
||||
let upstream_path = get_upstream_path(
|
||||
&llm_providers,
|
||||
&alias_resolved_model,
|
||||
&resolved_model,
|
||||
&request_path,
|
||||
&alias_resolved_model,
|
||||
&resolved_model,
|
||||
is_streaming_request,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -259,17 +219,14 @@ async fn llm_chat_inner(
|
|||
// Update both the request and original_input_items
|
||||
responses_req.input = InputParam::Items(combined_input.clone());
|
||||
original_input_items = combined_input;
|
||||
info!(
|
||||
items = original_input_items.len(),
|
||||
"updated request with conversation history"
|
||||
);
|
||||
info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len());
|
||||
}
|
||||
Err(StateStorageError::NotFound(_)) => {
|
||||
// Return 409 Conflict when previous_response_id not found
|
||||
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
|
||||
warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id);
|
||||
let err_msg = format!(
|
||||
"Conversation state not found for previous_response_id: {}",
|
||||
prev_resp_id
|
||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}",
|
||||
request_id, prev_resp_id
|
||||
);
|
||||
let mut conflict_response = Response::new(full(err_msg));
|
||||
*conflict_response.status_mut() = StatusCode::CONFLICT;
|
||||
|
|
@ -278,9 +235,8 @@ async fn llm_chat_inner(
|
|||
Err(e) => {
|
||||
// Log warning but continue on other storage errors
|
||||
warn!(
|
||||
previous_response_id = %prev_resp_id,
|
||||
error = %e,
|
||||
"failed to retrieve conversation state"
|
||||
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}",
|
||||
request_id, prev_resp_id, e
|
||||
);
|
||||
// Restore original_input_items since we passed ownership
|
||||
original_input_items = extract_input_items(&responses_req.input);
|
||||
|
|
@ -288,7 +244,10 @@ async fn llm_chat_inner(
|
|||
}
|
||||
}
|
||||
} else {
|
||||
debug!("upstream supports ResponsesAPI natively");
|
||||
debug!(
|
||||
"[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.",
|
||||
request_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -297,29 +256,15 @@ async fn llm_chat_inner(
|
|||
let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap();
|
||||
|
||||
// Determine routing using the dedicated router_chat module
|
||||
// This gets its own span for latency and error tracking
|
||||
let routing_span = info_span!(
|
||||
"routing",
|
||||
component = "routing",
|
||||
http.method = "POST",
|
||||
http.target = %request_path,
|
||||
model.requested = %model_from_request,
|
||||
model.alias_resolved = %alias_resolved_model,
|
||||
route.selected_model = tracing::field::Empty,
|
||||
routing.determination_ms = tracing::field::Empty,
|
||||
);
|
||||
let routing_result = match async {
|
||||
set_service_name(operation_component::ROUTING);
|
||||
router_chat_get_upstream_model(
|
||||
router_service,
|
||||
client_request, // Pass the original request - router_chat will convert it
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.instrument(routing_span)
|
||||
let routing_result = match router_chat_get_upstream_model(
|
||||
router_service,
|
||||
client_request, // Pass the original request - router_chat will convert it
|
||||
trace_collector.clone(),
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
&custom_attrs,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) => result,
|
||||
|
|
@ -333,37 +278,22 @@ async fn llm_chat_inner(
|
|||
// Determine final model to use
|
||||
// Router returns "none" as a sentinel value when it doesn't select a specific model
|
||||
let router_selected_model = routing_result.model_name;
|
||||
let resolved_model = if router_selected_model != "none" {
|
||||
let model_name = if router_selected_model != "none" {
|
||||
// Router selected a specific model via routing preferences
|
||||
router_selected_model
|
||||
} else {
|
||||
// Router returned "none" sentinel, use validated resolved_model from request
|
||||
alias_resolved_model.clone()
|
||||
resolved_model.clone()
|
||||
};
|
||||
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());
|
||||
|
||||
let span_name = if model_from_request == resolved_model {
|
||||
format!("POST {} {}", request_path, resolved_model)
|
||||
} else {
|
||||
format!(
|
||||
"POST {} {} -> {}",
|
||||
request_path, model_from_request, resolved_model
|
||||
)
|
||||
};
|
||||
get_active_span(|span| {
|
||||
span.update_name(span_name.clone());
|
||||
});
|
||||
|
||||
debug!(
|
||||
url = %full_qualified_llm_provider_url,
|
||||
provider_hint = %resolved_model,
|
||||
upstream_model = %model_name_only,
|
||||
"Routing to upstream"
|
||||
"[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Provider Hint: {}, Model for upstream: {}",
|
||||
request_id, full_qualified_llm_provider_url, model_name, model_name_only
|
||||
);
|
||||
|
||||
request_headers.insert(
|
||||
ARCH_PROVIDER_HINT_HEADER,
|
||||
header::HeaderValue::from_str(&resolved_model).unwrap(),
|
||||
header::HeaderValue::from_str(&model_name).unwrap(),
|
||||
);
|
||||
|
||||
request_headers.insert(
|
||||
|
|
@ -373,18 +303,12 @@ async fn llm_chat_inner(
|
|||
// remove content-length header if it exists
|
||||
request_headers.remove(header::CONTENT_LENGTH);
|
||||
|
||||
// Inject current LLM span's trace context so upstream spans are children of plano(llm)
|
||||
global::get_text_map_propagator(|propagator| {
|
||||
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
||||
propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers));
|
||||
});
|
||||
|
||||
// Capture start time right before sending request to upstream
|
||||
let request_start_time = std::time::Instant::now();
|
||||
let _request_start_system_time = std::time::SystemTime::now();
|
||||
let request_start_system_time = std::time::SystemTime::now();
|
||||
|
||||
let llm_response = match reqwest::Client::new()
|
||||
.post(&full_qualified_llm_provider_url)
|
||||
.post(full_qualified_llm_provider_url)
|
||||
.headers(request_headers)
|
||||
.body(client_request_bytes_for_upstream)
|
||||
.send()
|
||||
|
|
@ -411,12 +335,30 @@ async fn llm_chat_inner(
|
|||
// Build LLM span with actual status code using constants
|
||||
let byte_stream = llm_response.bytes_stream();
|
||||
|
||||
// Build the LLM span (will be finalized after streaming completes)
|
||||
let llm_span = build_llm_span(
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&resolved_model,
|
||||
&model_name,
|
||||
upstream_status.as_u16(),
|
||||
is_streaming_request,
|
||||
request_start_system_time,
|
||||
tool_names,
|
||||
user_message_preview,
|
||||
temperature,
|
||||
&llm_providers,
|
||||
&custom_attrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Create base processor for metrics and tracing
|
||||
let base_processor = ObservableStreamProcessor::new(
|
||||
trace_collector,
|
||||
operation_component::LLM,
|
||||
span_name,
|
||||
llm_span,
|
||||
request_start_time,
|
||||
messages_for_signals,
|
||||
Some(messages_for_signals),
|
||||
);
|
||||
|
||||
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
|
||||
|
|
@ -437,8 +379,8 @@ async fn llm_chat_inner(
|
|||
base_processor,
|
||||
state_store,
|
||||
original_input_items,
|
||||
alias_resolved_model.clone(),
|
||||
resolved_model.clone(),
|
||||
model_name.clone(),
|
||||
is_streaming_request,
|
||||
false, // Not OpenAI upstream since should_manage_state is true
|
||||
content_encoding,
|
||||
|
|
@ -479,6 +421,93 @@ fn resolve_model_alias(
|
|||
model_from_request.to_string()
|
||||
}
|
||||
|
||||
/// Builds the LLM span with all required and optional attributes.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn build_llm_span(
|
||||
traceparent: &str,
|
||||
request_path: &str,
|
||||
resolved_model: &str,
|
||||
model_name: &str,
|
||||
status_code: u16,
|
||||
is_streaming: bool,
|
||||
start_time: std::time::SystemTime,
|
||||
tool_names: Option<Vec<String>>,
|
||||
user_message_preview: Option<String>,
|
||||
temperature: Option<f32>,
|
||||
llm_providers: &Arc<RwLock<Vec<LlmProvider>>>,
|
||||
custom_attrs: &HashMap<String, String>,
|
||||
) -> common::traces::Span {
|
||||
use crate::tracing::{http, llm, OperationNameBuilder};
|
||||
use common::traces::{parse_traceparent, SpanBuilder, SpanKind};
|
||||
|
||||
// Calculate the upstream path based on provider configuration
|
||||
let upstream_path = get_upstream_path(
|
||||
llm_providers,
|
||||
model_name,
|
||||
request_path,
|
||||
resolved_model,
|
||||
is_streaming,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Build operation name showing path transformation if different
|
||||
let operation_name = if request_path != upstream_path {
|
||||
OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path(format!("{} >> {}", request_path, upstream_path))
|
||||
.with_target(resolved_model)
|
||||
.build()
|
||||
} else {
|
||||
OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path(request_path)
|
||||
.with_target(resolved_model)
|
||||
.build()
|
||||
};
|
||||
|
||||
let (trace_id, parent_span_id) = parse_traceparent(traceparent);
|
||||
|
||||
let mut span_builder = SpanBuilder::new(&operation_name)
|
||||
.with_trace_id(&trace_id)
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_time)
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::STATUS_CODE, status_code.to_string())
|
||||
.with_attribute(http::TARGET, request_path.to_string())
|
||||
.with_attribute(http::UPSTREAM_TARGET, upstream_path)
|
||||
.with_attribute(llm::MODEL_NAME, resolved_model.to_string())
|
||||
.with_attribute(llm::IS_STREAMING, is_streaming.to_string());
|
||||
|
||||
// Only set parent span ID if it exists (not a root span)
|
||||
if let Some(parent) = parent_span_id {
|
||||
span_builder = span_builder.with_parent_span_id(&parent);
|
||||
}
|
||||
|
||||
// Add optional attributes
|
||||
if let Some(temp) = temperature {
|
||||
span_builder = span_builder.with_attribute(llm::TEMPERATURE, temp.to_string());
|
||||
}
|
||||
|
||||
if let Some(tools) = tool_names {
|
||||
let formatted_tools = tools
|
||||
.iter()
|
||||
.map(|name| format!("{}(...)", name))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
span_builder = span_builder.with_attribute(llm::TOOLS, formatted_tools);
|
||||
}
|
||||
|
||||
if let Some(preview) = user_message_preview {
|
||||
span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview);
|
||||
}
|
||||
|
||||
for (key, value) in custom_attrs {
|
||||
span_builder = span_builder.with_attribute(key, value);
|
||||
}
|
||||
|
||||
span_builder.build()
|
||||
}
|
||||
|
||||
/// Calculates the upstream path for the provider based on the model name.
|
||||
/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix,
|
||||
/// then uses target_endpoint_for_provider to calculate the correct upstream path.
|
||||
|
|
|
|||
|
|
@ -1,12 +1,14 @@
|
|||
use common::configuration::ModelUsagePreference;
|
||||
use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector};
|
||||
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
use hyper::StatusCode;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::router::llm_router::RouterService;
|
||||
use crate::tracing::routing;
|
||||
use crate::tracing::{http, operation_component, routing, OperationNameBuilder};
|
||||
|
||||
pub struct RoutingResult {
|
||||
pub model_name: String,
|
||||
|
|
@ -34,9 +36,11 @@ impl RoutingError {
|
|||
pub async fn router_chat_get_upstream_model(
|
||||
router_service: Arc<RouterService>,
|
||||
client_request: ProviderRequestType,
|
||||
trace_collector: Arc<TraceCollector>,
|
||||
traceparent: &str,
|
||||
request_path: &str,
|
||||
request_id: &str,
|
||||
custom_attrs: &HashMap<String, String>,
|
||||
) -> Result<RoutingResult, RoutingError> {
|
||||
// Clone metadata for routing before converting (which consumes client_request)
|
||||
let routing_metadata = client_request.metadata().clone();
|
||||
|
|
@ -53,14 +57,14 @@ pub async fn router_chat_get_upstream_model(
|
|||
| ProviderRequestType::BedrockConverseStream(_)
|
||||
| ProviderRequestType::ResponsesAPIRequest(_),
|
||||
) => {
|
||||
warn!("unexpected: got non-ChatCompletions request after converting to OpenAI format");
|
||||
warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format");
|
||||
return Err(RoutingError::internal_error(
|
||||
"Request conversion failed".to_string(),
|
||||
));
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to convert request to ChatCompletionsRequest: {}",
|
||||
"Failed to convert request to ChatCompletionsRequest: {}",
|
||||
err
|
||||
);
|
||||
return Err(RoutingError::internal_error(format!(
|
||||
|
|
@ -71,8 +75,9 @@ pub async fn router_chat_get_upstream_model(
|
|||
};
|
||||
|
||||
debug!(
|
||||
request = %serde_json::to_string(&chat_request).unwrap(),
|
||||
"router request"
|
||||
"[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}",
|
||||
request_id,
|
||||
&serde_json::to_string(&chat_request).unwrap()
|
||||
);
|
||||
|
||||
// Extract usage preferences from metadata
|
||||
|
|
@ -108,14 +113,16 @@ pub async fn router_chat_get_upstream_model(
|
|||
};
|
||||
|
||||
info!(
|
||||
has_usage_preferences = usage_preferences.is_some(),
|
||||
path = %request_path,
|
||||
latest_message = %latest_message_for_log,
|
||||
"processing router request"
|
||||
"[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
|
||||
request_id,
|
||||
usage_preferences.is_some(),
|
||||
request_path,
|
||||
latest_message_for_log
|
||||
);
|
||||
|
||||
// Capture start time for routing span
|
||||
let routing_start_time = std::time::Instant::now();
|
||||
let routing_start_system_time = std::time::SystemTime::now();
|
||||
|
||||
// Attempt to determine route using the router service
|
||||
let routing_result = router_service
|
||||
|
|
@ -127,21 +134,47 @@ pub async fn router_chat_get_upstream_model(
|
|||
)
|
||||
.await;
|
||||
|
||||
let determination_ms = routing_start_time.elapsed().as_millis() as i64;
|
||||
let current_span = tracing::Span::current();
|
||||
current_span.record(routing::ROUTE_DETERMINATION_MS, determination_ms);
|
||||
|
||||
match routing_result {
|
||||
Ok(route) => match route {
|
||||
Some((_, model_name)) => {
|
||||
current_span.record("route.selected_model", model_name.as_str());
|
||||
// Record successful routing span
|
||||
let mut attrs: HashMap<String, String> = HashMap::new();
|
||||
attrs.insert("route.selected_model".to_string(), model_name.clone());
|
||||
for (key, value) in custom_attrs {
|
||||
attrs.entry(key.clone()).or_insert_with(|| value.clone());
|
||||
}
|
||||
record_routing_span(
|
||||
trace_collector,
|
||||
traceparent,
|
||||
routing_start_time,
|
||||
routing_start_system_time,
|
||||
attrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(RoutingResult { model_name })
|
||||
}
|
||||
None => {
|
||||
// No route determined, return sentinel value "none"
|
||||
// This signals to llm.rs to use the original validated request model
|
||||
current_span.record("route.selected_model", "none");
|
||||
info!("no route determined, using default model");
|
||||
info!(
|
||||
"[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, returning sentinel 'none'",
|
||||
request_id
|
||||
);
|
||||
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("route.selected_model".to_string(), "none".to_string());
|
||||
for (key, value) in custom_attrs {
|
||||
attrs.entry(key.clone()).or_insert_with(|| value.clone());
|
||||
}
|
||||
record_routing_span(
|
||||
trace_collector,
|
||||
traceparent,
|
||||
routing_start_time,
|
||||
routing_start_system_time,
|
||||
attrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(RoutingResult {
|
||||
model_name: "none".to_string(),
|
||||
|
|
@ -149,7 +182,22 @@ pub async fn router_chat_get_upstream_model(
|
|||
}
|
||||
},
|
||||
Err(err) => {
|
||||
current_span.record("route.selected_model", "unknown");
|
||||
// Record failed routing span
|
||||
let mut attrs = HashMap::new();
|
||||
attrs.insert("route.selected_model".to_string(), "unknown".to_string());
|
||||
attrs.insert("error.message".to_string(), err.to_string());
|
||||
for (key, value) in custom_attrs {
|
||||
attrs.entry(key.clone()).or_insert_with(|| value.clone());
|
||||
}
|
||||
record_routing_span(
|
||||
trace_collector,
|
||||
traceparent,
|
||||
routing_start_time,
|
||||
routing_start_system_time,
|
||||
attrs,
|
||||
)
|
||||
.await;
|
||||
|
||||
Err(RoutingError::internal_error(format!(
|
||||
"Failed to determine route: {}",
|
||||
err
|
||||
|
|
@ -157,3 +205,53 @@ pub async fn router_chat_get_upstream_model(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to record a routing span with the given attributes.
|
||||
/// Reduces code duplication across different routing outcomes.
|
||||
async fn record_routing_span(
|
||||
trace_collector: Arc<TraceCollector>,
|
||||
traceparent: &str,
|
||||
start_time: std::time::Instant,
|
||||
start_system_time: std::time::SystemTime,
|
||||
attrs: HashMap<String, String>,
|
||||
) {
|
||||
// The routing always uses OpenAI Chat Completions format internally,
|
||||
// so we log that as the actual API being used for routing
|
||||
let routing_api_path = "/v1/chat/completions";
|
||||
|
||||
let routing_operation_name = OperationNameBuilder::new()
|
||||
.with_method("POST")
|
||||
.with_path(routing_api_path)
|
||||
.with_target("Arch-Router-1.5B")
|
||||
.build();
|
||||
|
||||
let (trace_id, parent_span_id) = parse_traceparent(traceparent);
|
||||
|
||||
// Build the routing span directly using constants
|
||||
let mut span_builder = SpanBuilder::new(&routing_operation_name)
|
||||
.with_trace_id(&trace_id)
|
||||
.with_kind(SpanKind::Client)
|
||||
.with_start_time(start_system_time)
|
||||
.with_end_time(std::time::SystemTime::now())
|
||||
.with_attribute(http::METHOD, "POST")
|
||||
.with_attribute(http::TARGET, routing_api_path.to_string())
|
||||
.with_attribute(
|
||||
routing::ROUTE_DETERMINATION_MS,
|
||||
start_time.elapsed().as_millis().to_string(),
|
||||
);
|
||||
|
||||
// Only set parent span ID if it exists (not a root span)
|
||||
if let Some(parent) = parent_span_id {
|
||||
span_builder = span_builder.with_parent_span_id(&parent);
|
||||
}
|
||||
|
||||
// Add all custom attributes
|
||||
for (key, value) in attrs {
|
||||
span_builder = span_builder.with_attribute(key, value);
|
||||
}
|
||||
|
||||
let span = span_builder.build();
|
||||
|
||||
// Record the span directly to the collector
|
||||
trace_collector.record_span(operation_component::ROUTING, span);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use common::consts::{
|
|||
CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH, PLANO_ORCHESTRATOR_MODEL_NAME,
|
||||
};
|
||||
use common::llm_providers::LlmProviders;
|
||||
use common::traces::TraceCollector;
|
||||
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::server::conn::http1;
|
||||
|
|
@ -114,11 +115,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
));
|
||||
|
||||
let model_aliases = Arc::new(plano_config.model_aliases.clone());
|
||||
let tracing_config = Arc::new(plano_config.tracing.clone());
|
||||
|
||||
// Initialize trace collector and start background flusher
|
||||
// Tracing is enabled if the tracing config is present in plano_config.yaml
|
||||
// Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED
|
||||
// OpenTelemetry automatic instrumentation is configured in utils/tracing.rs
|
||||
let tracing_enabled = if tracing_config.is_some() {
|
||||
info!("Tracing configuration found in plano_config.yaml");
|
||||
Some(true)
|
||||
} else {
|
||||
info!(
|
||||
"No tracing configuration in plano_config.yaml, will check OTEL_TRACING_ENABLED env var"
|
||||
);
|
||||
None
|
||||
};
|
||||
let trace_collector = Arc::new(TraceCollector::new(tracing_enabled));
|
||||
let _flusher_handle = trace_collector.clone().start_background_flusher();
|
||||
|
||||
// Initialize conversation state storage for v1/responses
|
||||
// Configurable via plano_config.yaml state_storage section
|
||||
|
|
@ -128,10 +140,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
if let Some(storage_config) = &plano_config.state_storage {
|
||||
let storage: Arc<dyn StateStorage> = match storage_config.storage_type {
|
||||
common::configuration::StateStorageType::Memory => {
|
||||
info!(
|
||||
storage_type = "memory",
|
||||
"initialized conversation state storage"
|
||||
);
|
||||
info!("Initialized conversation state storage: Memory");
|
||||
Arc::new(MemoryConversationalStorage::new())
|
||||
}
|
||||
common::configuration::StateStorageType::Postgres => {
|
||||
|
|
@ -140,11 +149,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
.as_ref()
|
||||
.expect("connection_string is required for postgres state_storage");
|
||||
|
||||
debug!(connection_string = %connection_string, "postgres connection");
|
||||
info!(
|
||||
storage_type = "postgres",
|
||||
"initializing conversation state storage"
|
||||
);
|
||||
debug!("Postgres connection string (full): {}", connection_string);
|
||||
info!("Initializing conversation state storage: Postgres");
|
||||
Arc::new(
|
||||
PostgreSQLConversationStorage::new(connection_string.clone())
|
||||
.await
|
||||
|
|
@ -154,7 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
};
|
||||
Some(storage)
|
||||
} else {
|
||||
info!("no state_storage configured, conversation state management disabled");
|
||||
info!("No state_storage configured - conversation state management disabled");
|
||||
None
|
||||
};
|
||||
|
||||
|
|
@ -173,6 +179,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let llm_providers = llm_providers.clone();
|
||||
let agents_list = combined_agents_filters_list.clone();
|
||||
let listeners = listeners.clone();
|
||||
let trace_collector = trace_collector.clone();
|
||||
let tracing_config = tracing_config.clone();
|
||||
let state_storage = state_storage.clone();
|
||||
let service = service_fn(move |req| {
|
||||
let router_service = Arc::clone(&router_service);
|
||||
|
|
@ -183,6 +191,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let model_aliases = Arc::clone(&model_aliases);
|
||||
let agents_list = agents_list.clone();
|
||||
let listeners = listeners.clone();
|
||||
let trace_collector = trace_collector.clone();
|
||||
let tracing_config = tracing_config.clone();
|
||||
let state_storage = state_storage.clone();
|
||||
|
||||
async move {
|
||||
|
|
@ -202,6 +212,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
fully_qualified_url,
|
||||
agents_list,
|
||||
listeners,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
)
|
||||
.with_context(parent_cx)
|
||||
.await;
|
||||
|
|
@ -219,6 +231,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
fully_qualified_url,
|
||||
model_aliases,
|
||||
llm_providers,
|
||||
trace_collector,
|
||||
tracing_config,
|
||||
state_storage,
|
||||
)
|
||||
.with_context(parent_cx)
|
||||
|
|
@ -259,7 +273,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
Ok(response)
|
||||
}
|
||||
_ => {
|
||||
debug!(method = %req.method(), path = %req.uri().path(), "no route found");
|
||||
debug!("No route for {} {}", req.method(), req.uri().path());
|
||||
let mut not_found = Response::new(empty());
|
||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||
Ok(not_found)
|
||||
|
|
@ -269,13 +283,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
});
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
debug!(peer = ?peer_addr, "accepted connection");
|
||||
debug!("Accepted connection from {:?}", peer_addr);
|
||||
if let Err(err) = http1::Builder::new()
|
||||
// .serve_connection(io, service_fn(chat_completion))
|
||||
.serve_connection(io, service)
|
||||
.await
|
||||
{
|
||||
warn!(error = ?err, "error serving connection");
|
||||
warn!("Error serving connection: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
137
crates/brightstaff/src/tracing/custom_attributes.rs
Normal file
137
crates/brightstaff/src/tracing/custom_attributes.rs
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
|
||||
use common::traces::SpanBuilder;
|
||||
use hyper::header::{HeaderMap, HeaderName};
|
||||
|
||||
pub fn extract_custom_trace_attributes(
|
||||
headers: &HeaderMap,
|
||||
custom_attributes: Option<&[CustomTraceAttribute]>,
|
||||
) -> HashMap<String, String> {
|
||||
let mut attributes = HashMap::new();
|
||||
let Some(custom_attributes) = custom_attributes else {
|
||||
return attributes;
|
||||
};
|
||||
|
||||
for attribute in custom_attributes {
|
||||
// Normalize/validate the configured header name; skip invalid names.
|
||||
let header_name = match HeaderName::from_bytes(attribute.header.as_bytes()) {
|
||||
Ok(name) => name,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
// Extract header value as UTF-8 text; skip missing or invalid values.
|
||||
let raw_value = match headers
|
||||
.get(header_name)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
{
|
||||
Some(value) => value.trim(),
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// Parse the header value according to the configured type.
|
||||
let parsed_value = match attribute.value_type {
|
||||
CustomTraceAttributeType::Str => Some(raw_value.to_string()),
|
||||
CustomTraceAttributeType::Bool => raw_value.parse::<bool>().ok().map(|v| v.to_string()),
|
||||
CustomTraceAttributeType::Float => raw_value.parse::<f64>().ok().map(|v| v.to_string()),
|
||||
CustomTraceAttributeType::Int => raw_value.parse::<i64>().ok().map(|v| v.to_string()),
|
||||
};
|
||||
|
||||
// Only include attributes that successfully parsed.
|
||||
if let Some(value) = parsed_value {
|
||||
attributes.insert(attribute.key.clone(), value);
|
||||
}
|
||||
}
|
||||
|
||||
attributes
|
||||
}
|
||||
|
||||
pub fn collect_custom_trace_attributes(
|
||||
headers: &HeaderMap,
|
||||
custom_attributes: Option<&[CustomTraceAttribute]>,
|
||||
) -> HashMap<String, String> {
|
||||
extract_custom_trace_attributes(headers, custom_attributes)
|
||||
}
|
||||
|
||||
pub fn append_span_attributes(
|
||||
mut span_builder: SpanBuilder,
|
||||
attributes: &HashMap<String, String>,
|
||||
) -> SpanBuilder {
|
||||
for (key, value) in attributes {
|
||||
span_builder = span_builder.with_attribute(key, value);
|
||||
}
|
||||
span_builder
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::extract_custom_trace_attributes;
|
||||
use common::configuration::{CustomTraceAttribute, CustomTraceAttributeType};
|
||||
use hyper::header::{HeaderMap, HeaderValue};
|
||||
|
||||
#[test]
|
||||
fn extracts_and_parses_custom_headers() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-workspace-id", HeaderValue::from_static("ws_123"));
|
||||
headers.insert("x-tenant-id", HeaderValue::from_static("ten_456"));
|
||||
headers.insert("x-user-id", HeaderValue::from_static("usr_789"));
|
||||
headers.insert("x-admin-level", HeaderValue::from_static("3"));
|
||||
headers.insert("x-is-internal", HeaderValue::from_static("true"));
|
||||
headers.insert("x-budget", HeaderValue::from_static("42.5"));
|
||||
headers.insert("x-bad-int", HeaderValue::from_static("nope"));
|
||||
|
||||
let custom_attributes = vec![
|
||||
CustomTraceAttribute {
|
||||
key: "workspace.id".to_string(),
|
||||
value_type: CustomTraceAttributeType::Str,
|
||||
header: "x-workspace-id".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "tenant.id".to_string(),
|
||||
value_type: CustomTraceAttributeType::Str,
|
||||
header: "x-tenant-id".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "user.id".to_string(),
|
||||
value_type: CustomTraceAttributeType::Str,
|
||||
header: "x-user-id".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "admin.level".to_string(),
|
||||
value_type: CustomTraceAttributeType::Int,
|
||||
header: "x-admin-level".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "is.internal".to_string(),
|
||||
value_type: CustomTraceAttributeType::Bool,
|
||||
header: "x-is-internal".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "budget.value".to_string(),
|
||||
value_type: CustomTraceAttributeType::Float,
|
||||
header: "x-budget".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "bad.int".to_string(),
|
||||
value_type: CustomTraceAttributeType::Int,
|
||||
header: "x-bad-int".to_string(),
|
||||
},
|
||||
CustomTraceAttribute {
|
||||
key: "missing.header".to_string(),
|
||||
value_type: CustomTraceAttributeType::Str,
|
||||
header: "x-missing".to_string(),
|
||||
},
|
||||
];
|
||||
|
||||
let attrs = extract_custom_trace_attributes(&headers, Some(&custom_attributes));
|
||||
|
||||
assert_eq!(attrs.get("workspace.id"), Some(&"ws_123".to_string()));
|
||||
assert_eq!(attrs.get("tenant.id"), Some(&"ten_456".to_string()));
|
||||
assert_eq!(attrs.get("user.id"), Some(&"usr_789".to_string()));
|
||||
assert_eq!(attrs.get("admin.level"), Some(&"3".to_string()));
|
||||
assert_eq!(attrs.get("is.internal"), Some(&"true".to_string()));
|
||||
assert_eq!(attrs.get("budget.value"), Some(&"42.5".to_string()));
|
||||
assert!(!attrs.contains_key("bad.int"));
|
||||
assert!(!attrs.contains_key("missing.header"));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,35 +1,9 @@
|
|||
mod constants;
|
||||
mod service_name_exporter;
|
||||
mod custom_attributes;
|
||||
|
||||
pub use constants::{
|
||||
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
|
||||
};
|
||||
pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY};
|
||||
|
||||
use opentelemetry::trace::get_active_span;
|
||||
use opentelemetry::KeyValue;
|
||||
|
||||
/// Sets the service name override on the current active OpenTelemetry span.
|
||||
///
|
||||
/// This function adds the `service.name.override` attribute to the active
|
||||
/// OpenTelemetry span, which allows observability backends to filter and group
|
||||
/// spans by their logical service (e.g., `plano(llm)`, `plano(filter)`).
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `service_name` - The service name to use (e.g., `operation_component::LLM`)
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust,ignore
|
||||
/// use brightstaff::tracing::{set_service_name, operation_component};
|
||||
///
|
||||
/// // Inside a traced function:
|
||||
/// set_service_name(operation_component::LLM);
|
||||
/// ```
|
||||
pub fn set_service_name(service_name: &str) {
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(KeyValue::new(
|
||||
SERVICE_NAME_OVERRIDE_KEY,
|
||||
service_name.to_string(),
|
||||
));
|
||||
});
|
||||
}
|
||||
pub use custom_attributes::{
|
||||
append_span_attributes, collect_custom_trace_attributes, extract_custom_trace_attributes,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -90,8 +90,24 @@ pub struct Overrides {
|
|||
pub struct Tracing {
|
||||
pub sampling_rate: Option<f64>,
|
||||
pub trace_arch_internal: Option<bool>,
|
||||
pub random_sampling: Option<u32>,
|
||||
pub opentracing_grpc_endpoint: Option<String>,
|
||||
pub custom_attributes: Option<Vec<CustomTraceAttribute>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CustomTraceAttribute {
|
||||
pub key: String,
|
||||
#[serde(rename = "type")]
|
||||
pub value_type: CustomTraceAttributeType,
|
||||
pub header: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum CustomTraceAttributeType {
|
||||
Str,
|
||||
Bool,
|
||||
Float,
|
||||
Int,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
|
||||
|
|
|
|||
|
|
@ -55,3 +55,22 @@ listeners:
|
|||
|
||||
tracing:
|
||||
random_sampling: 100
|
||||
custom_attributes:
|
||||
- header: x-workspace-id
|
||||
key: workspace.id
|
||||
type: str
|
||||
- header: x-tenant-id
|
||||
key: tenant.id
|
||||
type: str
|
||||
- header: x-user-id
|
||||
key: user.id
|
||||
type: str
|
||||
- header: x-admin-level
|
||||
key: admin.level
|
||||
type: int
|
||||
- header: x-is-internal
|
||||
key: is.internal
|
||||
type: bool
|
||||
- header: x-budget
|
||||
key: budget.value
|
||||
type: float
|
||||
|
|
|
|||
|
|
@ -3,6 +3,12 @@
|
|||
### Travel Agent Chat Completion Request
|
||||
POST {{llm_endpoint}}/v1/chat/completions HTTP/1.1
|
||||
Content-Type: application/json
|
||||
X-Workspace-Id: ws_7e2c5d91b4224f59b0e6a4e0125c21b3
|
||||
X-Tenant-Id: ten_4102a8c7fa6542b084b395d2df184a9a
|
||||
X-User-Id: usr_19df7e6751b846f9ba026776e3c12abe
|
||||
X-Admin-Level: 3
|
||||
X-Is-Internal: true
|
||||
X-Budget: 42.5
|
||||
|
||||
{
|
||||
"model": "gpt-4o",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue