pass request_id in orchestrator and routing model (#678)

This commit is contained in:
Adil Hafeez 2026-01-07 12:04:10 -08:00 committed by GitHub
parent b4543ba56c
commit 78b2ae0cf7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 96 additions and 48 deletions

View file

@ -206,11 +206,16 @@ async fn handle_agent_chat(
let message: Vec<OpenAIMessage> = client_request.get_messages();
// Extract trace parent for routing
let trace_parent = request_headers
let traceparent = request_headers
.iter()
.find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER)
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
let request_id = request_headers
.get(common::consts::REQUEST_ID_HEADER)
.and_then(|val| val.to_str().ok())
.map(|s| s.to_string());
// Create agent map for pipeline processing and agent selection
let agent_map = {
let agents = agents_list.read().await;
@ -219,7 +224,7 @@ async fn handle_agent_chat(
};
// Parse trace parent to get trace_id and parent_span_id
let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent {
let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent {
parse_traceparent(tp)
} else {
(String::new(), None)
@ -231,7 +236,7 @@ async fn handle_agent_chat(
let selection_start_instant = Instant::now();
let selected_agents = agent_selector
.select_agents(&message, &listener, trace_parent.clone())
.select_agents(&message, &listener, traceparent.clone(), request_id.clone())
.await?;
// Record agent selection span

View file

@ -109,6 +109,7 @@ impl AgentSelector {
messages: &[Message],
listener: &Listener,
trace_parent: Option<String>,
request_id: Option<String>,
) -> Result<Vec<AgentFilterChain>, AgentSelectionError> {
let agents = listener
.agents
@ -131,7 +132,7 @@ impl AgentSelector {
match self
.orchestrator_service
.determine_orchestration(messages, trace_parent, Some(usage_preferences))
.determine_orchestration(messages, trace_parent, Some(usage_preferences), request_id)
.await
{
Ok(Some(routes)) => {

View file

@ -44,22 +44,40 @@ pub async fn llm_chat(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
let request_headers = request.headers().clone();
let request_id = request_headers
let request_id: String = match request_headers
.get(REQUEST_ID_HEADER)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown".to_string());
{
Some(id) => id,
None => {
let generated_id = uuid::Uuid::new_v4().to_string();
warn!(
"[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID",
generated_id
);
generated_id
}
};
// Extract or generate traceparent - this establishes the trace context for all spans
let traceparent: String = request_headers
let traceparent: String = match request_headers
.get(TRACE_PARENT_HEADER)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string())
.unwrap_or_else(|| {
{
Some(tp) => tp,
None => {
use uuid::Uuid;
let trace_id = Uuid::new_v4().to_string().replace("-", "");
format!("00-{}-0000000000000000-01", trace_id)
});
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
warn!(
"[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}",
request_id, generated_tp
);
generated_tp
}
};
let mut request_headers = request_headers;
let chat_request_bytes = request.collect().await?.to_bytes();
@ -207,10 +225,10 @@ pub async fn llm_chat(
let routing_result = match router_chat_get_upstream_model(
router_service,
client_request, // Pass the original request - router_chat will convert it
&request_headers,
trace_collector.clone(),
&traceparent,
&request_path,
&request_id,
)
.await
{
@ -321,7 +339,7 @@ pub async fn llm_chat(
is_streaming_request,
false, // Not OpenAI upstream since should_manage_state is true
content_encoding,
request_id.clone(),
request_id,
);
create_streaming_response(byte_stream, state_processor, 16)
} else {

View file

@ -1,5 +1,4 @@
use common::configuration::ModelUsagePreference;
use common::consts::REQUEST_ID_HEADER;
use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector};
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use hermesllm::{ProviderRequest, ProviderRequestType};
@ -37,17 +36,13 @@ impl RoutingError {
pub async fn router_chat_get_upstream_model(
router_service: Arc<RouterService>,
client_request: ProviderRequestType,
request_headers: &hyper::HeaderMap,
trace_collector: Arc<TraceCollector>,
traceparent: &str,
request_path: &str,
request_id: &str,
) -> Result<RoutingResult, RoutingError> {
// Clone metadata for routing before converting (which consumes client_request)
let routing_metadata = client_request.metadata().clone();
let request_id = request_headers
.get(REQUEST_ID_HEADER)
.and_then(|value| value.to_str().ok())
.unwrap_or("unknown");
// Convert to ChatCompletionsRequest for routing (regardless of input type)
let chat_request = match ProviderRequestType::try_from((
@ -79,17 +74,11 @@ pub async fn router_chat_get_upstream_model(
};
debug!(
"[PLANO_REQ_ID: {}]: ROUTER_REQ: {}",
"[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}",
request_id,
&serde_json::to_string(&chat_request).unwrap()
);
// Extract trace_parent from headers
let trace_parent = request_headers
.iter()
.find(|(ty, _)| ty.as_str() == "traceparent")
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
// Extract usage preferences from metadata
let usage_preferences_str: Option<String> = routing_metadata.as_ref().and_then(|metadata| {
metadata
@ -121,7 +110,7 @@ pub async fn router_chat_get_upstream_model(
};
info!(
"[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
"[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
request_id,
usage_preferences.is_some(),
request_path,
@ -134,7 +123,12 @@ pub async fn router_chat_get_upstream_model(
// Attempt to determine route using the router service
let routing_result = router_service
.determine_route(&chat_request.messages, trace_parent, usage_preferences)
.determine_route(
&chat_request.messages,
traceparent,
usage_preferences,
request_id,
)
.await;
match routing_result {

View file

@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
use common::{
configuration::{LlmProvider, ModelUsagePreference, RoutingPreference},
consts::ARCH_PROVIDER_HINT_HEADER,
consts::{ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER},
};
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
use hyper::header;
@ -77,8 +77,9 @@ impl RouterService {
pub async fn determine_route(
&self,
messages: &[Message],
trace_parent: Option<String>,
traceparent: &str,
usage_preferences: Option<Vec<ModelUsagePreference>>,
request_id: &str,
) -> Result<Option<(String, String)>> {
if messages.is_empty() {
return Ok(None);
@ -116,12 +117,15 @@ impl RouterService {
header::HeaderValue::from_str(&self.routing_provider_name).unwrap(),
);
if let Some(trace_parent) = trace_parent {
llm_route_request_headers.insert(
header::HeaderName::from_static("traceparent"),
header::HeaderValue::from_str(&trace_parent).unwrap(),
);
}
llm_route_request_headers.insert(
header::HeaderName::from_static(TRACE_PARENT_HEADER),
header::HeaderValue::from_str(traceparent).unwrap(),
);
llm_route_request_headers.insert(
header::HeaderName::from_static(REQUEST_ID_HEADER),
header::HeaderValue::from_str(request_id).unwrap(),
);
llm_route_request_headers.insert(
header::HeaderName::from_static("model"),

View file

@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc};
use common::{
configuration::{AgentUsagePreference, OrchestrationPreference},
consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME},
consts::{
ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER,
TRACE_PARENT_HEADER,
},
};
use hermesllm::apis::openai::{ChatCompletionsResponse, Message};
use hyper::header;
@ -56,6 +59,7 @@ impl OrchestratorService {
messages: &[Message],
trace_parent: Option<String>,
usage_preferences: Option<Vec<AgentUsagePreference>>,
request_id: Option<String>,
) -> Result<Option<Vec<(String, String)>>> {
if messages.is_empty() {
return Ok(None);
@ -94,11 +98,18 @@ impl OrchestratorService {
if let Some(trace_parent) = trace_parent {
orchestration_request_headers.insert(
header::HeaderName::from_static("traceparent"),
header::HeaderName::from_static(TRACE_PARENT_HEADER),
header::HeaderValue::from_str(&trace_parent).unwrap(),
);
}
if let Some(request_id) = request_id {
orchestration_request_headers.insert(
header::HeaderName::from_static(REQUEST_ID_HEADER),
header::HeaderValue::from_str(&request_id).unwrap(),
);
}
orchestration_request_headers.insert(
header::HeaderName::from_static("model"),
header::HeaderValue::from_static(PLANO_ORCHESTRATOR_MODEL_NAME),

View file

@ -61,7 +61,13 @@ def get_last_user_content(messages: list) -> str:
return ""
async def get_weather_data(request: Request, messages: list, days: int = 1):
async def get_weather_data(
request: Request,
messages: list,
days: int = 1,
traceparent_header: str = None,
request_id: str = None,
):
"""Extract location from user's conversation and fetch weather data from Open-Meteo API.
This function does two things:
@ -97,8 +103,9 @@ If no city can be found, output: NOT_FOUND"""
else:
ctx = extract(request.headers)
extra_headers = {}
if request_id:
extra_headers["x-request-id"] = request_id
inject(extra_headers, context=ctx)
# For location extraction, pass full conversation for context (e.g., "there" = previous destination)
response = await openai_client_via_plano.chat.completions.create(
model=LOCATION_MODEL,
@ -226,12 +233,12 @@ If no city can be found, output: NOT_FOUND"""
"day_name": date_obj.strftime("%A"),
"temperature_c": round(temp_c, 1) if temp_c is not None else None,
"temperature_f": celsius_to_fahrenheit(temp_c),
"temperature_max_c": round(temp_max, 1)
if temp_max is not None
else None,
"temperature_min_c": round(temp_min, 1)
if temp_min is not None
else None,
"temperature_max_c": (
round(temp_max, 1) if temp_max is not None else None
),
"temperature_min_c": (
round(temp_min, 1) if temp_min is not None else None
),
"weather_code": weather_code,
"sunrise": sunrise.split("T")[1] if sunrise else None,
"sunset": sunset.split("T")[1] if sunset else None,
@ -270,9 +277,10 @@ async def handle_request(request: Request):
)
traceparent_header = request.headers.get("traceparent")
request_id = request.headers.get("x-request-id")
return StreamingResponse(
invoke_weather_agent(request, request_body, traceparent_header),
invoke_weather_agent(request, request_body, traceparent_header, request_id),
media_type="text/plain",
headers={
"content-type": "text/event-stream",
@ -281,7 +289,10 @@ async def handle_request(request: Request):
async def invoke_weather_agent(
request: Request, request_body: dict, traceparent_header: str = None
request: Request,
request_body: dict,
traceparent_header: str = None,
request_id: str = None,
):
"""Generate streaming chat completions."""
messages = request_body.get("messages", [])
@ -304,7 +315,9 @@ async def invoke_weather_agent(
days = min(requested_days, 16) # API supports max 16 days
# Get live weather data (location extraction happens inside this function)
weather_data = await get_weather_data(request, messages, days)
weather_data = await get_weather_data(
request, messages, days, traceparent_header, request_id
)
# Create weather context to append to user message
forecast_type = "forecast" if days > 1 else "current weather"
@ -351,6 +364,8 @@ Present the weather information to the user in a clear, readable format. If ther
try:
ctx = extract(request.headers)
extra_headers = {"x-envoy-max-retries": "3"}
if request_id:
extra_headers["x-request-id"] = request_id
inject(extra_headers, context=ctx)
stream = await openai_client_via_plano.chat.completions.create(