diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 5e744c8d..5af6f112 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -12,7 +12,7 @@ use hyper::{Request, Response, StatusCode}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; use crate::router::llm_router::RouterService; use crate::handlers::utils::{create_streaming_response, ObservableStreamProcessor, truncate_message}; @@ -63,7 +63,7 @@ pub async fn llm_chat( let chat_request_bytes = request.collect().await?.to_bytes(); debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | REQUEST BODY (raw utf8): {}", + "[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}", request_id, String::from_utf8_lossy(&chat_request_bytes) ); @@ -74,8 +74,8 @@ pub async fn llm_chat( )) { Ok(request) => request, Err(err) => { - warn!("[PLANO_REQ_ID:{}] | BRIGHTSTAFF | Failed to parse request as ProviderRequestType: {}", request_id, err); - let err_msg = format!("[PLANO_REQ_ID:{}] | BRIGHTSTAFF | Failed to parse request: {}", request_id, err); + warn!("[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}", request_id, err); + let err_msg = format!("[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}", request_id, err); let mut bad_request = Response::new(full(err_msg)); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -101,7 +101,7 @@ pub async fn llm_chat( client_request.set_model(resolved_model.clone()); if client_request.remove_metadata_key("archgw_preference_config") { - debug!("[PLANO (BRIGHTSTAFF)] Removed archgw_preference_config from metadata"); + debug!("[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata", request_id); } // === v1/responses state management: Determine upstream API and combine input if needed === @@ -140,14 +140,14 @@ pub async fn llm_chat( // Update both the request and original_input_items responses_req.input = InputParam::Items(combined_input.clone()); original_input_items = combined_input; - debug!("[PLANO (BRIGHTSTAFF)] Updated request with conversation history ({} items)", original_input_items.len()); + info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len()); } Err(StateStorageError::NotFound(_)) => { // Return 409 Conflict when previous_response_id not found - warn!("[PLANO (BRIGHTSTAFF)] Previous response_id not found: {}", prev_resp_id); + warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id); let err_msg = format!( - "[PLANO (BRIGHTSTAFF)] Conversation state not found for previous_response_id: {}", - prev_resp_id + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}", + request_id, prev_resp_id ); let mut conflict_response = Response::new(full(err_msg)); *conflict_response.status_mut() = StatusCode::CONFLICT; @@ -156,8 +156,8 @@ pub async fn llm_chat( Err(e) => { // Log warning but continue on other storage errors warn!( - "Failed to retrieve conversation state for {}: {}", - prev_resp_id, e + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}", + request_id, prev_resp_id, e ); // Restore original_input_items since we passed ownership original_input_items = extract_input_items(&responses_req.input); @@ -165,7 +165,7 @@ pub async fn llm_chat( } } } else { - debug!("[PLANO (BRIGHTSTAFF)] Upstream supports ResponsesAPI natively, passing through without state management"); + debug!("[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.", request_id); } } } @@ -195,8 +195,8 @@ pub async fn llm_chat( let model_name = routing_result.model_name; debug!( - "[PLANO ARCH_ROUTER] URL: {}, Resolved Model: {}", - full_qualified_llm_provider_url, model_name + "[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Resolved Model: {}", + request_id, full_qualified_llm_provider_url, model_name ); request_headers.insert( diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index 09b09975..a927a0eb 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -1,4 +1,5 @@ use common::configuration::ModelUsagePreference; +use common::consts::{REQUEST_ID_HEADER}; use common::traces::{TraceCollector, SpanKind, SpanBuilder, parse_traceparent}; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; use hermesllm::{ProviderRequest, ProviderRequestType}; @@ -43,6 +44,10 @@ pub async fn router_chat_get_upstream_model( ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); + let request_id = request_headers + .get(REQUEST_ID_HEADER) + .and_then(|value| value.to_str().ok()) + .unwrap_or("unknown"); // Convert to ChatCompletionsRequest for routing (regardless of input type) let chat_request = match ProviderRequestType::try_from(( @@ -73,7 +78,8 @@ pub async fn router_chat_get_upstream_model( }; debug!( - "[ARCH_ROUTER REQ]: {}", + "[PLANO_REQ_ID: {}]: ROUTER_REQ: {}", + request_id, &serde_json::to_string(&chat_request).unwrap() ); @@ -114,14 +120,13 @@ pub async fn router_chat_get_upstream_model( }; info!( - "request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}", + "[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", + request_id, usage_preferences.is_some(), request_path, latest_message_for_log ); - debug!("usage preferences from request: {:?}", usage_preferences); - // Capture start time for routing span let routing_start_time = std::time::Instant::now(); let routing_start_system_time = std::time::SystemTime::now(); @@ -153,7 +158,8 @@ pub async fn router_chat_get_upstream_model( None => { // No route determined, use default model from request info!( - "No route determined, using default model from request: {}", + "[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, using default model from request: {}", + request_id, chat_request.model ); diff --git a/crates/brightstaff/src/state/mod.rs b/crates/brightstaff/src/state/mod.rs index 2eedae6f..efd9b0a5 100644 --- a/crates/brightstaff/src/state/mod.rs +++ b/crates/brightstaff/src/state/mod.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::error::Error; use std::fmt; use std::sync::Arc; -use tracing::{debug, info}; +use tracing::{debug}; pub mod memory; pub mod response_state_processor; @@ -139,19 +139,9 @@ pub async fn retrieve_and_combine_input( previous_response_id: &str, current_input: Vec, ) -> Result, StateStorageError> { - info!( - "Retrieving conversation state for previous_response_id: {}", - previous_response_id - ); // First get the previous state let prev_state = storage.get(previous_response_id).await?; let combined_input = storage.merge(&prev_state, current_input); - - debug!( - "Retrieved and merged conversation state: {} total input items", - combined_input.len() - ); - Ok(combined_input) } diff --git a/crates/brightstaff/src/state/response_state_processor.rs b/crates/brightstaff/src/state/response_state_processor.rs index c634df4e..b3ce6787 100644 --- a/crates/brightstaff/src/state/response_state_processor.rs +++ b/crates/brightstaff/src/state/response_state_processor.rs @@ -93,7 +93,7 @@ impl ResponsesStateProcessor

{ match decoder.read_to_end(&mut decompressed) { Ok(_) => { debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes", self.request_id, self.chunk_buffer.len(), decompressed.len() @@ -102,7 +102,7 @@ impl ResponsesStateProcessor

{ } Err(e) => { warn!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to decompress gzip buffer: {}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to decompress gzip buffer: {}", self.request_id, e ); @@ -112,7 +112,7 @@ impl ResponsesStateProcessor

{ } Some(encoding) => { warn!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.", self.request_id, encoding ); @@ -143,12 +143,11 @@ impl ResponsesStateProcessor

{ if let Ok(stream_event) = serde_json::from_str::(data_str) { // Check if this is a ResponseCompleted event if let ResponsesAPIStreamEvent::ResponseCompleted { response, .. } = stream_event { - debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}, output_json={}", + info!( + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}", self.request_id, response.id, - response.output.len(), - serde_json::to_string(&response.output).unwrap_or_else(|_| "serialization_error".to_string()) + response.output.len() ); self.response_id = Some(response.id.clone()); self.output_items = Some(response.output.clone()); @@ -176,7 +175,7 @@ impl ResponsesStateProcessor

{ match serde_json::from_slice::(&decompressed) { Ok(response) => { info!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}", self.request_id, response.id, response.output.len() @@ -189,7 +188,7 @@ impl ResponsesStateProcessor

{ let chunk_preview = String::from_utf8_lossy(&decompressed); let preview_len = chunk_preview.len().min(200); warn!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}", self.request_id, e, preview_len, @@ -223,7 +222,7 @@ impl StreamProcessor for ResponsesStateProcessor

{ // Skip storage for OpenAI upstream if self.is_openai_upstream { debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider", self.request_id ); return; @@ -231,17 +230,11 @@ impl StreamProcessor for ResponsesStateProcessor

{ // Store state if we captured response_id and output if let (Some(response_id), Some(output_items)) = (&self.response_id, &self.output_items) { - debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Output items before conversion: {}", - self.request_id, - serde_json::to_string(&output_items).unwrap_or_else(|_| "serialization_error".to_string()) - ); - // Convert output items to input items for next request let output_as_inputs = outputs_to_inputs(output_items); debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}", self.request_id, output_items.len(), output_as_inputs.len() ); @@ -250,7 +243,7 @@ impl StreamProcessor for ResponsesStateProcessor

{ combined_input.extend(output_as_inputs); debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}", self.request_id, self.original_input.len(), combined_input.len(), @@ -272,18 +265,20 @@ impl StreamProcessor for ResponsesStateProcessor

{ let storage = self.storage.clone(); let response_id_clone = response_id.clone(); let request_id = self.request_id.clone(); + let items_count = state.input_items.len(); tokio::spawn(async move { match storage.put(state).await { Ok(()) => { - debug!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}", + info!( + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}, items_count={}", request_id, - response_id_clone + response_id_clone, + items_count ); } Err(e) => { warn!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}", request_id, response_id_clone, e @@ -293,7 +288,7 @@ impl StreamProcessor for ResponsesStateProcessor

{ }); } else { warn!( - "[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}", + "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}", self.request_id, self.response_id.is_some(), self.output_items.is_some()