mirror of
https://github.com/katanemo/plano.git
synced 2026-05-21 13:55:15 +02:00
fix: restore LLM span recordings and replace unwrap panics with proper errors
- Restore llm.model, llm.tools, llm.user_message_preview, llm.temperature span field recordings that were lost during refactor
- Replace agents.as_ref().unwrap() and agent_map.get().unwrap() in orchestrator with proper error returns
- Replace from_endpoint().unwrap() with ok_or_else returning 400
- Replace to_bytes().unwrap() with match returning 500

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
ebe0e18e3b
commit
6fcecb60c3
2 changed files with 71 additions and 15 deletions
|
|
@ -202,7 +202,9 @@ async fn handle_agent_chat_inner(
|
||||||
// Create agent map for pipeline processing and agent selection
|
// Create agent map for pipeline processing and agent selection
|
||||||
let agent_map = {
|
let agent_map = {
|
||||||
let agents = agents_list.read().await;
|
let agents = agents_list.read().await;
|
||||||
let agents = agents.as_ref().unwrap();
|
let agents = agents.as_ref().ok_or_else(|| {
|
||||||
|
AgentFilterChainError::RequestParsing(serde_json::Error::custom("No agents configured"))
|
||||||
|
})?;
|
||||||
agent_selector.create_agent_map(agents)
|
agent_selector.create_agent_map(agents)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -269,7 +271,12 @@ async fn handle_agent_chat_inner(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Get agent details and invoke
|
// Get agent details and invoke
|
||||||
let agent = agent_map.get(&agent_name).unwrap();
|
let agent = agent_map.get(&agent_name).ok_or_else(|| {
|
||||||
|
AgentFilterChainError::RequestParsing(serde_json::Error::custom(format!(
|
||||||
|
"Selected agent '{}' not found in configuration",
|
||||||
|
agent_name
|
||||||
|
)))
|
||||||
|
})?;
|
||||||
|
|
||||||
debug!(agent = %agent_name, "invoking agent");
|
debug!(agent = %agent_name, "invoking agent");
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,13 +21,15 @@ use tracing::{debug, info, info_span, warn, Instrument};
|
||||||
mod router;
|
mod router;
|
||||||
|
|
||||||
use crate::handlers::request::extract_request_id;
|
use crate::handlers::request::extract_request_id;
|
||||||
use crate::handlers::utils::{create_streaming_response, ObservableStreamProcessor};
|
use crate::handlers::utils::{
|
||||||
|
create_streaming_response, truncate_message, ObservableStreamProcessor,
|
||||||
|
};
|
||||||
use crate::router::llm::RouterService;
|
use crate::router::llm::RouterService;
|
||||||
use crate::state::response_state_processor::ResponsesStateProcessor;
|
use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||||
use crate::state::{
|
use crate::state::{
|
||||||
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
||||||
};
|
};
|
||||||
use crate::tracing::{operation_component, set_service_name};
|
use crate::tracing::{llm as tracing_llm, operation_component, set_service_name};
|
||||||
use router::router_chat_get_upstream_model;
|
use router::router_chat_get_upstream_model;
|
||||||
|
|
||||||
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
||||||
|
|
@ -55,6 +57,10 @@ pub async fn llm_chat(
|
||||||
request_id = %request_id,
|
request_id = %request_id,
|
||||||
http.method = %request.method(),
|
http.method = %request.method(),
|
||||||
http.path = %request_path,
|
http.path = %request_path,
|
||||||
|
llm.model = tracing::field::Empty,
|
||||||
|
llm.tools = tracing::field::Empty,
|
||||||
|
llm.user_message_preview = tracing::field::Empty,
|
||||||
|
llm.temperature = tracing::field::Empty,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Execute the rest of the handler inside the span
|
// Execute the rest of the handler inside the span
|
||||||
|
|
@ -107,8 +113,28 @@ async fn llm_chat_inner(
|
||||||
is_streaming_request,
|
is_streaming_request,
|
||||||
is_responses_api_client,
|
is_responses_api_client,
|
||||||
messages_for_signals,
|
messages_for_signals,
|
||||||
|
temperature,
|
||||||
|
tool_names,
|
||||||
|
user_message_preview,
|
||||||
} = parsed;
|
} = parsed;
|
||||||
|
|
||||||
|
// Record LLM-specific span attributes
|
||||||
|
let span = tracing::Span::current();
|
||||||
|
if let Some(temp) = temperature {
|
||||||
|
span.record(tracing_llm::TEMPERATURE, tracing::field::display(temp));
|
||||||
|
}
|
||||||
|
if let Some(tools) = &tool_names {
|
||||||
|
let formatted_tools = tools
|
||||||
|
.iter()
|
||||||
|
.map(|name| format!("{}(...)", name))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n");
|
||||||
|
span.record(tracing_llm::TOOLS, formatted_tools.as_str());
|
||||||
|
}
|
||||||
|
if let Some(preview) = &user_message_preview {
|
||||||
|
span.record(tracing_llm::USER_MESSAGE_PREVIEW, preview.as_str());
|
||||||
|
}
|
||||||
|
|
||||||
// --- Phase 2: Resolve conversation state (v1/responses API) ---
|
// --- Phase 2: Resolve conversation state (v1/responses API) ---
|
||||||
let state_ctx = match resolve_conversation_state(
|
let state_ctx = match resolve_conversation_state(
|
||||||
&mut client_request,
|
&mut client_request,
|
||||||
|
|
@ -126,9 +152,16 @@ async fn llm_chat_inner(
|
||||||
};
|
};
|
||||||
|
|
||||||
// Serialize request for upstream BEFORE router consumes it
|
// Serialize request for upstream BEFORE router consumes it
|
||||||
let client_request_bytes_for_upstream: Bytes = ProviderRequestType::to_bytes(&client_request)
|
let client_request_bytes_for_upstream: Bytes =
|
||||||
.unwrap()
|
match ProviderRequestType::to_bytes(&client_request) {
|
||||||
.into();
|
Ok(bytes) => bytes.into(),
|
||||||
|
Err(err) => {
|
||||||
|
warn!(error = %err, "failed to serialize request for upstream");
|
||||||
|
let mut r = Response::new(full(format!("Failed to serialize request: {}", err)));
|
||||||
|
*r.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
||||||
|
return Ok(r);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// --- Phase 3: Route the request ---
|
// --- Phase 3: Route the request ---
|
||||||
let routing_span = info_span!(
|
let routing_span = info_span!(
|
||||||
|
|
@ -170,6 +203,7 @@ async fn llm_chat_inner(
|
||||||
} else {
|
} else {
|
||||||
alias_resolved_model.clone()
|
alias_resolved_model.clone()
|
||||||
};
|
};
|
||||||
|
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());
|
||||||
|
|
||||||
// --- Phase 4: Forward to upstream and stream back ---
|
// --- Phase 4: Forward to upstream and stream back ---
|
||||||
send_upstream(
|
send_upstream(
|
||||||
|
|
@ -203,6 +237,9 @@ struct PreparedRequest {
|
||||||
is_streaming_request: bool,
|
is_streaming_request: bool,
|
||||||
is_responses_api_client: bool,
|
is_responses_api_client: bool,
|
||||||
messages_for_signals: Option<Vec<Message>>,
|
messages_for_signals: Option<Vec<Message>>,
|
||||||
|
temperature: Option<f32>,
|
||||||
|
tool_names: Option<Vec<String>>,
|
||||||
|
user_message_preview: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse the body, resolve the model alias, and validate the model exists.
|
/// Parse the body, resolve the model alias, and validate the model exists.
|
||||||
|
|
@ -229,17 +266,21 @@ async fn parse_and_validate_request(
|
||||||
"request body received"
|
"request body received"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut client_request = ProviderRequestType::try_from((
|
let api_type = SupportedAPIsFromClient::from_endpoint(request_path).ok_or_else(|| {
|
||||||
&chat_request_bytes[..],
|
warn!(path = %request_path, "unsupported endpoint");
|
||||||
&SupportedAPIsFromClient::from_endpoint(request_path).unwrap(),
|
let mut r = Response::new(full(format!("Unsupported endpoint: {}", request_path)));
|
||||||
))
|
|
||||||
.map_err(|err| {
|
|
||||||
warn!(error = %err, "failed to parse request as ProviderRequestType");
|
|
||||||
let mut r = Response::new(full(format!("Failed to parse request: {}", err)));
|
|
||||||
*r.status_mut() = StatusCode::BAD_REQUEST;
|
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||||
r
|
r
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let mut client_request = ProviderRequestType::try_from((&chat_request_bytes[..], &api_type))
|
||||||
|
.map_err(|err| {
|
||||||
|
warn!(error = %err, "failed to parse request as ProviderRequestType");
|
||||||
|
let mut r = Response::new(full(format!("Failed to parse request: {}", err)));
|
||||||
|
*r.status_mut() = StatusCode::BAD_REQUEST;
|
||||||
|
r
|
||||||
|
})?;
|
||||||
|
|
||||||
let client_api = SupportedAPIsFromClient::from_endpoint(request_path);
|
let client_api = SupportedAPIsFromClient::from_endpoint(request_path);
|
||||||
let is_responses_api_client = matches!(
|
let is_responses_api_client = matches!(
|
||||||
client_api,
|
client_api,
|
||||||
|
|
@ -247,6 +288,7 @@ async fn parse_and_validate_request(
|
||||||
);
|
);
|
||||||
|
|
||||||
let model_from_request = client_request.model().to_string();
|
let model_from_request = client_request.model().to_string();
|
||||||
|
let temperature = client_request.get_temperature();
|
||||||
let is_streaming_request = client_request.is_streaming();
|
let is_streaming_request = client_request.is_streaming();
|
||||||
let alias_resolved_model = resolve_model_alias(&model_from_request, model_aliases);
|
let alias_resolved_model = resolve_model_alias(&model_from_request, model_aliases);
|
||||||
|
|
||||||
|
|
@ -273,7 +315,11 @@ async fn parse_and_validate_request(
|
||||||
.map(|(_, model)| model.to_string())
|
.map(|(_, model)| model.to_string())
|
||||||
.unwrap_or_else(|| alias_resolved_model.clone());
|
.unwrap_or_else(|| alias_resolved_model.clone());
|
||||||
|
|
||||||
// Extract messages for signal analysis before mutating client_request
|
// Extract span attributes and messages before mutating client_request
|
||||||
|
let tool_names = client_request.get_tool_names();
|
||||||
|
let user_message_preview = client_request
|
||||||
|
.get_recent_user_message()
|
||||||
|
.map(|msg| truncate_message(&msg, 50));
|
||||||
let messages_for_signals = Some(client_request.get_messages());
|
let messages_for_signals = Some(client_request.get_messages());
|
||||||
|
|
||||||
// Set the upstream model name and strip routing metadata
|
// Set the upstream model name and strip routing metadata
|
||||||
|
|
@ -290,6 +336,9 @@ async fn parse_and_validate_request(
|
||||||
is_streaming_request,
|
is_streaming_request,
|
||||||
is_responses_api_client,
|
is_responses_api_client,
|
||||||
messages_for_signals,
|
messages_for_signals,
|
||||||
|
temperature,
|
||||||
|
tool_names,
|
||||||
|
user_message_preview,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue