diff --git a/config/supervisord.conf b/config/supervisord.conf index 4e4274d5..9cf1661b 100644 --- a/config/supervisord.conf +++ b/config/supervisord.conf @@ -2,14 +2,14 @@ nodaemon=true [program:brightstaff] -command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=info ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done" +command=sh -c "envsubst < /app/arch_config_rendered.yaml > /app/arch_config_rendered.env_sub.yaml && RUST_LOG=debug ARCH_CONFIG_PATH_RENDERED=/app/arch_config_rendered.env_sub.yaml /app/brightstaff 2>&1 | tee /var/log/brightstaff.log | while IFS= read -r line; do echo '[brightstaff]' \"$line\"; done" stdout_logfile=/dev/stdout redirect_stderr=true stdout_logfile_maxbytes=0 stderr_logfile_maxbytes=0 [program:envoy] -command=/bin/sh -c "uv run python -m planoai.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[archgw_logs]' \"$line\"; done" +command=/bin/sh -c "uv run python -m planoai.config_generator && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug --log-format '[%%Y-%%m-%%d %%T.%%e][%%l] %%v' 2>&1 | tee /var/log/envoy.log | while IFS= read -r line; do echo '[plano_logs] ' \"$line\"; done" stdout_logfile=/dev/stdout redirect_stderr=true stdout_logfile_maxbytes=0 diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 46e9ead1..345ec69a 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -211,6 +211,11 @@ async fn handle_agent_chat( .find(|(key, _)| key.as_str() == TRACE_PARENT_HEADER) .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + let request_id = request_headers + .get(common::consts::REQUEST_ID_HEADER) + .and_then(|val| val.to_str().ok()) + .map(|s| s.to_string()); + // Create agent map for pipeline processing and agent selection let agent_map = { let agents = agents_list.read().await; @@ -231,7 +236,12 @@ async fn handle_agent_chat( let selection_start_instant = Instant::now(); let selected_agents = agent_selector - .select_agents(&message, &listener, trace_parent.clone()) + .select_agents( + &message, + &listener, + trace_parent.clone(), + request_id.clone(), + ) .await?; // Record agent selection span diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index 78fbc654..727dc6c0 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -109,6 +109,7 @@ impl AgentSelector { messages: &[Message], listener: &Listener, trace_parent: Option, + request_id: Option, ) -> Result, AgentSelectionError> { let agents = listener .agents @@ -131,7 +132,7 @@ impl AgentSelector { match self .orchestrator_service - .determine_orchestration(messages, trace_parent, Some(usage_preferences)) + .determine_orchestration(messages, trace_parent, Some(usage_preferences), request_id) .await { Ok(Some(routes)) => { diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index b311976a..37cb008e 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -208,6 +208,7 @@ pub async fn llm_chat( trace_collector.clone(), &traceparent, &request_path, + Some(request_id.to_string()), ) .await { diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index ed0d6d31..b4045095 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -1,5 +1,4 @@ use common::configuration::ModelUsagePreference; -use common::consts::REQUEST_ID_HEADER; use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector}; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; use hermesllm::{ProviderRequest, ProviderRequestType}; @@ -37,17 +36,16 @@ impl RoutingError { pub async fn router_chat_get_upstream_model( router_service: Arc, client_request: ProviderRequestType, - request_headers: &hyper::HeaderMap, + _request_headers: &hyper::HeaderMap, trace_collector: Arc, traceparent: &str, request_path: &str, + request_id: Option, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); - let request_id = request_headers - .get(REQUEST_ID_HEADER) - .and_then(|value| value.to_str().ok()) - .unwrap_or("unknown"); + + let request_id = request_id.unwrap_or_else(|| "unknown".to_string()); // Convert to ChatCompletionsRequest for routing (regardless of input type) let chat_request = match ProviderRequestType::try_from(( @@ -79,17 +77,11 @@ pub async fn router_chat_get_upstream_model( }; debug!( - "[PLANO_REQ_ID: {}]: ROUTER_REQ: {}", + "[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}", request_id, &serde_json::to_string(&chat_request).unwrap() ); - // Extract trace_parent from headers - let trace_parent = request_headers - .iter() - .find(|(ty, _)| ty.as_str() == "traceparent") - .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); - // Extract usage preferences from metadata let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { metadata @@ -121,7 +113,7 @@ pub async fn router_chat_get_upstream_model( }; info!( - "[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", + "[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", request_id, usage_preferences.is_some(), request_path, @@ -134,7 +126,12 @@ pub async fn router_chat_get_upstream_model( // Attempt to determine route using the router service let routing_result = router_service - .determine_route(&chat_request.messages, trace_parent, usage_preferences) + .determine_route( + &chat_request.messages, + Some(traceparent.to_string()), + usage_preferences, + Some(request_id.clone()), + ) .await; match routing_result { diff --git a/crates/brightstaff/src/router/llm_router.rs b/crates/brightstaff/src/router/llm_router.rs index e0e85a8d..6d9711f2 100644 --- a/crates/brightstaff/src/router/llm_router.rs +++ b/crates/brightstaff/src/router/llm_router.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use common::{ configuration::{LlmProvider, ModelUsagePreference, RoutingPreference}, - consts::ARCH_PROVIDER_HINT_HEADER, + consts::{ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER}, }; use hermesllm::apis::openai::{ChatCompletionsResponse, Message}; use hyper::header; @@ -79,6 +79,7 @@ impl RouterService { messages: &[Message], trace_parent: Option, usage_preferences: Option>, + request_id: Option, ) -> Result> { if messages.is_empty() { return Ok(None); @@ -118,11 +119,18 @@ impl RouterService { if let Some(trace_parent) = trace_parent { llm_route_request_headers.insert( - header::HeaderName::from_static("traceparent"), + header::HeaderName::from_static(TRACE_PARENT_HEADER), header::HeaderValue::from_str(&trace_parent).unwrap(), ); } + if let Some(request_id) = request_id { + llm_route_request_headers.insert( + header::HeaderName::from_static(REQUEST_ID_HEADER), + header::HeaderValue::from_str(&request_id).unwrap(), + ); + } + llm_route_request_headers.insert( header::HeaderName::from_static("model"), header::HeaderValue::from_static("arch-router"), diff --git a/crates/brightstaff/src/router/plano_orchestrator.rs b/crates/brightstaff/src/router/plano_orchestrator.rs index 5aff4b11..ee91d52e 100644 --- a/crates/brightstaff/src/router/plano_orchestrator.rs +++ b/crates/brightstaff/src/router/plano_orchestrator.rs @@ -2,7 +2,10 @@ use std::{collections::HashMap, sync::Arc}; use common::{ configuration::{AgentUsagePreference, OrchestrationPreference}, - consts::{ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME}, + consts::{ + ARCH_PROVIDER_HINT_HEADER, PLANO_ORCHESTRATOR_MODEL_NAME, REQUEST_ID_HEADER, + TRACE_PARENT_HEADER, + }, }; use hermesllm::apis::openai::{ChatCompletionsResponse, Message}; use hyper::header; @@ -56,6 +59,7 @@ impl OrchestratorService { messages: &[Message], trace_parent: Option, usage_preferences: Option>, + request_id: Option, ) -> Result>> { if messages.is_empty() { return Ok(None); @@ -94,11 +98,18 @@ impl OrchestratorService { if let Some(trace_parent) = trace_parent { orchestration_request_headers.insert( - header::HeaderName::from_static("traceparent"), + header::HeaderName::from_static(TRACE_PARENT_HEADER), header::HeaderValue::from_str(&trace_parent).unwrap(), ); } + if let Some(request_id) = request_id { + orchestration_request_headers.insert( + header::HeaderName::from_static(REQUEST_ID_HEADER), + header::HeaderValue::from_str(&request_id).unwrap(), + ); + } + orchestration_request_headers.insert( header::HeaderName::from_static("model"), header::HeaderValue::from_static(PLANO_ORCHESTRATOR_MODEL_NAME), diff --git a/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py b/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py index c080aae7..76566d79 100644 --- a/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py +++ b/demos/use_cases/travel_agents/src/travel_agents/weather_agent.py @@ -61,7 +61,13 @@ def get_last_user_content(messages: list) -> str: return "" -async def get_weather_data(request: Request, messages: list, days: int = 1): +async def get_weather_data( + request: Request, + messages: list, + days: int = 1, + traceparent_header: str = None, + request_id: str = None, +): """Extract location from user's conversation and fetch weather data from Open-Meteo API. This function does two things: @@ -97,8 +103,9 @@ If no city can be found, output: NOT_FOUND""" else: ctx = extract(request.headers) extra_headers = {} + if request_id: + extra_headers["x-request-id"] = request_id inject(extra_headers, context=ctx) - # For location extraction, pass full conversation for context (e.g., "there" = previous destination) response = await openai_client_via_plano.chat.completions.create( model=LOCATION_MODEL, @@ -226,12 +233,12 @@ If no city can be found, output: NOT_FOUND""" "day_name": date_obj.strftime("%A"), "temperature_c": round(temp_c, 1) if temp_c is not None else None, "temperature_f": celsius_to_fahrenheit(temp_c), - "temperature_max_c": round(temp_max, 1) - if temp_max is not None - else None, - "temperature_min_c": round(temp_min, 1) - if temp_min is not None - else None, + "temperature_max_c": ( + round(temp_max, 1) if temp_max is not None else None + ), + "temperature_min_c": ( + round(temp_min, 1) if temp_min is not None else None + ), "weather_code": weather_code, "sunrise": sunrise.split("T")[1] if sunrise else None, "sunset": sunset.split("T")[1] if sunset else None, @@ -270,9 +277,10 @@ async def handle_request(request: Request): ) traceparent_header = request.headers.get("traceparent") + request_id = request.headers.get("x-request-id") return StreamingResponse( - invoke_weather_agent(request, request_body, traceparent_header), + invoke_weather_agent(request, request_body, traceparent_header, request_id), media_type="text/plain", headers={ "content-type": "text/event-stream", @@ -281,7 +289,10 @@ async def handle_request(request: Request): async def invoke_weather_agent( - request: Request, request_body: dict, traceparent_header: str = None + request: Request, + request_body: dict, + traceparent_header: str = None, + request_id: str = None, ): """Generate streaming chat completions.""" messages = request_body.get("messages", []) @@ -304,7 +315,9 @@ async def invoke_weather_agent( days = min(requested_days, 16) # API supports max 16 days # Get live weather data (location extraction happens inside this function) - weather_data = await get_weather_data(request, messages, days) + weather_data = await get_weather_data( + request, messages, days, traceparent_header, request_id + ) # Create weather context to append to user message forecast_type = "forecast" if days > 1 else "current weather" @@ -351,6 +364,8 @@ Present the weather information to the user in a clear, readable format. If ther try: ctx = extract(request.headers) extra_headers = {"x-envoy-max-retries": "3"} + if request_id: + extra_headers["x-request-id"] = request_id inject(extra_headers, context=ctx) stream = await openai_client_via_plano.chat.completions.create(