fixed logs to follow the conversational flow a bit better

This commit is contained in:
Salman Paracha 2025-12-15 11:17:44 -08:00
parent bce917c9d4
commit 2d6107e460
4 changed files with 44 additions and 53 deletions

View file

@ -12,7 +12,7 @@ use hyper::{Request, Response, StatusCode};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use crate::router::llm_router::RouterService;
use crate::handlers::utils::{create_streaming_response, ObservableStreamProcessor, truncate_message};
@ -63,7 +63,7 @@ pub async fn llm_chat(
let chat_request_bytes = request.collect().await?.to_bytes();
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | REQUEST BODY (raw utf8): {}",
"[PLANO_REQ_ID:{}] | REQUEST_BODY (UTF8): {}",
request_id,
String::from_utf8_lossy(&chat_request_bytes)
);
@ -74,8 +74,8 @@ pub async fn llm_chat(
)) {
Ok(request) => request,
Err(err) => {
warn!("[PLANO_REQ_ID:{}] | BRIGHTSTAFF | Failed to parse request as ProviderRequestType: {}", request_id, err);
let err_msg = format!("[PLANO_REQ_ID:{}] | BRIGHTSTAFF | Failed to parse request: {}", request_id, err);
warn!("[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request as ProviderRequestType: {}", request_id, err);
let err_msg = format!("[PLANO_REQ_ID:{}] | FAILURE | Failed to parse request: {}", request_id, err);
let mut bad_request = Response::new(full(err_msg));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
@ -101,7 +101,7 @@ pub async fn llm_chat(
client_request.set_model(resolved_model.clone());
if client_request.remove_metadata_key("archgw_preference_config") {
debug!("[PLANO (BRIGHTSTAFF)] Removed archgw_preference_config from metadata");
debug!("[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata", request_id);
}
// === v1/responses state management: Determine upstream API and combine input if needed ===
@ -140,14 +140,14 @@ pub async fn llm_chat(
// Update both the request and original_input_items
responses_req.input = InputParam::Items(combined_input.clone());
original_input_items = combined_input;
debug!("[PLANO (BRIGHTSTAFF)] Updated request with conversation history ({} items)", original_input_items.len());
info!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Updated request with conversation history ({} items)", request_id, original_input_items.len());
}
Err(StateStorageError::NotFound(_)) => {
// Return 409 Conflict when previous_response_id not found
warn!("[PLANO (BRIGHTSTAFF)] Previous response_id not found: {}", prev_resp_id);
warn!("[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Previous response_id not found: {}", request_id, prev_resp_id);
let err_msg = format!(
"[PLANO (BRIGHTSTAFF)] Conversation state not found for previous_response_id: {}",
prev_resp_id
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Conversation state not found for previous_response_id: {}",
request_id, prev_resp_id
);
let mut conflict_response = Response::new(full(err_msg));
*conflict_response.status_mut() = StatusCode::CONFLICT;
@ -156,8 +156,8 @@ pub async fn llm_chat(
Err(e) => {
// Log warning but continue on other storage errors
warn!(
"Failed to retrieve conversation state for {}: {}",
prev_resp_id, e
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to retrieve conversation state for {}: {}",
request_id, prev_resp_id, e
);
// Restore original_input_items since we passed ownership
original_input_items = extract_input_items(&responses_req.input);
@ -165,7 +165,7 @@ pub async fn llm_chat(
}
}
} else {
debug!("[PLANO (BRIGHTSTAFF)] Upstream supports ResponsesAPI natively, passing through without state management");
debug!("[PLANO_REQ_ID:{}] | BRIGHT_STAFF | Upstream supports ResponsesAPI natively.", request_id);
}
}
}
@ -195,8 +195,8 @@ pub async fn llm_chat(
let model_name = routing_result.model_name;
debug!(
"[PLANO ARCH_ROUTER] URL: {}, Resolved Model: {}",
full_qualified_llm_provider_url, model_name
"[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Resolved Model: {}",
request_id, full_qualified_llm_provider_url, model_name
);
request_headers.insert(

View file

@ -1,4 +1,5 @@
use common::configuration::ModelUsagePreference;
use common::consts::{REQUEST_ID_HEADER};
use common::traces::{TraceCollector, SpanKind, SpanBuilder, parse_traceparent};
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use hermesllm::{ProviderRequest, ProviderRequestType};
@ -43,6 +44,10 @@ pub async fn router_chat_get_upstream_model(
) -> Result<RoutingResult, RoutingError> {
// Clone metadata for routing before converting (which consumes client_request)
let routing_metadata = client_request.metadata().clone();
let request_id = request_headers
.get(REQUEST_ID_HEADER)
.and_then(|value| value.to_str().ok())
.unwrap_or("unknown");
// Convert to ChatCompletionsRequest for routing (regardless of input type)
let chat_request = match ProviderRequestType::try_from((
@ -73,7 +78,8 @@ pub async fn router_chat_get_upstream_model(
};
debug!(
"[ARCH_ROUTER REQ]: {}",
"[PLANO_REQ_ID: {}]: ROUTER_REQ: {}",
request_id,
&serde_json::to_string(&chat_request).unwrap()
);
@ -114,14 +120,13 @@ pub async fn router_chat_get_upstream_model(
};
info!(
"request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}",
"[PLANO_REQ_ID: {}] | ROUTER_REQ | Usage preferences from request: {}, request_path: {}, latest message: {}",
request_id,
usage_preferences.is_some(),
request_path,
latest_message_for_log
);
debug!("usage preferences from request: {:?}", usage_preferences);
// Capture start time for routing span
let routing_start_time = std::time::Instant::now();
let routing_start_system_time = std::time::SystemTime::now();
@ -153,7 +158,8 @@ pub async fn router_chat_get_upstream_model(
None => {
// No route determined, use default model from request
info!(
"No route determined, using default model from request: {}",
"[PLANO_REQ_ID: {}] | ROUTER_REQ | No route determined, using default model from request: {}",
request_id,
chat_request.model
);

View file

@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use tracing::{debug, info};
use tracing::{debug};
pub mod memory;
pub mod response_state_processor;
@ -139,19 +139,9 @@ pub async fn retrieve_and_combine_input(
previous_response_id: &str,
current_input: Vec<InputItem>,
) -> Result<Vec<InputItem>, StateStorageError> {
info!(
"Retrieving conversation state for previous_response_id: {}",
previous_response_id
);
// First get the previous state
let prev_state = storage.get(previous_response_id).await?;
let combined_input = storage.merge(&prev_state, current_input);
debug!(
"Retrieved and merged conversation state: {} total input items",
combined_input.len()
);
Ok(combined_input)
}

View file

@ -93,7 +93,7 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
match decoder.read_to_end(&mut decompressed) {
Ok(_) => {
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully decompressed {} bytes to {} bytes",
self.request_id,
self.chunk_buffer.len(),
decompressed.len()
@ -102,7 +102,7 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
}
Err(e) => {
warn!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to decompress gzip buffer: {}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to decompress gzip buffer: {}",
self.request_id,
e
);
@ -112,7 +112,7 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
}
Some(encoding) => {
warn!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Unsupported Content-Encoding: {}. Only gzip is currently supported.",
self.request_id,
encoding
);
@ -143,12 +143,11 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
if let Ok(stream_event) = serde_json::from_str::<ResponsesAPIStreamEvent>(data_str) {
// Check if this is a ResponseCompleted event
if let ResponsesAPIStreamEvent::ResponseCompleted { response, .. } = stream_event {
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}, output_json={}",
info!(
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured streaming response.completed: response_id={}, output_items={}",
self.request_id,
response.id,
response.output.len(),
serde_json::to_string(&response.output).unwrap_or_else(|_| "serialization_error".to_string())
response.output.len()
);
self.response_id = Some(response.id.clone());
self.output_items = Some(response.output.clone());
@ -176,7 +175,7 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
match serde_json::from_slice::<hermesllm::apis::openai_responses::ResponsesAPIResponse>(&decompressed) {
Ok(response) => {
info!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Captured non-streaming response: response_id={}, output_items={}",
self.request_id,
response.id,
response.output.len()
@ -189,7 +188,7 @@ impl<P: StreamProcessor> ResponsesStateProcessor<P> {
let chunk_preview = String::from_utf8_lossy(&decompressed);
let preview_len = chunk_preview.len().min(200);
warn!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to parse non-streaming ResponsesAPIResponse: {}. Decompressed preview (first {} bytes): {}",
self.request_id,
e,
preview_len,
@ -223,7 +222,7 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
// Skip storage for OpenAI upstream
if self.is_openai_upstream {
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Skipping state storage for OpenAI upstream provider",
self.request_id
);
return;
@ -231,17 +230,11 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
// Store state if we captured response_id and output
if let (Some(response_id), Some(output_items)) = (&self.response_id, &self.output_items) {
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Output items before conversion: {}",
self.request_id,
serde_json::to_string(&output_items).unwrap_or_else(|_| "serialization_error".to_string())
);
// Convert output items to input items for next request
let output_as_inputs = outputs_to_inputs(output_items);
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Converting outputs to inputs: output_items_count={}, converted_input_items_count={}",
self.request_id, output_items.len(), output_as_inputs.len()
);
@ -250,7 +243,7 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
combined_input.extend(output_as_inputs);
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Storing state: original_input_count={}, combined_input_count={}, combined_json={}",
self.request_id,
self.original_input.len(),
combined_input.len(),
@ -272,18 +265,20 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
let storage = self.storage.clone();
let response_id_clone = response_id.clone();
let request_id = self.request_id.clone();
let items_count = state.input_items.len();
tokio::spawn(async move {
match storage.put(state).await {
Ok(()) => {
debug!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}",
info!(
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Successfully stored conversation state for response_id: {}, items_count={}",
request_id,
response_id_clone
response_id_clone,
items_count
);
}
Err(e) => {
warn!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | Failed to store conversation state for response_id {}: {}",
request_id,
response_id_clone,
e
@ -293,7 +288,7 @@ impl<P: StreamProcessor> StreamProcessor for ResponsesStateProcessor<P> {
});
} else {
warn!(
"[PLANO_REQ_ID:{}] | BRIGHTSTAFF | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}",
"[PLANO_REQ_ID:{}] | STATE_PROCESSOR | No response_id captured from upstream response - cannot store conversation state. response_id present: {}, output present: {}",
self.request_id,
self.response_id.is_some(),
self.output_items.is_some()