mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
pass request_id in orchestrator and routing model
also updated travel agent demo to pass request_id
This commit is contained in:
parent
57327ba667
commit
8cf40a3eb7
8 changed files with 77 additions and 34 deletions
|
|
@ -2,14 +2,14 @@
|
|||
nodaemon=true
|
||||
|
||||
[program:brightstaff]
|
||||
command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=info ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done"
|
||||
command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=debug ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done"
|
||||
stdout_logfile=/dev/stdout
|
||||
redirect_stderr=true
|
||||
stdout_logfile_maxbytes=0
|
||||
stderr_logfile_maxbytes=0
|
||||
|
||||
[program:envoy]
|
||||
command=/bin/sh -c "uv run python -m planoai.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[archgw_logs]' \"$line\"; done"
|
||||
command=/bin/sh -c "uv run python -m planoai.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[plano_logs] ' \"$line\"; done"
|
||||
stdout_logfile=/dev/stdout
|
||||
redirect_stderr=true
|
||||
stdout_logfile_maxbytes=0
|
||||
|
|
|
|||
|
|
@ -211,6 +211,11 @@ async fn handle_agent_chat(
|
|||
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
|
||||
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
|
||||
|
||||
let request_id = request_headers
|
||||
.get(common::consts::REQUEST_ID_HEADER)
|
||||
.and_then(|val| val.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
// Create agent map for pipeline processing and agent selection
|
||||
let agent_map = {
|
||||
let agents = agents_list.read().await;
|
||||
|
|
@ -231,7 +236,12 @@ async fn handle_agent_chat(
|
|||
let selection_start_instant = Instant::now();
|
||||
|
||||
let selected_agents = agent_selector
|
||||
.select_agents(&message, &listener, trace_parent.clone())
|
||||
.select_agents(
|
||||
&message,
|
||||
&listener,
|
||||
trace_parent.clone(),
|
||||
request_id.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Record agent selection span
|
||||
|
|
|
|||
|
|
@ -109,6 +109,7 @@ impl AgentSelector {
|
|||
messages: &[Message],
|
||||
listener: &Listener,
|
||||
trace_parent: Option<String>,
|
||||
request_id: Option<String>,
|
||||
) -> Result<Vec<AgentFilterChain>, AgentSelectionError> {
|
||||
let agents = listener
|
||||
.agents
|
||||
|
|
@ -131,7 +132,7 @@ impl AgentSelector {
|
|||
|
||||
match self
|
||||
.orchestrator_service
|
||||
.determine_orchestration(messages, trace_parent, Some(usage_preferences))
|
||||
.determine_orchestration(messages, trace_parent, Some(usage_preferences), request_id)
|
||||
.await
|
||||
{
|
||||
Ok(Some(routes)) => {
|
||||
|
|
|
|||
|
|
@ -208,6 +208,7 @@ pub async fn llm_chat(
|
|||
trace_collector.clone(),
|
||||
&traceparent,
|
||||
&request_path,
|
||||
Some(request_id.to_string()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
use common::configuration::ModelUsagePreference;
|
||||
use common::consts::REQUEST_ID_HEADER;
|
||||
use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector};
|
||||
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
|
||||
use hermesllm::{ProviderRequest, ProviderRequestType};
|
||||
|
|
@ -37,17 +36,16 @@ impl RoutingError {
|
|||
pub async fn router_chat_get_upstream_model(
|
||||
router_service: Arc<RouterService>,
|
||||
client_request: ProviderRequestType,
|
||||
request_headers: &hyper::HeaderMap,
|
||||
_request_headers: &hyper::HeaderMap,
|
||||
trace_collector: Arc<TraceCollector>,
|
||||
traceparent: &str,
|
||||
request_path: &str,
|
||||
request_id: Option<String>,
|
||||
) -> Result<RoutingResult, RoutingError> {
|
||||
// Clone metadata for routing before converting (which consumes client_request)
|
||||
let routing_metadata = client_request.metadata().clone();
|
||||
let request_id = request_headers
|
||||
.get(REQUEST_ID_HEADER)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.unwrap_or("unknown");
|
||||
|
||||
let request_id = request_id.unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
// Convert to ChatCompletionsRequest for routing (regardless of input type)
|
||||
let chat_request = match ProviderRequestType::try_from((
|
||||
|
|
@ -79,17 +77,11 @@ pub async fn router_chat_get_upstream_model(
|
|||
};
|
||||
|
||||
debug!(
|
||||
"[PLANO_REQ_ID: {}]: ROUTER_REQ: {}",
|
||||
"[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}",
|
||||
request_id,
|
||||
&serde_json::to_string(&chat_request).unwrap()
|
||||
);
|
||||
|
||||
// Extract trace_parent from headers
|
||||
let trace_parent = request_headers
|
||||
.iter()
|
||||
.find(|(ty, _)| ty.as_str() == "traceparent")
|
||||
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
|
||||
|
||||
// Extract usage preferences from metadata
|
||||
let usage_preferences_str: Option<String> = routing_metadata.as_ref().and_then(|metadata| {
|
||||
metadata
|
||||
|
|
@ -121,7 +113,7 @@ pub async fn router_chat_get_upstream_model(
|
|||
};
|
||||
|
||||
info!(
|
||||
"[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
|
||||
"[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
|
||||
request_id,
|
||||
usage_preferences.is_some(),
|
||||
request_path,
|
||||
|
|
@ -134,7 +126,12 @@ pub async fn router_chat_get_upstream_model(
|
|||
|
||||
// Attempt to determine route using the router service
|
||||
let routing_result = router_service
|
||||
.determine_route(&chat_request.messages, trace_parent, usage_preferences)
|
||||
.determine_route(
|
||||
&chat_request.messages,
|
||||
Some(traceparent.to_string()),
|
||||
usage_preferences,
|
||||
Some(request_id.clone()),
|
||||
)
|
||||
.await;
|
||||
|
||||
match routing_result {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
|
|||
|
||||
use common::{
|
||||
configuration::{LlmProvider, ModelUsagePreference, RoutingPreference},
|
||||
consts::ARCH_PROVIDER_HINT_HEADER,
|
||||
consts::{ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER},
|
||||
};
|
||||
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
|
||||
use hyper::header;
|
||||
|
|
@ -79,6 +79,7 @@ impl RouterService {
|
|||
messages: &[Message],
|
||||
trace_parent: Option<String>,
|
||||
usage_preferences: Option<Vec<ModelUsagePreference>>,
|
||||
request_id: Option<String>,
|
||||
) -> Result<Option<(String, String)>> {
|
||||
if messages.is_empty() {
|
||||
return Ok(None);
|
||||
|
|
@ -118,11 +119,18 @@ impl RouterService {
|
|||
|
||||
if let Some(trace_parent) = trace_parent {
|
||||
llm_route_request_headers.insert(
|
||||
header::HeaderName::from_static("traceparent"),
|
||||
header::HeaderName::from_static(TRACE_PARENT_HEADER),
|
||||
header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(request_id) = request_id {
|
||||
llm_route_request_headers.insert(
|
||||
header::HeaderName::from_static(REQUEST_ID_HEADER),
|
||||
header::HeaderValue::from_str(&request_id).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
llm_route_request_headers.insert(
|
||||
header::HeaderName::from_static("model"),
|
||||
header::HeaderValue::from_static("arch-router"),
|
||||
|
|
|
|||
|
|
@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc};
|
|||
|
||||
use common::{
|
||||
configuration::{AgentUsagePreference, OrchestrationPreference},
|
||||
consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME},
|
||||
consts::{
|
||||
ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER,
|
||||
TRACE_PARENT_HEADER,
|
||||
},
|
||||
};
|
||||
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
|
||||
use hyper::header;
|
||||
|
|
@ -56,6 +59,7 @@ impl OrchestratorService {
|
|||
messages: &[Message],
|
||||
trace_parent: Option<String>,
|
||||
usage_preferences: Option<Vec<AgentUsagePreference>>,
|
||||
request_id: Option<String>,
|
||||
) -> Result<Option<Vec<(String, String)>>> {
|
||||
if messages.is_empty() {
|
||||
return Ok(None);
|
||||
|
|
@ -94,11 +98,18 @@ impl OrchestratorService {
|
|||
|
||||
if let Some(trace_parent) = trace_parent {
|
||||
orchestration_request_headers.insert(
|
||||
header::HeaderName::from_static("traceparent"),
|
||||
header::HeaderName::from_static(TRACE_PARENT_HEADER),
|
||||
header::HeaderValue::from_str(&trace_parent).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(request_id) = request_id {
|
||||
orchestration_request_headers.insert(
|
||||
header::HeaderName::from_static(REQUEST_ID_HEADER),
|
||||
header::HeaderValue::from_str(&request_id).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
orchestration_request_headers.insert(
|
||||
header::HeaderName::from_static("model"),
|
||||
header::HeaderValue::from_static(PLANO_ORCHESTRATOR_MODEL_NAME),
|
||||
|
|
|
|||
|
|
@ -61,7 +61,13 @@ def get_last_user_content(messages: list) -> str:
|
|||
return ""
|
||||
|
||||
|
||||
async def get_weather_data(request: Request, messages: list, days: int = 1):
|
||||
async def get_weather_data(
|
||||
request: Request,
|
||||
messages: list,
|
||||
days: int = 1,
|
||||
traceparent_header: str = None,
|
||||
request_id: str = None,
|
||||
):
|
||||
"""Extract location from user's conversation and fetch weather data from Open-Meteo API.
|
||||
|
||||
This function does two things:
|
||||
|
|
@ -97,8 +103,9 @@ If no city can be found, output: NOT_FOUND"""
|
|||
else:
|
||||
ctx = extract(request.headers)
|
||||
extra_headers = {}
|
||||
if request_id:
|
||||
extra_headers["x-request-id"] = request_id
|
||||
inject(extra_headers, context=ctx)
|
||||
|
||||
# For location extraction, pass full conversation for context (e.g., "there" = previous destination)
|
||||
response = await openai_client_via_plano.chat.completions.create(
|
||||
model=LOCATION_MODEL,
|
||||
|
|
@ -226,12 +233,12 @@ If no city can be found, output: NOT_FOUND"""
|
|||
"day_name": date_obj.strftime("%A"),
|
||||
"temperature_c": round(temp_c, 1) if temp_c is not None else None,
|
||||
"temperature_f": celsius_to_fahrenheit(temp_c),
|
||||
"temperature_max_c": round(temp_max, 1)
|
||||
if temp_max is not None
|
||||
else None,
|
||||
"temperature_min_c": round(temp_min, 1)
|
||||
if temp_min is not None
|
||||
else None,
|
||||
"temperature_max_c": (
|
||||
round(temp_max, 1) if temp_max is not None else None
|
||||
),
|
||||
"temperature_min_c": (
|
||||
round(temp_min, 1) if temp_min is not None else None
|
||||
),
|
||||
"weather_code": weather_code,
|
||||
"sunrise": sunrise.split("T")[1] if sunrise else None,
|
||||
"sunset": sunset.split("T")[1] if sunset else None,
|
||||
|
|
@ -270,9 +277,10 @@ async def handle_request(request: Request):
|
|||
)
|
||||
|
||||
traceparent_header = request.headers.get("traceparent")
|
||||
request_id = request.headers.get("x-request-id")
|
||||
|
||||
return StreamingResponse(
|
||||
invoke_weather_agent(request, request_body, traceparent_header),
|
||||
invoke_weather_agent(request, request_body, traceparent_header, request_id),
|
||||
media_type="text/plain",
|
||||
headers={
|
||||
"content-type": "text/event-stream",
|
||||
|
|
@ -281,7 +289,10 @@ async def handle_request(request: Request):
|
|||
|
||||
|
||||
async def invoke_weather_agent(
|
||||
request: Request, request_body: dict, traceparent_header: str = None
|
||||
request: Request,
|
||||
request_body: dict,
|
||||
traceparent_header: str = None,
|
||||
request_id: str = None,
|
||||
):
|
||||
"""Generate streaming chat completions."""
|
||||
messages = request_body.get("messages", [])
|
||||
|
|
@ -304,7 +315,9 @@ async def invoke_weather_agent(
|
|||
days = min(requested_days, 16) # API supports max 16 days
|
||||
|
||||
# Get live weather data (location extraction happens inside this function)
|
||||
weather_data = await get_weather_data(request, messages, days)
|
||||
weather_data = await get_weather_data(
|
||||
request, messages, days, traceparent_header, request_id
|
||||
)
|
||||
|
||||
# Create weather context to append to user message
|
||||
forecast_type = "forecast" if days > 1 else "current weather"
|
||||
|
|
@ -351,6 +364,8 @@ Present the weather information to the user in a clear, readable format. If ther
|
|||
try:
|
||||
ctx = extract(request.headers)
|
||||
extra_headers = {"x-envoy-max-retries": "3"}
|
||||
if request_id:
|
||||
extra_headers["x-request-id"] = request_id
|
||||
inject(extra_headers, context=ctx)
|
||||
|
||||
stream = await openai_client_via_plano.chat.completions.create(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue