From dd3aeca0e400bea99b67a709692b40256e8d6e30 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 26 Mar 2025 11:34:36 -0700 Subject: [PATCH] Use better logging --- arch/Dockerfile | 2 +- crates/llm_gateway/src/stream_context.rs | 91 ++++++++++++--------- crates/prompt_gateway/src/http_context.rs | 39 ++++----- crates/prompt_gateway/src/stream_context.rs | 34 ++++---- 4 files changed, 88 insertions(+), 78 deletions(-) diff --git a/arch/Dockerfile b/arch/Dockerfile index 98a113ad..7f933da5 100644 --- a/arch/Dockerfile +++ b/arch/Dockerfile @@ -29,4 +29,4 @@ RUN pip install requests RUN touch /var/log/envoy.log # ENTRYPOINT ["sh","-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --log-level trace 2>&1 | tee /var/log/envoy.log"] -ENTRYPOINT ["sh","-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug 2>&1 | tee /var/log/envoy.log"] +ENTRYPOINT ["sh","-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:info 2>&1 | tee /var/log/envoy.log"] diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index c621393f..78d7e21e 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -15,7 +15,7 @@ use common::stats::{IncrementingMetric, RecordingMetric}; use common::tracing::{Event, Span, TraceData, Traceparent}; use common::{ratelimit, routing, tokenizer}; use http::StatusCode; -use log::{debug, trace, warn}; +use log::{debug, info, warn}; use proxy_wasm::hostcalls::get_current_time; use proxy_wasm::traits::*; use proxy_wasm::types::*; @@ -89,7 +89,7 @@ impl StreamContext { provider_hint, )); - trace!( + debug!( "request received: llm provider hint: {}, selected llm: {}, model: {}", self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER) .unwrap_or_default(), @@ -140,7 +140,7 @@ impl StreamContext { } fn send_server_error(&self, error: ServerError, override_status_code: Option) { - debug!("server error occurred: {}", error); + warn!("server error occurred: {}", error); self.send_http_response( override_status_code .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) @@ -159,7 +159,7 @@ impl StreamContext { // Tokenize and record token count. let token_count = tokenizer::token_count(model, json_string).unwrap_or(0); - trace!("Recorded input token count: {}", token_count); + debug!("Recorded input token count: {}", token_count); // Record the token count to metrics. self.metrics .input_sequence_length @@ -167,14 +167,14 @@ impl StreamContext { // Check if rate limiting needs to be applied. if let Some(selector) = self.ratelimit_selector.take() { - log::trace!("Applying ratelimit for model: {}", model); + log::debug!("Applying ratelimit for model: {}", model); ratelimit::ratelimits(None).read().unwrap().check_limit( model.to_owned(), selector, NonZero::new(token_count as u32).unwrap(), )?; } else { - trace!("No rate limit applied for model: {}", model); + debug!("No rate limit applied for model: {}", model); } Ok(()) @@ -200,7 +200,7 @@ impl HttpContext for StreamContext { }; if let Some(routing_header_value) = routing_header_value.as_ref() { - debug!("routing header already set: {}", routing_header_value); + info!("routing header already set: {}", routing_header_value); self.llm_provider = Some(Rc::new(LlmProvider { name: routing_header_value.to_string(), provider_interface: LlmProviderType::OpenAI, @@ -247,6 +247,11 @@ impl HttpContext for StreamContext { } fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { + debug!( + "on_http_request_body [S={}] bytes={} end_stream={}", + self.context_id, body_size, end_of_stream + ); + // Let the client send the gateway all the data before sending to the LLM_provider. // TODO: consider a streaming API. @@ -282,7 +287,10 @@ impl HttpContext for StreamContext { match serde_json::from_slice(&body_bytes) { Ok(deserialized) => deserialized, Err(e) => { - debug!("body str: {}", String::from_utf8_lossy(&body_bytes)); + debug!( + "on_http_request_body: request body: {}", + String::from_utf8_lossy(&body_bytes) + ); self.send_server_error( ServerError::Deserialization(e), Some(StatusCode::BAD_REQUEST), @@ -336,16 +344,19 @@ impl HttpContext for StreamContext { } } - debug!( - "provider: {:?}, model requested: {}, model selected: {:?}", + info!( + "on_http_request_body: provider: {}, model requested: {}, model selected: {}", self.llm_provider().name, model_requested, - model_name, + model_name.unwrap_or(&"None".to_string()), ); let chat_completion_request_str = serde_json::to_string(&deserialized_body).unwrap(); - trace!("request body: {}", chat_completion_request_str); + debug!( + "on_http_request_body: request body: {}", + chat_completion_request_str + ); if deserialized_body.stream { self.streaming_response = true; @@ -380,10 +391,9 @@ impl HttpContext for StreamContext { } fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { - trace!( + debug!( "on_http_response_headers [S={}] end_stream={}", - self.context_id, - _end_of_stream + self.context_id, _end_of_stream ); self.set_property( @@ -395,15 +405,18 @@ impl HttpContext for StreamContext { } fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { - trace!( + debug!( "on_http_response_body [S={}] bytes={} end_stream={}", - self.context_id, - body_size, - end_of_stream + self.context_id, body_size, end_of_stream ); + if self.request_body_sent_time.is_none() { + debug!("on_http_response_body: request body not sent, no doing any processing in llm filter"); + return Action::Continue; + } + if !self.is_chat_completions_request { - debug!("non-chatcompletion request"); + info!("on_http_response_body: non-chatcompletion request"); return Action::Continue; } @@ -415,7 +428,7 @@ impl HttpContext for StreamContext { Ok(duration) => { // Convert the duration to milliseconds let duration_ms = duration.as_millis(); - debug!("request latency: {}ms", duration_ms); + info!("on_http_response_body: request latency: {}ms", duration_ms); // Record the latency to the latency histogram self.metrics.request_latency.record(duration_ms as u64); @@ -426,7 +439,7 @@ impl HttpContext for StreamContext { // Record the time per output token self.metrics.time_per_output_token.record(tpot); - trace!( + debug!( "time per token: {}ms, tokens per second: {}", tpot, 1000 / tpot @@ -490,10 +503,9 @@ impl HttpContext for StreamContext { let body = if self.streaming_response { let chunk_start = 0; let chunk_size = body_size; - trace!( - "streaming response reading, {}..{}", - chunk_start, - chunk_size + debug!( + "on_http_response_body: streaming response reading, {}..{}", + chunk_start, chunk_size ); let streaming_chunk = match self.get_http_response_body(0, chunk_size) { Some(chunk) => chunk, @@ -515,7 +527,7 @@ impl HttpContext for StreamContext { } streaming_chunk } else { - trace!("non streaming response bytes read: 0:{}", body_size); + debug!("non streaming response bytes read: 0:{}", body_size); match self.get_http_response_body(0, body_size) { Some(body) => body, None => { @@ -528,7 +540,7 @@ impl HttpContext for StreamContext { let body_utf8 = match String::from_utf8(body) { Ok(body_utf8) => body_utf8, Err(e) => { - debug!("could not convert to utf8: {}", e); + warn!("could not convert to utf8: {}", e); return Action::Continue; } }; @@ -542,7 +554,7 @@ impl HttpContext for StreamContext { match ChatCompletionStreamResponseServerEvents::try_from(body_utf8.as_str()) { Ok(response) => response, Err(e) => { - debug!( + warn!( "invalid streaming response: body str: {}, {:?}", body_utf8, e ); @@ -551,8 +563,8 @@ impl HttpContext for StreamContext { }; if chat_completions_chunk_response_events.events.is_empty() { - debug!( - "cound't parse any streaming events: body str: {}", + warn!( + "couldn't parse any streaming events: body str: {}", body_utf8 ); return Action::Continue; @@ -571,7 +583,7 @@ impl HttpContext for StreamContext { { Ok(token_count) => token_count, Err(e) => { - debug!("could not get token count: {:?}", e); + warn!("could not get token count: {:?}", e); return Action::Continue; } }; @@ -585,7 +597,10 @@ impl HttpContext for StreamContext { match current_time.duration_since(self.start_time) { Ok(duration) => { let duration_ms = duration.as_millis(); - debug!("time to first token: {}ms", duration_ms); + info!( + "on_http_response_body: time to first token: {}ms", + duration_ms + ); self.ttft_duration = Some(duration); self.metrics.time_to_first_token.record(duration_ms as u64); } @@ -595,12 +610,12 @@ impl HttpContext for StreamContext { } } } else { - trace!("non streaming response"); + debug!("non streaming response"); let chat_completions_response: ChatCompletionsResponse = match serde_json::from_str(body_utf8.as_str()) { Ok(de) => de, Err(err) => { - debug!( + info!( "non chat-completion compliant response received err: {}, body: {}", err, body_utf8 ); @@ -617,11 +632,9 @@ impl HttpContext for StreamContext { } } - trace!( + debug!( "recv [S={}] total_tokens={} end_stream={}", - self.context_id, - self.response_tokens, - end_of_stream + self.context_id, self.response_tokens, end_of_stream ); Action::Continue diff --git a/crates/prompt_gateway/src/http_context.rs b/crates/prompt_gateway/src/http_context.rs index 9580e934..3a7dc7d9 100644 --- a/crates/prompt_gateway/src/http_context.rs +++ b/crates/prompt_gateway/src/http_context.rs @@ -14,7 +14,7 @@ use common::{ pii::obfuscate_auth_header, }; use http::StatusCode; -use log::{debug, trace, warn}; +use log::{debug, info, warn}; use proxy_wasm::{traits::HttpContext, types::Action}; use serde_json::Value; use std::{ @@ -39,7 +39,7 @@ impl HttpContext for StreamContext { if let Some(endpoints) = self.endpoints.as_ref() { if endpoints.len() == 1 { let (name, _) = endpoints.iter().next().unwrap(); - debug!("Setting ARCH_PROVIDER_HINT_HEADER to {}", name); + info!("Setting ARCH_PROVIDER_HINT_HEADER to {}", name); self.set_http_request_header(ARCH_ROUTING_HEADER, Some(name)); } else { warn!("Need single endpoint when use_agent_orchestrator is set"); @@ -63,7 +63,7 @@ impl HttpContext for StreamContext { self.is_chat_completions_request = request_path == CHAT_COMPLETIONS_PATH; - trace!( + debug!( "on_http_request_headers S[{}] req_headers={:?}", self.context_id, obfuscate_auth_header(&mut self.get_http_request_headers()) @@ -89,10 +89,9 @@ impl HttpContext for StreamContext { self.request_body_size = body_size; - trace!( + debug!( "on_http_request_body S[{}] body_size={}", - self.context_id, - body_size + self.context_id, body_size ); let body_bytes = match self.get_http_request_body(0, body_size) { @@ -109,7 +108,7 @@ impl HttpContext for StreamContext { } }; - trace!("request body: {}", String::from_utf8_lossy(&body_bytes)); + debug!("request body: {}", String::from_utf8_lossy(&body_bytes)); // Deserialize body into spec. // Currently OpenAI API. @@ -206,8 +205,8 @@ impl HttpContext for StreamContext { } }; - debug!("sending request to model server"); - trace!("request body: {}", json_data); + info!("on_http_request_body: sending request to model server"); + debug!("request body: {}", json_data); let timeout_str = MODEL_SERVER_REQUEST_TIMEOUT_MS.to_string(); @@ -248,7 +247,7 @@ impl HttpContext for StreamContext { }; if let Err(e) = self.http_call(call_args, call_context) { - debug!("http_call failed: {:?}", e); + warn!("http_call failed: {:?}", e); self.send_server_error(ServerError::HttpDispatch(e), None); } @@ -256,7 +255,7 @@ impl HttpContext for StreamContext { } fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { - trace!( + debug!( "on_http_response_headers recv [S={}] headers={:?}", self.context_id, self.get_http_response_headers() @@ -268,15 +267,13 @@ impl HttpContext for StreamContext { } fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { - trace!( + debug!( "on_http_response_body: recv [S={}] bytes={} end_stream={}", - self.context_id, - body_size, - end_of_stream + self.context_id, body_size, end_of_stream ); if !self.is_chat_completions_request { - debug!("non-gpt request"); + info!("non-gpt request"); return Action::Continue; } @@ -315,7 +312,7 @@ impl HttpContext for StreamContext { streaming_chunk } else { - debug!("non streaming response bytes read: 0:{}", body_size); + info!("non streaming response bytes read: 0:{}", body_size); match self.get_http_response_body(0, body_size) { Some(body) => body, None => { @@ -328,13 +325,13 @@ impl HttpContext for StreamContext { let body_utf8 = match String::from_utf8(body) { Ok(body_utf8) => body_utf8, Err(e) => { - debug!("could not convert to utf8: {}", e); + info!("could not convert to utf8: {}", e); return Action::Continue; } }; if self.streaming_response { - trace!("streaming response"); + debug!("streaming response"); if self.tool_calls.is_some() && !self.tool_calls.as_ref().unwrap().is_empty() { let chunks = vec![ @@ -396,13 +393,13 @@ impl HttpContext for StreamContext { serde_json::Value::String(arch_state_str), ); let data_serialized = serde_json::to_string(&data).unwrap(); - debug!("archgw <= developer: {}", data_serialized); + info!("archgw <= developer: {}", data_serialized); self.set_http_response_body(0, body_size, data_serialized.as_bytes()); }; } } - trace!("recv [S={}] end_stream={}", self.context_id, end_of_stream); + debug!("recv [S={}] end_stream={}", self.context_id, end_of_stream); Action::Continue } diff --git a/crates/prompt_gateway/src/stream_context.rs b/crates/prompt_gateway/src/stream_context.rs index 4a968d43..ec68c104 100644 --- a/crates/prompt_gateway/src/stream_context.rs +++ b/crates/prompt_gateway/src/stream_context.rs @@ -15,7 +15,7 @@ use common::http::{CallArgs, Client}; use common::stats::Gauge; use derivative::Derivative; use http::StatusCode; -use log::{debug, trace, warn}; +use log::{debug, info, warn}; use proxy_wasm::traits::*; use std::cell::RefCell; use std::collections::HashMap; @@ -128,8 +128,8 @@ impl StreamContext { mut callout_context: StreamCallContext, ) { let body_str = String::from_utf8(body).unwrap(); - debug!("model server response received"); - trace!("response body: {}", body_str); + info!("on_http_call_response: model server response received"); + debug!("response body: {}", body_str); let model_server_response: ChatCompletionsResponse = match serde_json::from_str(&body_str) { Ok(arch_fc_response) => arch_fc_response, @@ -150,14 +150,14 @@ impl StreamContext { .is_some(); if !intent_matched { - debug!("intent not matched"); + info!("intent not matched"); // check if we have a default prompt target if let Some(default_prompt_target) = self .prompt_targets .values() .find(|pt| pt.default.unwrap_or(false)) { - debug!("default prompt target found, forwarding request to default prompt target"); + info!("default prompt target found, forwarding request to default prompt target"); let endpoint = default_prompt_target.endpoint.clone().unwrap(); let upstream_path: String = endpoint.path.unwrap_or(String::from("/")); @@ -204,7 +204,7 @@ impl StreamContext { } return; } else { - debug!("no default prompt target found, forwarding request to upstream llm"); + info!("no default prompt target found, forwarding request to upstream llm"); let mut messages = Vec::new(); // add system prompt match self.system_prompt.as_ref() { @@ -242,7 +242,7 @@ impl StreamContext { let chat_completion_request_json = serde_json::to_string(&chat_completion_request).unwrap(); - debug!( + info!( "archgw => upstream llm request: {}", chat_completion_request_json ); @@ -353,7 +353,7 @@ impl StreamContext { }; let body_str = serde_json::to_string(&chat_completion_request).unwrap(); - debug!("sending request to llm agent: {}", body_str); + info!("sending request to llm agent: {}", body_str); self.set_http_request_body(0, self.request_body_size, body_str.as_bytes()); self.resume_http_request(); return; @@ -396,7 +396,7 @@ impl StreamContext { } }; - debug!("api call body {:?}", api_call_body); + debug!("on_http_call_response: api call body {:?}", api_call_body); let timeout_str = API_REQUEST_TIMEOUT_MS.to_string(); @@ -436,8 +436,8 @@ impl StreamContext { Duration::from_secs(5), ); - debug!( - "dispatching api call to developer endpoint: {}, path: {}, method: {}", + info!( + "on_http_call_response: dispatching api call to developer endpoint: {}, path: {}, method: {}", endpoint_details.name, path, http_method_str ); @@ -454,8 +454,8 @@ impl StreamContext { let http_status = self .get_http_call_response_header(":status") .unwrap_or(StatusCode::OK.as_str().to_string()); - debug!( - "developer api call response received: status code: {}", + info!( + "on_http_call_response: developer api call response received: status code: {}", http_status ); let prompt_target = self @@ -479,7 +479,7 @@ impl StreamContext { ); } self.tool_call_response = Some(String::from_utf8(body).unwrap()); - trace!( + debug!( "response body: {}", self.tool_call_response.as_ref().unwrap() ); @@ -561,8 +561,8 @@ impl StreamContext { return self.send_server_error(ServerError::Serialization(e), None); } }; - debug!("sending request to upstream llm"); - trace!("request body: {}", llm_request_str); + info!("on_http_call_response: sending request to upstream llm"); + debug!("request body: {}", llm_request_str); self.start_upstream_llm_request_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -755,7 +755,7 @@ impl StreamContext { }; let json_resp = serde_json::to_string(&chat_completion_request).unwrap(); - debug!("archgw => (default target) llm request: {}", json_resp); + info!("archgw => (default target) llm request: {}", json_resp); self.set_http_request_body(0, self.request_body_size, json_resp.as_bytes()); self.resume_http_request(); }