diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 46e9ead1..bac9607b 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -206,11 +206,16 @@ async fn handle_agent_chat( let message: Vec = client_request.get_messages(); // Extract trace parent for routing - let trace_parent = request_headers + let traceparent = request_headers .iter() .find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER) .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + let request_id = request_headers + .get(common::consts::REQUEST_ID_HEADER) + .and_then(|val| val.to_str().ok()) + .map(|s| s.to_string()); + // Create agent map for pipeline processing and agent selection let agent_map = { let agents = agents_list.read().await; @@ -219,7 +224,7 @@ async fn handle_agent_chat( }; // Parse trace parent to get trace_id and parent_span_id - let (trace_id, parent_span_id) = if let Some(ref tp) = trace_parent { + let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent { parse_traceparent(tp) } else { (String::new(), None) @@ -231,7 +236,7 @@ async fn handle_agent_chat( let selection_start_instant = Instant::now(); let selected_agents = agent_selector - .select_agents(&message, &listener, trace_parent.clone()) + .select_agents(&message, &listener, traceparent.clone(), request_id.clone()) .await?; // Record agent selection span diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index 78fbc654..727dc6c0 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -109,6 +109,7 @@ impl AgentSelector { messages: &[Message], listener: &Listener, trace_parent: Option, + request_id: Option, ) -> Result, AgentSelectionError> { let agents = listener .agents @@ -131,7 +132,7 @@ impl AgentSelector 
{ match self .orchestrator_service - .determine_orchestration(messages, trace_parent, Some(usage_preferences)) + .determine_orchestration(messages, trace_parent, Some(usage_preferences), request_id) .await { Ok(Some(routes)) => { diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index b3cea5e1..6e78f8b0 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -44,22 +44,40 @@ pub async fn llm_chat( ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); let request_headers = request.headers().clone(); - let request_id = request_headers + let request_id: String = match request_headers .get(REQUEST_ID_HEADER) .and_then(|h| h.to_str().ok()) .map(|s| s.to_string()) - .unwrap_or_else(|| "unknown".to_string()); + { + Some(id) => id, + None => { + let generated_id = uuid::Uuid::new_v4().to_string(); + warn!( + "[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID", + generated_id + ); + generated_id + } + }; // Extract or generate traceparent - this establishes the trace context for all spans - let traceparent: String = request_headers + let traceparent: String = match request_headers .get(TRACE_PARENT_HEADER) .and_then(|h| h.to_str().ok()) .map(|s| s.to_string()) - .unwrap_or_else(|| { + { + Some(tp) => tp, + None => { use uuid::Uuid; let trace_id = Uuid::new_v4().to_string().replace("-", ""); - format!("00-{}-0000000000000000-01", trace_id) - }); + let generated_tp = format!("00-{}-0000000000000000-01", trace_id); + warn!( + "[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}", + request_id, generated_tp + ); + generated_tp + } + }; let mut request_headers = request_headers; let chat_request_bytes = request.collect().await?.to_bytes(); @@ -207,10 +225,10 @@ pub async fn llm_chat( let routing_result = match router_chat_get_upstream_model( router_service, client_request, // Pass the original request - router_chat will 
convert it - &request_headers, trace_collector.clone(), &traceparent, &request_path, + &request_id, ) .await { @@ -321,7 +339,7 @@ pub async fn llm_chat( is_streaming_request, false, // Not OpenAI upstream since should_manage_state is true content_encoding, - request_id.clone(), + request_id, ); create_streaming_response(byte_stream, state_processor, 16) } else { diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index ed0d6d31..67e25338 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -1,5 +1,4 @@ use common::configuration::ModelUsagePreference; -use common::consts::REQUEST_ID_HEADER; use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector}; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; use hermesllm::{ProviderRequest, ProviderRequestType}; @@ -37,17 +36,13 @@ impl RoutingError { pub async fn router_chat_get_upstream_model( router_service: Arc, client_request: ProviderRequestType, - request_headers: &hyper::HeaderMap, trace_collector: Arc, traceparent: &str, request_path: &str, + request_id: &str, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); - let request_id = request_headers - .get(REQUEST_ID_HEADER) - .and_then(|value| value.to_str().ok()) - .unwrap_or("unknown"); // Convert to ChatCompletionsRequest for routing (regardless of input type) let chat_request = match ProviderRequestType::try_from(( @@ -79,17 +74,11 @@ pub async fn router_chat_get_upstream_model( }; debug!( - "[PLANO_REQ_ID: {}]: ROUTER_REQ: {}", + "[PLANO_REQ_ID: {}]: ROUTER_REQ: {}", request_id, &serde_json::to_string(&chat_request).unwrap() ); - // Extract trace_parent from headers - let trace_parent = request_headers - .iter() - .find(|(ty, _)| ty.as_str() == "traceparent") - .map(|(_, value)|
value.to_str().unwrap_or_default().to_string()); - // Extract usage preferences from metadata let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { metadata @@ -121,7 +110,7 @@ pub async fn router_chat_get_upstream_model( }; info!( - "[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", + "[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", request_id, usage_preferences.is_some(), request_path, @@ -134,7 +123,12 @@ pub async fn router_chat_get_upstream_model( // Attempt to determine route using the router service let routing_result = router_service - .determine_route(&chat_request.messages, trace_parent, usage_preferences) + .determine_route( + &chat_request.messages, + traceparent, + usage_preferences, + request_id, + ) .await; match routing_result { diff --git a/crates/brightstaff/src/router/llm_router.rs b/crates/brightstaff/src/router/llm_router.rs index e0e85a8d..e99652d3 100644 --- a/crates/brightstaff/src/router/llm_router.rs +++ b/crates/brightstaff/src/router/llm_router.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use common::{ configuration::{LlmProvider, ModelUsagePreference, RoutingPreference}, - consts::ARCH_PROVIDER_HINT_HEADER, + consts::{ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER}, }; use hermesllm::apis::openai::{ChatCompletionsResponse, Message}; use hyper::header; @@ -77,8 +77,9 @@ impl RouterService { pub async fn determine_route( &self, messages: &[Message], - trace_parent: Option, + traceparent: &str, usage_preferences: Option>, + request_id: &str, ) -> Result> { if messages.is_empty() { return Ok(None); } @@ -116,12 +117,15 @@ impl RouterService { header::HeaderValue::from_str(&self.routing_provider_name).unwrap(), ); - if let Some(trace_parent) = trace_parent { - llm_route_request_headers.insert( - header::HeaderName::from_static("traceparent"), -
header::HeaderValue::from_str(&trace_parent).unwrap(), - ); - } + llm_route_request_headers.insert( + header::HeaderName::from_static(TRACE_PARENT_HEADER), + header::HeaderValue::from_str(traceparent).unwrap(), + ); + + llm_route_request_headers.insert( + header::HeaderName::from_static(REQUEST_ID_HEADER), + header::HeaderValue::from_str(request_id).unwrap(), + ); llm_route_request_headers.insert( header::HeaderName::from_static("model"), diff --git a/crates/brightstaff/src/router/plano_orchestrator.rs b/crates/brightstaff/src/router/plano_orchestrator.rs index 5aff4b11..ee91d52e 100644 --- a/crates/brightstaff/src/router/plano_orchestrator.rs +++ b/crates/brightstaff/src/router/plano_orchestrator.rs @@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc}; use common::{ configuration::{AgentUsagePreference, OrchestrationPreference}, - consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME}, + consts::{ + ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER, + TRACE_PARENT_HEADER, + }, }; use hermesllm::apis::openai::{ChatCompletionsResponse, Message}; use hyper::header; @@ -56,6 +59,7 @@ impl OrchestratorService { messages: &[Message], trace_parent: Option, usage_preferences: Option>, + request_id: Option, ) -> Result>> { if messages.is_empty() { return Ok(None); @@ -94,11 +98,18 @@ impl OrchestratorService { if let Some(trace_parent) = trace_parent { orchestration_request_headers.insert( - header::HeaderName::from_static("traceparent"), + header::HeaderName::from_static(TRACE_PARENT_HEADER), header::HeaderValue::from_str(&trace_parent).unwrap(), ); } + if let Some(request_id) = request_id { + orchestration_request_headers.insert( + header::HeaderName::from_static(REQUEST_ID_HEADER), + header::HeaderValue::from_str(&request_id).unwrap(), + ); + } + orchestration_request_headers.insert( header::HeaderName::from_static("model"), header::HeaderValue::from_static(PLANO_ORCHESTRATOR_MODEL_NAME), diff --git 
a/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py b/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py index c080aae7..76566d79 100644 --- a/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py +++ b/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py @@ -61,7 +61,13 @@ def get_last_user_content(messages: list) -> str: return "" -async def get_weather_data(request: Request, messages: list, days: int = 1): +async def get_weather_data( + request: Request, + messages: list, + days: int = 1, + traceparent_header: str = None, + request_id: str = None, +): """Extract location from user's conversation and fetch weather data from Open-Meteo API. This function does two things: @@ -97,8 +103,9 @@ If no city can be found, output: NOT_FOUND""" else: ctx = extract(request.headers) extra_headers = {} + if request_id: + extra_headers["x-request-id"] = request_id inject(extra_headers, context=ctx) - # For location extraction, pass full conversation for context (e.g., "there" = previous destination) response = await openai_client_via_plano.chat.completions.create( model=LOCATION_MODEL, @@ -226,12 +233,12 @@ If no city can be found, output: NOT_FOUND""" "day_name": date_obj.strftime("%A"), "temperature_c": round(temp_c, 1) if temp_c is not None else None, "temperature_f": celsius_to_fahrenheit(temp_c), - "temperature_max_c": round(temp_max, 1) - if temp_max is not None - else None, - "temperature_min_c": round(temp_min, 1) - if temp_min is not None - else None, + "temperature_max_c": ( + round(temp_max, 1) if temp_max is not None else None + ), + "temperature_min_c": ( + round(temp_min, 1) if temp_min is not None else None + ), "weather_code": weather_code, "sunrise": sunrise.split("T")[1] if sunrise else None, "sunset": sunset.split("T")[1] if sunset else None, @@ -270,9 +277,10 @@ async def handle_request(request: Request): ) traceparent_header = request.headers.get("traceparent") + request_id = 
request.headers.get("x-request-id") return StreamingResponse( - invoke_weather_agent(request, request_body, traceparent_header), + invoke_weather_agent(request, request_body, traceparent_header, request_id), media_type="text/plain", headers={ "content-type": "text/event-stream", @@ -281,7 +289,10 @@ async def handle_request(request: Request): async def invoke_weather_agent( - request: Request, request_body: dict, traceparent_header: str = None + request: Request, + request_body: dict, + traceparent_header: str = None, + request_id: str = None, ): """Generate streaming chat completions.""" messages = request_body.get("messages", []) @@ -304,7 +315,9 @@ async def invoke_weather_agent( days = min(requested_days, 16) # API supports max 16 days # Get live weather data (location extraction happens inside this function) - weather_data = await get_weather_data(request, messages, days) + weather_data = await get_weather_data( + request, messages, days, traceparent_header, request_id + ) # Create weather context to append to user message forecast_type = "forecast" if days > 1 else "current weather" @@ -351,6 +364,8 @@ Present the weather information to the user in a clear, readable format. If ther try: ctx = extract(request.headers) extra_headers = {"x-envoy-max-retries": "3"} + if request_id: + extra_headers["x-request-id"] = request_id inject(extra_headers, context=ctx) stream = await openai_client_via_plano.chat.completions.create(