From b861f41c031e78ecb91c55010fa99cfa22fffe63 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Thu, 5 Feb 2026 16:10:21 -0800 Subject: [PATCH] use standard logging format --- config/envoy.template.yaml | 2 +- crates/.vscode/launch.json | 2 +- .../src/handlers/agent_chat_completions.rs | 227 ++++++++++-------- .../src/handlers/agent_selector.rs | 12 +- .../src/handlers/function_calling.rs | 52 ++-- crates/brightstaff/src/handlers/llm.rs | 114 +++++---- .../src/handlers/pipeline_processor.rs | 44 ++-- .../src/handlers/response_handler.rs | 8 +- .../brightstaff/src/handlers/router_chat.rs | 23 +- crates/brightstaff/src/handlers/utils.rs | 80 +++--- crates/brightstaff/src/main.rs | 22 +- crates/brightstaff/src/router/llm_router.rs | 26 +- .../src/router/orchestrator_model_v1.rs | 30 +-- .../src/router/plano_orchestrator.rs | 26 +- .../brightstaff/src/router/router_model_v1.rs | 30 +-- .../src/state/response_state_processor.rs | 81 +++---- crates/brightstaff/src/utils/tracing.rs | 55 ++++- .../docker-compose.yaml | 2 + 18 files changed, 458 insertions(+), 378 deletions(-) diff --git a/config/envoy.template.yaml b/config/envoy.template.yaml index fe82edb5..f29c5964 100644 --- a/config/envoy.template.yaml +++ b/config/envoy.template.yaml @@ -1018,7 +1018,7 @@ static_resources: - endpoint: address: socket_address: - address: host.docker.internal + address: 0.0.0.0 port_value: 9091 hostname: localhost diff --git a/crates/.vscode/launch.json b/crates/.vscode/launch.json index 22defe04..ac4c3847 100644 --- a/crates/.vscode/launch.json +++ b/crates/.vscode/launch.json @@ -13,7 +13,7 @@ "env": { "RUST_LOG": "debug", "RUST_BACKTRACE": "1", - "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/mcp_filter/config.yaml_rendered", + "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/multi_agent_with_crewai_langchain/config.yaml_rendered", "OTEL_COLLECTOR_URL": "http://localhost:4317", "OTEL_TRACING_ENABLED": "true" }, diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 427b3b1c..8498e5ce 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -10,7 +10,7 @@ use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::{Request, Response}; use serde::ser::Error as SerError; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, info_span, warn, Instrument}; use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; @@ -39,90 +39,117 @@ pub async fn agent_chat( agents_list: Arc>>>, listeners: Arc>>, ) -> Result>, hyper::Error> { - match handle_agent_chat(request, orchestrator_service, agents_list, listeners).await { - Ok(response) => Ok(response), - Err(err) => { - // Check if this is a client error from the pipeline that should be cascaded - if let AgentFilterChainError::Pipeline(PipelineError::ClientError { - agent, - status, - body, - }) = &err - { - warn!( - "Client error from agent '{}' (HTTP {}): {}", - agent, status, body - ); + // Extract request_id from headers or generate a new one + let request_id: String = match request + .headers() + .get(common::consts::REQUEST_ID_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + { + Some(id) => id, + None => uuid::Uuid::new_v4().to_string(), + }; - // Create error response with the original status code and body - let error_json = serde_json::json!({ - "error": "ClientError", - "agent": agent, - "status": status, - "agent_response": body - }); - - let json_string = error_json.to_string(); - let mut response = Response::new(ResponseHandler::create_full_body(json_string)); - *response.status_mut() = - hyper::StatusCode::from_u16(*status).unwrap_or(hyper::StatusCode::BAD_REQUEST); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - "application/json".parse().unwrap(), - ); - return Ok(response); - } - - // Print detailed error information with full error chain for other errors - let mut error_chain = Vec::new(); - let mut current_error: &dyn std::error::Error = &err; - - // Collect the full error chain - loop { - error_chain.push(current_error.to_string()); - match current_error.source() { - Some(source) => current_error = source, - None => break, - } - } - - // Log the complete error chain - warn!("Agent chat error chain: {:#?}", error_chain); - warn!("Root error: {:?}", err); - - // Create structured error response as JSON - let error_json = serde_json::json!({ - "error": { - "type": "AgentFilterChainError", - "message": err.to_string(), - "error_chain": error_chain, - "debug_info": format!("{:?}", err) - } - }); - - // Log the error for debugging - info!("Structured error info: {}", error_json); - - // Return JSON error response - Ok(ResponseHandler::create_json_error_response(&error_json)) - } - } -} - -#[instrument( - name = "agent_chat_handler", - skip(request, orchestrator_service, agents_list, listeners), - level = "info", - fields( + // Create a span with request_id that will be included in all log lines + let request_span = info_span!( + "agent_chat_handler", + request_id = %request_id, http.method = %request.method(), http.path = %request.uri().path() - ) -)] -async fn handle_agent_chat( + ); + + // Execute the handler inside the span + async { + match handle_agent_chat_inner( + request, + orchestrator_service, + agents_list, + listeners, + request_id, + ) + .await + { + Ok(response) => Ok(response), + Err(err) => { + // Check if this is a client error from the pipeline that should be cascaded + if let AgentFilterChainError::Pipeline(PipelineError::ClientError { + agent, + status, + body, + }) = &err + { + warn!( + agent = %agent, + status = %status, + body = %body, + "client error from agent" + ); + + // Create error response with the original status code and body + let error_json = serde_json::json!({ + "error": "ClientError", + "agent": agent, + "status": status, + "agent_response": body + }); + + let json_string = error_json.to_string(); + let mut response = + Response::new(ResponseHandler::create_full_body(json_string)); + *response.status_mut() = hyper::StatusCode::from_u16(*status) + .unwrap_or(hyper::StatusCode::BAD_REQUEST); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + "application/json".parse().unwrap(), + ); + return Ok(response); + } + + // Print detailed error information with full error chain for other errors + let mut error_chain = Vec::new(); + let mut current_error: &dyn std::error::Error = &err; + + // Collect the full error chain + loop { + error_chain.push(current_error.to_string()); + match current_error.source() { + Some(source) => current_error = source, + None => break, + } + } + + // Log the complete error chain + warn!(error_chain = ?error_chain, "agent chat error chain"); + warn!(root_error = ?err, "root error"); + + // Create structured error response as JSON + let error_json = serde_json::json!({ + "error": { + "type": "AgentFilterChainError", + "message": err.to_string(), + "error_chain": error_chain, + "debug_info": format!("{:?}", err) + } + }); + + // Log the error for debugging + info!(error = %error_json, "structured error info"); + + // Return JSON error response + Ok(ResponseHandler::create_json_error_response(&error_json)) + } + } + } + .instrument(request_span) + .await +} + +async fn handle_agent_chat_inner( request: Request, orchestrator_service: Arc, agents_list: Arc>>>, listeners: Arc>>, + request_id: String, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -143,7 +170,7 @@ async fn handle_agent_chat( .await? }; - info!("Handling request for listener: {}", listener.name); + info!(listener = %listener.name, "handling request"); // Parse request body let request_path = request @@ -158,12 +185,8 @@ async fn handle_agent_chat( let mut headers = request.headers().clone(); headers.remove(common::consts::ENVOY_ORIGINAL_PATH_HEADER); + // Set the request_id in headers if not already present if !headers.contains_key(common::consts::REQUEST_ID_HEADER) { - let request_id = uuid::Uuid::new_v4().to_string(); - info!( - "Request id not found in headers, generated new request id: {}", - request_id - ); headers.insert( common::consts::REQUEST_ID_HEADER, hyper::header::HeaderValue::from_str(&request_id).unwrap(), @@ -176,8 +199,8 @@ async fn handle_agent_chat( let chat_request_bytes = request.collect().await?.to_bytes(); debug!( - "Received request body (raw utf8): {}", - String::from_utf8_lossy(&chat_request_bytes) + body = %String::from_utf8_lossy(&chat_request_bytes), + "received request body" ); // Determine the API type from the endpoint @@ -191,7 +214,7 @@ async fn handle_agent_chat( let client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &api_type)) { Ok(request) => request, Err(err) => { - warn!("Failed to parse request as ProviderRequestType: {}", err); + warn!("failed to parse request as ProviderRequestType: {}", err); let err_msg = format!("Failed to parse request: {}", err); return Err(AgentFilterChainError::RequestParsing( serde_json::Error::custom(err_msg), @@ -224,7 +247,10 @@ async fn handle_agent_chat( .select_agents(&message, &listener, traceparent.clone(), request_id.clone()) .await?; - info!("Selected {} agent(s) for execution", selected_agents.len()); + info!( + count = selected_agents.len(), + "selected agents for execution" + ); // Execute agents sequentially, passing output from one to the next let mut current_messages = message.clone(); @@ -234,10 +260,10 @@ async fn handle_agent_chat( let is_last_agent = agent_index == agent_count - 1; debug!( - "Processing agent {}/{}: {}", - agent_index + 1, - agent_count, - selected_agent.id + agent_index = agent_index + 1, + total = agent_count, + agent = %selected_agent.id, + "processing agent" ); // Get agent name @@ -256,7 +282,7 @@ async fn handle_agent_chat( // Get agent details and invoke let agent = agent_map.get(&agent_name).unwrap(); - debug!("Invoking agent: {}", agent_name); + debug!(agent = %agent_name, "invoking agent"); let llm_response = pipeline_processor .invoke_agent( @@ -270,8 +296,8 @@ async fn handle_agent_chat( // If this is the last agent, return the streaming response if is_last_agent { info!( - "Completed agent chain, returning response from last agent: {}", - agent_name + agent = %agent_name, + "completed agent chain, returning response" ); return response_handler .create_streaming_response(llm_response) @@ -280,16 +306,13 @@ async fn handle_agent_chat( } // For intermediate agents, collect the full response and pass to next agent - debug!( - "Collecting response from intermediate agent: {}", - agent_name - ); + debug!(agent = %agent_name, "collecting response from intermediate agent"); let response_text = response_handler.collect_full_response(llm_response).await?; info!( - "Agent {} completed, passing {} character response to next agent", - agent_name, - response_text.len() + agent = %agent_name, + response_len = response_text.len(), + "agent completed, passing response to next agent" ); // remove last message and add new one at the end diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index 727dc6c0..e257134a 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -75,7 +75,7 @@ impl AgentSelector { .cloned() .or_else(|| { warn!( - "No default agent found, routing request to first agent: {}", + "no default agent found, routing request to first agent: {}", agents[0].id ); Some(agents[0].clone()) @@ -118,7 +118,7 @@ impl AgentSelector { // If only one agent, skip orchestration if agents.len() == 1 { - debug!("Only one agent available, skipping orchestration"); + debug!("only one agent available, skipping orchestration"); return Ok(vec![agents[0].clone()]); } @@ -136,11 +136,11 @@ impl AgentSelector { .await { Ok(Some(routes)) => { - debug!("Determined {} agent(s) via orchestration", routes.len()); + debug!(count = routes.len(), "determined agents via orchestration"); let mut selected_agents = Vec::new(); for (route_name, agent_name) in routes { - debug!("Processing route: {}, agent: {}", route_name, agent_name); + debug!(route = %route_name, agent = %agent_name, "processing route"); let selected_agent = agents .iter() .find(|a| a.id == agent_name) @@ -155,14 +155,14 @@ impl AgentSelector { } if selected_agents.is_empty() { - debug!("No agents determined using orchestration, using default agent"); + debug!("no agents determined via orchestration, using default"); Ok(vec![self.get_default_agent(agents, &listener.name)?]) } else { Ok(selected_agents) } } Ok(None) => { - debug!("No agents determined using orchestration, using default agent"); + debug!("no agents determined using orchestration, using default agent"); Ok(vec![self.get_default_agent(agents, &listener.name)?]) } Err(err) => Err(AgentSelectionError::OrchestrationError(err.to_string())), diff --git a/crates/brightstaff/src/handlers/function_calling.rs b/crates/brightstaff/src/handlers/function_calling.rs index 7ba15e2d..0d6c6d3c 100644 --- a/crates/brightstaff/src/handlers/function_calling.rs +++ b/crates/brightstaff/src/handlers/function_calling.rs @@ -944,7 +944,7 @@ impl ArchFunctionHandler { ) -> Result { use tracing::{error, info}; - info!("[Arch-Function] - ChatCompletion"); + info!("processing chat completion request"); let messages = self.process_messages( &request.messages, @@ -955,9 +955,9 @@ impl ArchFunctionHandler { )?; info!( - "[request to arch-fc]: model: {}, messages count: {}", - self.model_name, - messages.len() + model = %self.model_name, + message_count = messages.len(), + "sending request to arch-fc" ); let use_agent_orchestrator = request @@ -991,7 +991,7 @@ impl ArchFunctionHandler { } } } - info!("[Agent Orchestrator]: response received"); + info!("agent orchestrator response received"); } else if let Some(tools) = request.tools.as_ref() { let mut hallucination_state = HallucinationState::new(tools); let mut has_tool_calls = None; @@ -1040,7 +1040,10 @@ impl ArchFunctionHandler { } if has_tool_calls == Some(true) && has_hallucination { - info!("[Hallucination]: {}", hallucination_state.error_message); + info!( + "detected hallucination: {}", + hallucination_state.error_message + ); let clarify_messages = self.prefill_message(messages.clone(), &self.clarify_prefix); let clarify_request = self.create_request_with_extra_body(clarify_messages, false); @@ -1075,8 +1078,8 @@ impl ArchFunctionHandler { let response_dict = self.parse_model_response(&model_response); info!( - "[arch-fc]: raw model response: {}", - response_dict.raw_response + raw_response = %response_dict.raw_response, + "arch-fc model response" ); // General model response (no intent matched - should route to default target) @@ -1126,7 +1129,7 @@ impl ArchFunctionHandler { if verification.is_valid { info!( - "[Tool calls]: {:?}", + "tool calls extracted: {:?}", response_dict .tool_calls .iter() @@ -1143,7 +1146,7 @@ impl ArchFunctionHandler { tool_calls: Some(response_dict.tool_calls.clone()), } } else { - error!("Invalid tool call - {}", verification.error_message); + error!(error = %verification.error_message, "invalid tool call"); ResponseMessage { role: Role::Assistant, content: Some(String::new()), @@ -1155,7 +1158,7 @@ impl ArchFunctionHandler { } } } else { - error!("Tool calls present but no tools provided in request"); + error!("tool calls present but no tools provided in request"); ResponseMessage { role: Role::Assistant, content: Some(String::new()), @@ -1168,7 +1171,7 @@ impl ArchFunctionHandler { } } else { info!( - "[Tool calls]: {:?}", + "tool calls extracted: {:?}", response_dict .tool_calls .iter() @@ -1187,8 +1190,8 @@ impl ArchFunctionHandler { } } else { error!( - "Invalid tool calls in response: {}", - response_dict.error_message + error = %response_dict.error_message, + "invalid tool calls in response" ); ResponseMessage { role: Role::Assistant, @@ -1201,7 +1204,7 @@ impl ArchFunctionHandler { } } } else { - error!("Invalid model response - {}", model_response); + error!(response = %model_response, "invalid model response"); ResponseMessage { role: Role::Assistant, content: Some(String::new()), @@ -1244,7 +1247,7 @@ impl ArchFunctionHandler { metadata: Some(metadata), }; - info!("[response arch-fc]: {:?}", chat_completion_response); + info!(response = ?chat_completion_response, "arch-fc response"); Ok(chat_completion_response) } @@ -1331,7 +1334,7 @@ pub async fn function_calling_chat_handler( let mut body_json: Value = match serde_json::from_slice(&whole_body) { Ok(json) => json, Err(e) => { - error!("Failed to parse request body as JSON: {}", e); + error!(error = %e, "failed to parse request body as json"); let mut response = Response::new(full( serde_json::json!({ "error": format!("Invalid request body: {}", e) @@ -1355,13 +1358,13 @@ pub async fn function_calling_chat_handler( let chat_request: ChatCompletionsRequest = match serde_json::from_value(body_json) { Ok(req) => { info!( - "[request body]: {}", - serde_json::to_string(&req).unwrap_or_default() + request_body = %serde_json::to_string(&req).unwrap_or_default(), + "received request" ); req } Err(e) => { - error!("Failed to parse request body: {}", e); + error!(error = %e, "failed to parse request body"); let mut response = Response::new(full( serde_json::json!({ "error": format!("Invalid request body: {}", e) @@ -1384,7 +1387,10 @@ pub async fn function_calling_chat_handler( .and_then(|v| v.as_bool()) .unwrap_or(false); - info!("Use agent orchestrator: {}", use_agent_orchestrator); + info!( + use_agent_orchestrator = use_agent_orchestrator, + "handler mode" + ); // Create the appropriate handler let handler_name = if use_agent_orchestrator { @@ -1415,7 +1421,7 @@ pub async fn function_calling_chat_handler( match final_response { Ok(response_data) => { let response_json = serde_json::to_string(&response_data).unwrap_or_else(|e| { - error!("Failed to serialize response: {}", e); + error!(error = %e, "failed to serialize response"); serde_json::json!({"error": "Failed to serialize response"}).to_string() }); @@ -1428,7 +1434,7 @@ pub async fn function_calling_chat_handler( Ok(response) } Err(e) => { - error!("[{}] - Error in function calling: {}", handler_name, e); + error!(handler = handler_name, error = %e, "error in function calling"); let error_response = serde_json::json!({ "error": format!("[{}] - Error in function calling: {}", handler_name, e) diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 1a16fb3c..0fbb151a 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -15,7 +15,7 @@ use opentelemetry::trace::get_active_span; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::{debug, info, instrument, warn}; +use tracing::{debug, info, info_span, warn, Instrument}; use crate::handlers::router_chat::router_chat_get_upstream_model; use crate::handlers::utils::{ @@ -34,17 +34,6 @@ fn full>(chunk: T) -> BoxBody { .boxed() } -#[instrument( - name = "llm_chat_handler", - skip_all, - fields( - http.method = %request.method(), - http.path = %request.uri().path(), - model.requested = tracing::field::Empty, - model.alias_resolved = tracing::field::Empty, - model.routing_resolved = tracing::field::Empty - ) -)] pub async fn llm_chat( request: Request, router_service: Arc, @@ -61,16 +50,48 @@ pub async fn llm_chat( .map(|s| s.to_string()) { Some(id) => id, - None => { - let generated_id = uuid::Uuid::new_v4().to_string(); - warn!( - "[PLANO_REQ_ID:{}] | REQUEST_ID header missing, generated new ID", - generated_id - ); - generated_id - } + None => uuid::Uuid::new_v4().to_string(), }; + // Create a span with request_id that will be included in all log lines + let request_span = info_span!( + "llm_chat_handler", + request_id = %request_id, + http.method = %request.method(), + http.path = %request_path, + model.requested = tracing::field::Empty, + model.alias_resolved = tracing::field::Empty, + model.routing_resolved = tracing::field::Empty + ); + + // Execute the rest of the handler inside the span + llm_chat_inner( + request, + router_service, + full_qualified_llm_provider_url, + model_aliases, + llm_providers, + state_storage, + request_id, + request_path, + request_headers, + ) + .instrument(request_span) + .await +} + +#[allow(clippy::too_many_arguments)] +async fn llm_chat_inner( + request: Request, + router_service: Arc, + full_qualified_llm_provider_url: String, + model_aliases: Arc>>, + llm_providers: Arc>, + state_storage: Option>, + request_id: String, + request_path: String, + mut request_headers: hyper::HeaderMap, +) -> Result>, hyper::Error> { // Extract or generate traceparent - this establishes the trace context for all spans let traceparent: String = match request_headers .get(TRACE_PARENT_HEADER) @@ -83,20 +104,18 @@ pub async fn llm_chat( let trace_id = Uuid::new_v4().to_string().replace("-", ""); let generated_tp = format!("00-{}-0000000000000000-01", trace_id); warn!( - "[PLANO_REQ_ID:{}] | TRACE_PARENT header missing, generated new traceparent: {}", - request_id, generated_tp + generated_traceparent = %generated_tp, + "TRACE_PARENT header missing, generated new traceparent" ); generated_tp } }; - let mut request_headers = request_headers; let chat_request_bytes = request.collect().await?.to_bytes(); debug!( - "[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}", - request_id, - String::from_utf8_lossy(&chat_request_bytes) + body = %String::from_utf8_lossy(&chat_request_bytes), + "request body received" ); let mut client_request = match ProviderRequestType::try_from(( @@ -106,13 +125,10 @@ pub async fn llm_chat( Ok(request) => request, Err(err) => { warn!( - "[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}", - request_id, err - ); - let err_msg = format!( - "[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}", - request_id, err + error = %err, + "failed to parse request as ProviderRequestType" ); + let err_msg = format!("Failed to parse request: {}", err); let mut bad_request = Response::new(full(err_msg)); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -145,7 +161,7 @@ pub async fn llm_chat( "Model '{}' not found in configured providers", resolved_model ); - warn!("[PLANO_REQ_ID:{}] | FAILURE | {}", request_id, err_msg); + warn!(model = %resolved_model, "model not found in configured providers"); let mut bad_request = Response::new(full(err_msg)); *bad_request.status_mut() = StatusCode::BAD_REQUEST; return Ok(bad_request); @@ -172,10 +188,7 @@ pub async fn llm_chat( // This ensures upstream receives "gpt-4" not "openai/gpt-4" client_request.set_model(model_name_only.clone()); if client_request.remove_metadata_key("archgw_preference_config") { - debug!( - "[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata", - request_id - ); + debug!("removed archgw_preference_config from metadata"); } // === v1/responses state management: Determine upstream API and combine input if needed === @@ -223,14 +236,17 @@ pub async fn llm_chat( // Update both the request and original_input_items responses_req.input = InputParam::Items(combined_input.clone()); original_input_items = combined_input; - info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len()); + info!( + items = original_input_items.len(), + "updated request with conversation history" + ); } Err(StateStorageError::NotFound(_)) => { // Return 409 Conflict when previous_response_id not found - warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id); + warn!(previous_response_id = %prev_resp_id, "previous response_id not found"); let err_msg = format!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}", - request_id, prev_resp_id + "Conversation state not found for previous_response_id: {}", + prev_resp_id ); let mut conflict_response = Response::new(full(err_msg)); *conflict_response.status_mut() = StatusCode::CONFLICT; @@ -239,8 +255,9 @@ pub async fn llm_chat( Err(e) => { // Log warning but continue on other storage errors warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}", - request_id, prev_resp_id, e + previous_response_id = %prev_resp_id, + error = %e, + "failed to retrieve conversation state" ); // Restore original_input_items since we passed ownership original_input_items = extract_input_items(&responses_req.input); @@ -248,10 +265,7 @@ pub async fn llm_chat( } } } else { - debug!( - "[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.", - request_id - ); + debug!("upstream supports ResponsesAPI natively"); } } } @@ -296,8 +310,10 @@ pub async fn llm_chat( }); debug!( - "[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Provider Hint: {}, Model for upstream: {}", - request_id, full_qualified_llm_provider_url, model_name, model_name_only + url = %full_qualified_llm_provider_url, + provider_hint = %model_name, + upstream_model = %model_name_only, + "Routing to upstream" ); request_headers.insert( diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index d8405b44..9c0e0b1b 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -116,7 +116,7 @@ impl PipelineProcessor { }; for agent_name in filter_chain { - debug!("Processing filter agent: {}", agent_name); + debug!(agent = %agent_name, "processing filter agent"); let agent = agent_map .get(agent_name) @@ -125,12 +125,12 @@ impl PipelineProcessor { let tool_name = agent.tool.as_deref().unwrap_or(&agent.id); info!( - "executing filter: {}/{}, url: {}, type: {}, conversation length: {}", - agent_name, - tool_name, - agent.url, - agent.agent_type.as_deref().unwrap_or("mcp"), - chat_history.len() + agent = %agent_name, + tool = %tool_name, + url = %agent.url, + agent_type = %agent.agent_type.as_deref().unwrap_or("mcp"), + conversation_len = chat_history.len(), + "executing filter" ); if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" { @@ -144,9 +144,9 @@ impl PipelineProcessor { } info!( - "Filter '{}' completed, updated conversation length: {}", - agent_name, - chat_history_updated.len() + agent = %agent_name, + updated_len = chat_history_updated.len(), + "filter completed" ); } @@ -214,9 +214,9 @@ impl PipelineProcessor { // Validate SSE format: first line should be "event: message" if lines.is_empty() || lines[0] != "event: message" { warn!( - "Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}", - agent_id, - lines.first() + agent = %agent_id, + first_line = ?lines.first(), + "invalid SSE response format" ); return Err(PipelineError::NoContentInResponse(format!( "Invalid SSE response format from agent {}: expected 'event: message' as first line", @@ -233,9 +233,9 @@ impl PipelineProcessor { if data_lines.len() != 1 { warn!( - "Expected exactly one 'data:' line from agent {}, found {}", - agent_id, - data_lines.len() + agent = %agent_id, + found = data_lines.len(), + "expected exactly one 'data:' line" ); return Err(PipelineError::NoContentInResponse(format!( "Expected exactly one 'data:' line from agent {}, found {}", @@ -453,7 +453,7 @@ impl PipelineProcessor { }; let notification_body = serde_json::to_string(&initialized_notification)?; - debug!("Sending initialized notification for agent {}", agent_id); + debug!("sending initialized notification for agent {}", agent_id); let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?; @@ -466,7 +466,7 @@ impl PipelineProcessor { .await?; info!( - "Initialized notification response status: {}", + "initialized notification response status: {}", response.status() ); @@ -474,7 +474,7 @@ impl PipelineProcessor { } async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String { - info!("Initializing MCP session for agent {}", agent_id); + info!("initializing MCP session for agent {}", agent_id); let initialize_request = self.build_initialize_request(); let headers = self @@ -486,7 +486,7 @@ impl PipelineProcessor { .await .expect("Failed to initialize MCP session"); - info!("Initialize response status: {}", response.status()); + info!("initialize response status: {}", response.status()); let session_id = response .headers() @@ -496,7 +496,7 @@ impl PipelineProcessor { .to_string(); info!( - "Created new MCP session for agent {}: {}", + "created new MCP session for agent {}: {}", agent_id, session_id ); @@ -631,7 +631,7 @@ impl PipelineProcessor { let request_body = ProviderRequestType::to_bytes(&original_request).unwrap(); // let request_body = serde_json::to_string(&request)?; - debug!("Sending request to terminal agent {}", terminal_agent.id); + debug!("sending request to terminal agent {}", terminal_agent.id); let mut agent_headers = request_headers.clone(); agent_headers.remove(hyper::header::CONTENT_LENGTH); diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs index d386df1e..422dd8da 100644 --- a/crates/brightstaff/src/handlers/response_handler.rs +++ b/crates/brightstaff/src/handlers/response_handler.rs @@ -97,13 +97,13 @@ impl ResponseHandler { let chunk = match item { Ok(chunk) => chunk, Err(err) => { - warn!("Error receiving chunk: {:?}", err); + warn!(error = ?err, "error receiving chunk"); break; } }; if tx.send(chunk).await.is_err() { - warn!("Receiver dropped"); + warn!("receiver dropped"); break; } } @@ -164,11 +164,11 @@ impl ResponseHandler { if let Some(content) = provider_response.content_delta() { accumulated_text.push_str(content); } else { - info!("No content delta in provider response"); + info!("no content delta in provider response"); } } Err(e) => { - warn!("Failed to parse provider response: {:?}", e); + warn!(error = ?e, "failed to parse provider response"); } } } diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index 7a3eac6a..370a4f63 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -52,14 +52,14 @@ pub async fn router_chat_get_upstream_model( | ProviderRequestType::BedrockConverseStream(_) | ProviderRequestType::ResponsesAPIRequest(_), ) => { - warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); + warn!("unexpected: got non-ChatCompletions request after converting to OpenAI format"); return Err(RoutingError::internal_error( "Request conversion failed".to_string(), )); } Err(err) => { warn!( - "Failed to convert request to ChatCompletionsRequest: {}", + "failed to convert request to ChatCompletionsRequest: {}", err ); return Err(RoutingError::internal_error(format!( @@ -70,9 +70,8 @@ pub async fn router_chat_get_upstream_model( }; debug!( - "[PLANO_REQ_ID: {:?}]: ROUTER_REQ: {}", - request_id, - &serde_json::to_string(&chat_request).unwrap() + request = %serde_json::to_string(&chat_request).unwrap(), + "router request" ); // Extract usage preferences from metadata @@ -108,11 +107,10 @@ pub async fn router_chat_get_upstream_model( }; info!( - "[PLANO_REQ_ID: {:?}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}", - request_id, - usage_preferences.is_some(), - request_path, - latest_message_for_log + has_usage_preferences = usage_preferences.is_some(), + path = %request_path, + latest_message = %latest_message_for_log, + "processing router request" ); // Capture start time for routing span @@ -135,10 +133,7 @@ pub async fn router_chat_get_upstream_model( None => { // No route determined, return sentinel value "none" // This signals to llm.rs to use the original validated request model - info!( - "[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, returning sentinel 'none'", - request_id - ); + info!("no route determined, using default model"); Ok(RoutingResult { model_name: "none".to_string(), diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index a9be32e2..4e2913fd 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -6,7 +6,7 @@ use std::time::Instant; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::{info, warn}; +use tracing::{info, warn, Instrument}; use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer}; use hermesllm::apis::openai::Message; @@ -88,7 +88,7 @@ impl StreamProcessor for ObservableStreamProcessor { chunk_count = self.chunk_count, duration_ms = self.start_time.elapsed().as_millis(), time_to_first_token_ms = ?self.time_to_first_token, - "Streaming completed" + "streaming completed" ); } @@ -97,7 +97,7 @@ impl StreamProcessor for ObservableStreamProcessor { service = %self.service_name, error = error_msg, duration_ms = self.start_time.elapsed().as_millis(), - "Stream error" + "stream error" ); } } @@ -119,49 +119,55 @@ where { let (tx, rx) = mpsc::channel::(buffer_size); + // Capture the current span so the spawned task inherits the request context + let current_span = tracing::Span::current(); + // Spawn a task to process and forward chunks - let processor_handle = tokio::spawn(async move { - let mut is_first_chunk = true; + let processor_handle = tokio::spawn( + async move { + let mut is_first_chunk = true; - while let Some(item) = byte_stream.next().await { - let chunk = match item { - Ok(chunk) => chunk, - Err(err) => { - let err_msg = format!("Error receiving chunk: {:?}", err); - warn!("{}", err_msg); - processor.on_error(&err_msg); - break; + while let Some(item) = byte_stream.next().await { + let chunk = match item { + Ok(chunk) => chunk, + Err(err) => { + let err_msg = format!("Error receiving chunk: {:?}", err); + warn!(error = %err_msg, "stream error"); + processor.on_error(&err_msg); + break; + } + }; + + // Call on_first_bytes for the first chunk + if is_first_chunk { + processor.on_first_bytes(); + is_first_chunk = false; } - }; - // Call on_first_bytes for the first chunk - if is_first_chunk { - processor.on_first_bytes(); - is_first_chunk = false; - } - - // Process the chunk - match processor.process_chunk(chunk) { - Ok(Some(processed_chunk)) => { - if tx.send(processed_chunk).await.is_err() { - warn!("Receiver dropped"); + // Process the chunk + match processor.process_chunk(chunk) { + Ok(Some(processed_chunk)) => { + if tx.send(processed_chunk).await.is_err() { + warn!("receiver dropped"); + break; + } + } + Ok(None) => { + // Skip this chunk + continue; + } + Err(err) => { + warn!("processor error: {}", err); + processor.on_error(&err); break; } } - Ok(None) => { - // Skip this chunk - continue; - } - Err(err) => { - warn!("Processor error: {}", err); - processor.on_error(&err); - break; - } } - } - processor.on_complete(); - }); + processor.on_complete(); + } + .instrument(current_span), + ); // Convert channel receiver to HTTP stream let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk))); diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index e8616768..eb75d21a 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box> { // loading arch_config.yaml file let arch_config_path = env::var("ARCH_CONFIG_PATH_RENDERED") .unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string()); - info!("Loading arch_config.yaml from {}", arch_config_path); + info!(path = %arch_config_path, "loading arch_config.yaml"); let config_contents = fs::read_to_string(&arch_config_path).expect("Failed to read arch_config.yaml"); @@ -125,7 +125,10 @@ async fn main() -> Result<(), Box> { if let Some(storage_config) = &arch_config.state_storage { let storage: Arc = match storage_config.storage_type { common::configuration::StateStorageType::Memory => { - info!("Initialized conversation state storage: Memory"); + info!( + storage_type = "memory", + "initialized conversation state storage" + ); Arc::new(MemoryConversationalStorage::new()) } common::configuration::StateStorageType::Postgres => { @@ -134,8 +137,11 @@ async fn main() -> Result<(), Box> { .as_ref() .expect("connection_string is required for postgres state_storage"); - debug!("Postgres connection string (full): {}", connection_string); - info!("Initializing conversation state storage: Postgres"); + debug!(connection_string = %connection_string, "postgres connection"); + info!( + storage_type = "postgres", + "initializing conversation state storage" + ); Arc::new( PostgreSQLConversationStorage::new(connection_string.clone()) .await @@ -145,7 +151,7 @@ async fn main() -> Result<(), Box> { }; Some(storage) } else { - info!("No state_storage configured - conversation state management disabled"); + info!("no state_storage configured, conversation state management disabled"); None }; @@ -250,7 +256,7 @@ async fn main() -> Result<(), Box> { Ok(response) } _ => { - debug!("No route for {} {}", req.method(), req.uri().path()); + debug!(method = %req.method(), path = %req.uri().path(), "no route found"); let mut not_found = Response::new(empty()); *not_found.status_mut() = StatusCode::NOT_FOUND; Ok(not_found) @@ -260,13 +266,13 @@ async fn main() -> Result<(), Box> { }); tokio::task::spawn(async move { - debug!("Accepted connection from {:?}", peer_addr); + debug!(peer = ?peer_addr, "accepted connection"); if let Err(err) = http1::Builder::new() // .serve_connection(io, service_fn(chat_completion)) .serve_connection(io, service) .await { - warn!("Error serving connection: {:?}", err); + warn!(error = ?err, "error serving connection"); } }); } diff --git a/crates/brightstaff/src/router/llm_router.rs b/crates/brightstaff/src/router/llm_router.rs index 743630e6..ec3fe3ab 100644 --- a/crates/brightstaff/src/router/llm_router.rs +++ b/crates/brightstaff/src/router/llm_router.rs @@ -96,14 +96,14 @@ impl RouterService { .generate_request(messages, &usage_preferences); debug!( - "sending request to arch-router model: {}, endpoint: {}", - self.router_model.get_model_name(), - self.router_url + model = %self.router_model.get_model_name(), + endpoint = %self.router_url, + "sending request to arch-router" ); debug!( - "arch request body: {}", - &serde_json::to_string(&router_request).unwrap(), + body = %serde_json::to_string(&router_request).unwrap(), + "arch router request" ); let mut llm_route_request_headers = header::HeaderMap::new(); @@ -148,9 +148,9 @@ impl RouterService { Ok(response) => response, Err(err) => { warn!( - "Failed to parse JSON: {}. Body: {}", - err, - &serde_json::to_string(&body).unwrap() + error = %err, + body = %serde_json::to_string(&body).unwrap(), + "failed to parse json response" ); return Err(RoutingError::JsonError( err, @@ -160,7 +160,7 @@ impl RouterService { }; if chat_completion_response.choices.is_empty() { - warn!("No choices in router response: {}", body); + warn!(body = %body, "no choices in router response"); return Ok(None); } @@ -169,10 +169,10 @@ impl RouterService { .router_model .parse_response(content, &usage_preferences)?; info!( - "arch-router determined route: {}, selected_model: {:?}, response time: {}ms", - content.replace("\n", "\\n"), - parsed_response, - router_response_time.as_millis() + content = %content.replace("\n", "\\n"), + selected_model = ?parsed_response, + response_time_ms = router_response_time.as_millis(), + "arch-router determined route" ); if let Some(ref parsed_response) = parsed_response { diff --git a/crates/brightstaff/src/router/orchestrator_model_v1.rs b/crates/brightstaff/src/router/orchestrator_model_v1.rs index 8d64f8e7..c6d3d56d 100644 --- a/crates/brightstaff/src/router/orchestrator_model_v1.rs +++ b/crates/brightstaff/src/router/orchestrator_model_v1.rs @@ -197,12 +197,12 @@ impl OrchestratorModel for OrchestratorModelV1 { token_count += message_token_count; if token_count > self.max_token_length { debug!( - "OrchestratorModelV1: token count {} exceeds max token length {}, truncating conversation, selected message count {}, total message count: {}", - token_count, - self.max_token_length - , selected_messsage_count, - messages_vec.len() - ); + token_count = token_count, + max_tokens = self.max_token_length, + selected = selected_messsage_count, + total = messages_vec.len(), + "token count exceeds max, truncating conversation" + ); if message.role == Role::User { // If message that exceeds max token length is from user, we need to keep it selected_messages_list_reversed.push(message); @@ -214,9 +214,7 @@ impl OrchestratorModel for OrchestratorModelV1 { } if selected_messages_list_reversed.is_empty() { - debug!( - "OrchestratorModelV1: no messages selected, using the last message in the conversation" - ); + debug!("no messages selected, using last message"); if let Some(last_message) = messages_vec.last() { selected_messages_list_reversed.push(last_message); } @@ -228,12 +226,12 @@ impl OrchestratorModel for OrchestratorModelV1 { // - last() is the first message in the original conversation if let Some(first_message) = selected_messages_list_reversed.first() { if first_message.role != Role::User { - warn!("OrchestratorModelV1: last message in the conversation is not from user, this may lead to incorrect orchestration"); + warn!("last message is not from user, may lead to incorrect orchestration"); } } if let Some(last_message) = selected_messages_list_reversed.last() { if last_message.role != Role::User { - warn!("OrchestratorModelV1: first message in the selected conversation is not from user, this may lead to incorrect orchestration"); + warn!("first message is not from user, may lead to incorrect orchestration"); } } @@ -323,8 +321,9 @@ impl OrchestratorModel for OrchestratorModelV1 { result.push((selected_route, model_name)); } else { warn!( - "No matching model found for route: {}, usage preferences: {:?}", - selected_route, usage_preferences + route = %selected_route, + preferences = ?usage_preferences, + "no matching model found for route" ); } } @@ -339,8 +338,9 @@ impl OrchestratorModel for OrchestratorModelV1 { result.push((selected_route, model)); } else { warn!( - "No model found for route: {}, orchestrator model preferences: {:?}", - selected_route, self.agent_orchestration_to_model_map + route = %selected_route, + preferences = ?self.agent_orchestration_to_model_map, + "no model found for route" ); } } diff --git a/crates/brightstaff/src/router/plano_orchestrator.rs b/crates/brightstaff/src/router/plano_orchestrator.rs index ee91d52e..bd26f0f7 100644 --- a/crates/brightstaff/src/router/plano_orchestrator.rs +++ b/crates/brightstaff/src/router/plano_orchestrator.rs @@ -75,14 +75,14 @@ impl OrchestratorService { .generate_request(messages, &usage_preferences); debug!( - "sending request to arch-orchestrator model: {}, endpoint: {}", - self.orchestrator_model.get_model_name(), - self.orchestrator_url + model = %self.orchestrator_model.get_model_name(), + endpoint = %self.orchestrator_url, + "sending request to arch-orchestrator" ); debug!( - "arch orchestrator request body: {}", - &serde_json::to_string(&orchestrator_request).unwrap(), + body = %serde_json::to_string(&orchestrator_request).unwrap(), + "arch orchestrator request" ); let mut orchestration_request_headers = header::HeaderMap::new(); @@ -131,9 +131,9 @@ impl OrchestratorService { Ok(response) => response, Err(err) => { warn!( - "Failed to parse JSON: {}. Body: {}", - err, - &serde_json::to_string(&body).unwrap() + error = %err, + body = %serde_json::to_string(&body).unwrap(), + "failed to parse json response" ); return Err(OrchestrationError::JsonError( err, @@ -143,7 +143,7 @@ impl OrchestratorService { }; if chat_completion_response.choices.is_empty() { - warn!("No choices in orchestrator response: {}", body); + warn!(body = %body, "no choices in orchestrator response"); return Ok(None); } @@ -152,10 +152,10 @@ impl OrchestratorService { .orchestrator_model .parse_response(content, &usage_preferences)?; info!( - "arch-orchestrator determined routes: {}, selected_routes: {:?}, response time: {}ms", - content.replace("\n", "\\n"), - parsed_response, - orchestrator_response_time.as_millis() + content = %content.replace("\n", "\\n"), + selected_routes = ?parsed_response, + response_time_ms = orchestrator_response_time.as_millis(), + "arch-orchestrator determined routes" ); if let Some(ref parsed_response) = parsed_response { diff --git a/crates/brightstaff/src/router/router_model_v1.rs b/crates/brightstaff/src/router/router_model_v1.rs index 796dfaac..430b4f8e 100644 --- a/crates/brightstaff/src/router/router_model_v1.rs +++ b/crates/brightstaff/src/router/router_model_v1.rs @@ -94,12 +94,12 @@ impl RouterModel for RouterModelV1 { token_count += message_token_count; if token_count > self.max_token_length { debug!( - "RouterModelV1: token count {} exceeds max token length {}, truncating conversation, selected message count {}, total message count: {}", - token_count, - self.max_token_length - , selected_messsage_count, - messages_vec.len() - ); + token_count = token_count, + max_tokens = self.max_token_length, + selected = selected_messsage_count, + total = messages_vec.len(), + "token count exceeds max, truncating conversation" + ); if message.role == Role::User { // If message that exceeds max token length is from user, we need to keep it selected_messages_list_reversed.push(message); @@ -111,9 +111,7 @@ impl RouterModel for RouterModelV1 { } if selected_messages_list_reversed.is_empty() { - debug!( - "RouterModelV1: no messages selected, using the last message in the conversation" - ); + debug!("no messages selected, using last message"); if let Some(last_message) = messages_vec.last() { selected_messages_list_reversed.push(last_message); } @@ -122,12 +120,12 @@ impl RouterModel for RouterModelV1 { // ensure that first and last selected message is from user if let Some(first_message) = selected_messages_list_reversed.first() { if first_message.role != Role::User { - warn!("RouterModelV1: last message in the conversation is not from user, this may lead to incorrect routing"); + warn!("last message is not from user, may lead to incorrect routing"); } } if let Some(last_message) = selected_messages_list_reversed.last() { if last_message.role != Role::User { - warn!("RouterModelV1: first message in the conversation is not from user, this may lead to incorrect routing"); + warn!("first message is not from user, may lead to incorrect routing"); } } @@ -206,8 +204,9 @@ impl RouterModel for RouterModelV1 { return Ok(Some((selected_route, model_name))); } else { warn!( - "No matching model found for route: {}, usage preferences: {:?}", - selected_route, usage_preferences + route = %selected_route, + preferences = ?usage_preferences, + "no matching model found for route" ); return Ok(None); } @@ -219,8 +218,9 @@ impl RouterModel for RouterModelV1 { } warn!( - "No model found for route: {}, router model preferences: {:?}", - selected_route, self.llm_route_to_model_map + route = %selected_route, + preferences = ?self.llm_route_to_model_map, + "no model found for route" ); Ok(None) diff --git a/crates/brightstaff/src/state/response_state_processor.rs b/crates/brightstaff/src/state/response_state_processor.rs index 3d1e8673..0920324c 100644 --- a/crates/brightstaff/src/state/response_state_processor.rs +++ b/crates/brightstaff/src/state/response_state_processor.rs @@ -92,18 +92,16 @@ impl ResponsesStateProcessor

{ match decoder.read_to_end(&mut decompressed) { Ok(_) => { debug!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes", - self.request_id, - self.chunk_buffer.len(), - decompressed.len() + original_bytes = self.chunk_buffer.len(), + decompressed_bytes = decompressed.len(), + "Successfully decompressed response" ); decompressed } Err(e) => { warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to decompress gzip buffer: {}", - self.request_id, - e + error = %e, + "Failed to decompress gzip buffer" ); self.chunk_buffer.clone() } @@ -111,9 +109,8 @@ impl ResponsesStateProcessor

{ } Some(encoding) => { warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.", - self.request_id, - encoding + encoding = %encoding, + "Unsupported Content-Encoding, only gzip is supported" ); self.chunk_buffer.clone() } @@ -143,10 +140,9 @@ impl ResponsesStateProcessor

{ serde_json::from_str::(data_str) { info!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}", - self.request_id, - response.id, - response.output.len() + response_id = %response.id, + output_items = response.output.len(), + "Captured streaming response" ); self.response_id = Some(response.id.clone()); self.output_items = Some(response.output.clone()); @@ -175,24 +171,20 @@ impl ResponsesStateProcessor

{ ) { Ok(response) => { info!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}", - self.request_id, - response.id, - response.output.len() + response_id = %response.id, + output_items = response.output.len(), + "Captured non-streaming response" ); self.response_id = Some(response.id.clone()); self.output_items = Some(response.output.clone()); } Err(e) => { - // Log parse error with chunk preview for debugging let chunk_preview = String::from_utf8_lossy(&decompressed); let preview_len = chunk_preview.len().min(200); warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}", - self.request_id, - e, - preview_len, - &chunk_preview[..preview_len] + error = %e, + preview = %&chunk_preview[..preview_len], + "Failed to parse non-streaming ResponsesAPIResponse" ); } } @@ -221,10 +213,7 @@ impl StreamProcessor for ResponsesStateProcessor

{ // Skip storage for OpenAI upstream if self.is_openai_upstream { - debug!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider", - self.request_id - ); + debug!("Skipping state storage for OpenAI upstream"); return; } @@ -234,8 +223,9 @@ impl StreamProcessor for ResponsesStateProcessor

{ let output_as_inputs = outputs_to_inputs(output_items); debug!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}", - self.request_id, output_items.len(), output_as_inputs.len() + output_items = output_items.len(), + converted_items = output_as_inputs.len(), + "Converting outputs to inputs" ); // Combine original input + output as new input history @@ -243,11 +233,9 @@ impl StreamProcessor for ResponsesStateProcessor

{ combined_input.extend(output_as_inputs); debug!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}", - self.request_id, - self.original_input.len(), - combined_input.len(), - serde_json::to_string(&combined_input).unwrap_or_else(|_| "serialization_error".to_string()) + original_input = self.original_input.len(), + combined_input = combined_input.len(), + "Storing conversation state" ); let state = OpenAIConversationState { @@ -270,28 +258,27 @@ impl StreamProcessor for ResponsesStateProcessor

{ match storage.put(state).await { Ok(()) => { info!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}, items_count={}", - request_id, - response_id_clone, - items_count + request_id = %request_id, + response_id = %response_id_clone, + items = items_count, + "Stored conversation state" ); } Err(e) => { warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}", - request_id, - response_id_clone, - e + request_id = %request_id, + response_id = %response_id_clone, + error = %e, + "Failed to store conversation state" ); } } }); } else { warn!( - "[PLANO_REQ_ID:{}] | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}", - self.request_id, - self.response_id.is_some(), - self.output_items.is_some() + has_response_id = self.response_id.is_some(), + has_output = self.output_items.is_some(), + "No response_id captured, cannot store conversation state" ); } } diff --git a/crates/brightstaff/src/utils/tracing.rs b/crates/brightstaff/src/utils/tracing.rs index 1326153a..81ac3bb2 100644 --- a/crates/brightstaff/src/utils/tracing.rs +++ b/crates/brightstaff/src/utils/tracing.rs @@ -8,6 +8,8 @@ use time::macros::format_description; use tracing::{Event, Subscriber}; use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields}; use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; struct BracketedTime; @@ -30,7 +32,7 @@ struct BracketedFormatter; impl FormatEvent for BracketedFormatter where - S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, + S: Subscriber + for<'a> LookupSpan<'a>, N: for<'a> FormatFields<'a> + 'static, { fn format_event( @@ -44,16 +46,37 @@ where write!( writer, - "[{}] ", + "[{}]", event.metadata().level().to_string().to_lowercase() )?; + // Extract request_id from span context if present + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + let extensions = span.extensions(); + if let Some(fields) = extensions.get::>() { + let fields_str = fields.fields.as_str(); + // Look for request_id in the formatted fields + if let Some(start) = fields_str.find("request_id=") { + let rest = &fields_str[start + 11..]; // Skip "request_id=" + let end = rest.find(|c: char| c.is_whitespace()).unwrap_or(rest.len()); + let rid = &rest[..end]; + write!(writer, "[request_id={}]", rid)?; + break; + } + } + } + } + + write!(writer, " ")?; ctx.field_format().format_fields(writer.by_ref(), event)?; writeln!(writer) } } +use tracing_subscriber::fmt::FormattedFields; + static INIT_LOGGER: OnceLock = OnceLock::new(); pub fn init_tracer() -> &'static SdkTracerProvider { @@ -94,10 +117,19 @@ pub fn init_tracer() -> &'static SdkTracerProvider { tracing_opentelemetry::layer().with_tracer(provider.tracer("brightstaff")); // Combine the OpenTelemetry layer with fmt layer using the registry + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + // Create fmt layer with span field formatting enabled (no ANSI to keep fields parseable) + let fmt_layer = tracing_subscriber::fmt::layer() + .event_format(BracketedFormatter) + .fmt_fields(format::DefaultFields::new()) + .with_ansi(false); + let subscriber = tracing_subscriber::registry() .with(telemetry_layer) - .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) - .with(tracing_subscriber::fmt::layer().event_format(BracketedFormatter)); + .with(env_filter) + .with(fmt_layer); tracing::subscriber::set_global_default(subscriber) .expect("Failed to set tracing subscriber"); @@ -108,11 +140,18 @@ pub fn init_tracer() -> &'static SdkTracerProvider { let provider = SdkTracerProvider::builder().build(); global::set_tracer_provider(provider.clone()); - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), - ) + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + // Create fmt layer with span field formatting enabled (no ANSI to keep fields parseable) + let fmt_layer = tracing_subscriber::fmt::layer() .event_format(BracketedFormatter) + .fmt_fields(format::DefaultFields::new()) + .with_ansi(false); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) .init(); provider diff --git a/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml b/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml index e114442f..2ec797e5 100644 --- a/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml +++ b/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml @@ -6,6 +6,8 @@ services: dockerfile: Dockerfile ports: - "8001:8001" + - "11000:11000" + - "12001:12001" environment: - ARCH_CONFIG_PATH=/app/arch_config.yaml - OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}