From ee52c608f7f06d12c185a012437f1e1bd4467fd9 Mon Sep 17 00:00:00 2001 From: Salman Paracha Date: Thu, 4 Sep 2025 19:28:47 -0700 Subject: [PATCH] fixed test cases and added more structured logs --- crates/llm_gateway/src/stream_context.rs | 216 ++++++++++++------ .../hurl_tests/simple.hurl | 4 +- 2 files changed, 154 insertions(+), 66 deletions(-) diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 7053848e..223e9a90 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -109,10 +109,11 @@ impl StreamContext { provider_hint, )); - debug!( - "request received: llm provider hint: {}, selected provider: {}", + info!( + "[ARCHGW_REQ_ID:{}] PROVIDER_SELECTION: hint='{}' -> selected='{}'", + self.context_id, self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER) - .unwrap_or_default(), + .unwrap_or("none".to_string()), self.llm_provider.as_ref().unwrap().name ); } @@ -183,7 +184,11 @@ impl StreamContext { // Tokenize and record token count. let token_count = tokenizer::token_count(model, json_string).unwrap_or(0); - debug!("Recorded input token count: {}", token_count); + info!( + "[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}", + self.context_id, model, token_count + ); + // Record the token count to metrics. self.metrics .input_sequence_length @@ -191,14 +196,20 @@ impl StreamContext { // Check if rate limiting needs to be applied. if let Some(selector) = self.ratelimit_selector.take() { - log::debug!("Applying ratelimit for model: {}", model); + info!( + "[ARCHGW_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'", + self.context_id, model, selector.key, selector.value + ); ratelimit::ratelimits(None).read().unwrap().check_limit( model.to_owned(), selector, NonZero::new(token_count as u32).unwrap(), )?; } else { - debug!("No rate limit applied for model: {}", model); + debug!( + "[ARCHGW_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)", + self.context_id, model + ); } Ok(()) @@ -214,14 +225,17 @@ impl StreamContext { Ok(duration) => { let duration_ms = duration.as_millis(); info!( - "on_http_response_body: time to first token: {}ms", - duration_ms + "[ARCHGW_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms", + self.context_id, duration_ms ); self.ttft_duration = Some(duration); self.metrics.time_to_first_token.record(duration_ms as u64); } Err(e) => { - warn!("SystemTime error: {:?}", e); + warn!( + "[ARCHGW_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}", + self.context_id, e + ); } } } @@ -233,7 +247,10 @@ impl StreamContext { Ok(duration) => { // Convert the duration to milliseconds let duration_ms = duration.as_millis(); - info!("on_http_response_body: request latency: {}ms", duration_ms); + info!( + "[ARCHGW_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}", + self.context_id, duration_ms, self.response_tokens + ); // Record the latency to the latency histogram self.metrics.request_latency.record(duration_ms as u64); @@ -244,8 +261,9 @@ impl StreamContext { // Record the time per output token self.metrics.time_per_output_token.record(tpot); - debug!( - "time per token: {}ms, tokens per second: {}", + info!( + "[ARCHGW_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}", + self.context_id, tpot, 1000 / tpot ); @@ -301,18 +319,17 @@ impl StreamContext { fn read_raw_response_body(&mut self, body_size: usize) -> Result, Action> { if self.streaming_response { - let chunk_start = 0; let chunk_size = body_size; debug!( - "on_http_response_body: streaming response reading, {}..{}", - chunk_start, chunk_size + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}", + self.context_id, chunk_size ); let streaming_chunk = match self.get_http_response_body(0, chunk_size) { Some(chunk) => chunk, None => { warn!( - "response body empty, chunk_start: {}, chunk_size: {}", - chunk_start, chunk_size + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}", + self.context_id, chunk_size ); return Err(Action::Continue); } @@ -320,9 +337,10 @@ impl StreamContext { if streaming_chunk.len() != chunk_size { warn!( - "chunk size mismatch: read: {} != requested: {}", - streaming_chunk.len(), - chunk_size + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}", + self.context_id, + chunk_size, + streaming_chunk.len() ); } Ok(streaming_chunk) @@ -330,7 +348,10 @@ impl StreamContext { if body_size == 0 { return Err(Action::Continue); } - debug!("non streaming response bytes read: 0:{}", body_size); + debug!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}", + self.context_id, body_size + ); match self.get_http_response_body(0, body_size) { Some(body) => Ok(body), None => { @@ -342,12 +363,12 @@ impl StreamContext { } fn debug_log_body(&self, body: &[u8]) { - if log::log_enabled!(log::Level::Debug) { - debug!( - "raw response data (converted to utf8): {}", - String::from_utf8_lossy(body) - ); - } + debug!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}", + self.context_id, + body.len(), + String::from_utf8_lossy(body) + ); } fn handle_streaming_response( @@ -355,7 +376,12 @@ impl StreamContext { body: &[u8], provider_id: ProviderId, ) -> Result, Action> { - debug!("processing streaming response"); + debug!( + "[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: provider_id={:?} chunk_size={}", + self.context_id, + provider_id, + body.len() + ); match self.client_api.as_ref() { Some(client_api) => { let client_api = client_api.clone(); // Clone to avoid borrowing issues @@ -392,16 +418,29 @@ impl StreamContext { self.record_ttft_if_needed(); if provider_response.is_final() { - debug!("Received final streaming chunk"); + debug!( + "[ARCHGW_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}", + self.context_id, self.response_tokens + ); } if let Some(content) = provider_response.content_delta() { let estimated_tokens = content.len() / 4; self.response_tokens += estimated_tokens.max(1); + debug!( + "[ARCHGW_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}", + self.context_id, + content.len(), + estimated_tokens.max(1), + self.response_tokens + ); } } Err(e) => { - warn!("Error processing streaming chunk: {}", e); + warn!( + "[ARCHGW_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}", + self.context_id, e + ); return Err(Action::Continue); } } @@ -426,19 +465,22 @@ impl StreamContext { body: &[u8], provider_id: ProviderId, ) -> Result, Action> { + info!( + "[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}", + self.context_id, + provider_id, + body.len() + ); + let response: ProviderResponseType = match self.client_api.as_ref() { Some(client_api) => { match ProviderResponseType::try_from((body, client_api, &provider_id)) { Ok(response) => response, Err(e) => { warn!( - "could not parse response: {}, body str: {}", - e, - String::from_utf8_lossy(body) - ); - debug!( - "on_http_response_body: S[{}], response body: {}", + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}", self.context_id, + e, String::from_utf8_lossy(body) ); self.send_server_error( @@ -450,7 +492,10 @@ impl StreamContext { } } None => { - warn!("Missing client_api for non-streaming response"); + warn!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api", + self.context_id + ); return Err(Action::Continue); } }; @@ -459,22 +504,28 @@ impl StreamContext { if let Some((prompt_tokens, completion_tokens, total_tokens)) = response.extract_usage_counts() { - debug!( - "Response usage: prompt={}, completion={}, total={}", - prompt_tokens, completion_tokens, total_tokens + info!( + "[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}", + self.context_id, + prompt_tokens, + completion_tokens, + total_tokens ); self.response_tokens = completion_tokens; } else { - warn!("No usage information found in response"); + warn!( + "[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: no usage information found", + self.context_id + ); } // Serialize the normalized response back to JSON bytes match serde_json::to_vec(&response) { Ok(bytes) => { debug!( - "non streaming response data after serialization. length: {}, converted to utf8: {}", - bytes.len(), - String::from_utf8_lossy(&bytes) - ); + "[ARCHGW_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}", + self.context_id, + String::from_utf8_lossy(&bytes) + ); Ok(bytes) } Err(e) => { @@ -576,7 +627,7 @@ impl HttpContext for StreamContext { fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { debug!( - "on_http_request_body [S={}] bytes={} end_stream={}", + "[ARCHGW_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}", self.context_id, body_size, end_of_stream ); @@ -612,11 +663,26 @@ impl HttpContext for StreamContext { //We need to deserialize the request body based on the resolved API let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() { Some(the_client_api) => { + info!( + "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}", + self.context_id, + the_client_api, + body_bytes.len() + ); + + debug!( + "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}", + self.context_id, + String::from_utf8_lossy(&body_bytes) + ); + match ProviderRequestType::try_from((&body_bytes[..], the_client_api)) { Ok(deserialized) => deserialized, Err(e) => { - debug!( - "on_http_request_body: request body: {}", + warn!( + "[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}", + self.context_id, + e, String::from_utf8_lossy(&body_bytes) ); self.send_server_error( @@ -656,6 +722,13 @@ impl HttpContext for StreamContext { if use_agent_orchestrator { "agent_orchestrator".to_string() } else { + warn!( + "[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}", + self.context_id, + model_requested, + self.llm_provider().name, + self.llm_provider().model + ); self.send_server_error( ServerError::BadRequest { why: format!( @@ -679,10 +752,12 @@ impl HttpContext for StreamContext { self.user_message = deserialized_client_request.get_recent_user_message(); info!( - "on_http_request_body: provider: {}, model requested (in body): {}, model selected: {}", - self.llm_provider().name, + "[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}", + self.context_id, model_requested, - model_name.unwrap_or(&"None".to_string()), + resolved_model, + self.llm_provider().name, + deserialized_client_request.is_streaming() ); // Use provider interface for streaming detection and setup @@ -703,21 +778,34 @@ impl HttpContext for StreamContext { // Convert chat completion request to llm provider specific request using provider interface let serialized_body_bytes_upstream = match self.resolved_api.as_ref() { Some(upstream) => { + info!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}", + self.context_id, self.client_api, upstream + ); + match ProviderRequestType::try_from((&deserialized_client_request, upstream)) { - Ok(request) => match request.to_bytes() { - Ok(bytes) => bytes, - Err(e) => { - warn!("Failed to serialize request body: {}", e); - self.send_server_error( - ServerError::LogicError(format!( - "Request serialization error: {}", - e - )), - Some(StatusCode::BAD_REQUEST), - ); - return Action::Pause; + Ok(request) => { + debug!( + "[ARCHGW_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}", + self.context_id, + String::from_utf8_lossy(&request.to_bytes().unwrap_or_default()) + ); + + match request.to_bytes() { + Ok(bytes) => bytes, + Err(e) => { + warn!("Failed to serialize request body: {}", e); + self.send_server_error( + ServerError::LogicError(format!( + "Request serialization error: {}", + e + )), + Some(StatusCode::BAD_REQUEST), + ); + return Action::Pause; + } } - }, + } Err(e) => { warn!("Failed to create provider request: {}", e); self.send_server_error( diff --git a/demos/use_cases/preference_based_routing/hurl_tests/simple.hurl b/demos/use_cases/preference_based_routing/hurl_tests/simple.hurl index d9b243e7..1aa56271 100644 --- a/demos/use_cases/preference_based_routing/hurl_tests/simple.hurl +++ b/demos/use_cases/preference_based_routing/hurl_tests/simple.hurl @@ -2,7 +2,7 @@ POST http://localhost:12000/v1/chat/completions Content-Type: application/json { - "model": "openai/gpt-4.1", + "model": "openai/gpt-4o-mini", "messages": [ { "role": "user", @@ -13,7 +13,7 @@ Content-Type: application/json HTTP 200 [Asserts] header "content-type" == "application/json" -jsonpath "$.model" matches /^gpt-4.1/ +jsonpath "$.model" matches /^gpt-4o-mini/ jsonpath "$.usage" != null jsonpath "$.choices[0].message.content" != null jsonpath "$.choices[0].message.role" == "assistant"