Use better logs (#452)

commit de221525de (parent 76ec5cda68)
Author: Adil Hafeez (committed by GitHub)
Date:   2025-03-27 10:40:20 -07:00
10 changed files with 200 additions and 156 deletions
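
In short: per-request flow logs move up from trace! to debug!, operational milestones (routing decisions, request latency, time to first token) become info!, and recoverable failures (bad UTF-8, unparseable streaming events) become warn!. Below is a minimal sketch of that convention, not part of the diff; it assumes only the `log` facade, with `env_logger` standing in for the proxy-wasm host logger that does the real filtering in the deployed filter.

```rust
use log::{debug, info, trace, warn, LevelFilter};

fn main() {
    // Cap output at Debug: everything this commit promotes is now visible,
    // while anything left at trace! stays hidden.
    env_logger::Builder::new()
        .filter_level(LevelFilter::Debug)
        .init();

    trace!("streaming response reading, 0..{}", 512); // suppressed at this cap
    debug!("on_http_request_body [S={}] bytes={} end_stream={}", 1, 512, true);
    info!("on_http_response_body: request latency: {}ms", 42);
    warn!("could not convert to utf8: {}", "invalid utf-8 sequence");
}
```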

@@ -15,7 +15,7 @@ use common::stats::{IncrementingMetric, RecordingMetric};
 use common::tracing::{Event, Span, TraceData, Traceparent};
 use common::{ratelimit, routing, tokenizer};
 use http::StatusCode;
-use log::{debug, trace, warn};
+use log::{debug, info, warn};
 use proxy_wasm::hostcalls::get_current_time;
 use proxy_wasm::traits::*;
 use proxy_wasm::types::*;
@@ -89,7 +89,7 @@ impl StreamContext {
             provider_hint,
         ));
 
-        trace!(
+        debug!(
             "request received: llm provider hint: {}, selected llm: {}, model: {}",
             self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER)
                 .unwrap_or_default(),
@@ -140,7 +140,7 @@
     }
 
     fn send_server_error(&self, error: ServerError, override_status_code: Option<StatusCode>) {
-        debug!("server error occurred: {}", error);
+        warn!("server error occurred: {}", error);
         self.send_http_response(
             override_status_code
                 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
@@ -159,7 +159,7 @@
         // Tokenize and record token count.
         let token_count = tokenizer::token_count(model, json_string).unwrap_or(0);
-        trace!("Recorded input token count: {}", token_count);
+        debug!("Recorded input token count: {}", token_count);
 
         // Record the token count to metrics.
         self.metrics
             .input_sequence_length
@@ -167,14 +167,14 @@
         // Check if rate limiting needs to be applied.
         if let Some(selector) = self.ratelimit_selector.take() {
-            log::trace!("Applying ratelimit for model: {}", model);
+            log::debug!("Applying ratelimit for model: {}", model);
             ratelimit::ratelimits(None).read().unwrap().check_limit(
                 model.to_owned(),
                 selector,
                 NonZero::new(token_count as u32).unwrap(),
             )?;
         } else {
-            trace!("No rate limit applied for model: {}", model);
+            debug!("No rate limit applied for model: {}", model);
         }
 
         Ok(())
@@ -200,7 +200,7 @@ impl HttpContext for StreamContext {
         };
 
         if let Some(routing_header_value) = routing_header_value.as_ref() {
-            debug!("routing header already set: {}", routing_header_value);
+            info!("routing header already set: {}", routing_header_value);
             self.llm_provider = Some(Rc::new(LlmProvider {
                 name: routing_header_value.to_string(),
                 provider_interface: LlmProviderType::OpenAI,
@@ -247,6 +247,11 @@
     }
 
     fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
+        debug!(
+            "on_http_request_body [S={}] bytes={} end_stream={}",
+            self.context_id, body_size, end_of_stream
+        );
+
         // Let the client send the gateway all the data before sending to the LLM_provider.
         // TODO: consider a streaming API.
@@ -282,7 +287,10 @@
             match serde_json::from_slice(&body_bytes) {
                 Ok(deserialized) => deserialized,
                 Err(e) => {
-                    debug!("body str: {}", String::from_utf8_lossy(&body_bytes));
+                    debug!(
+                        "on_http_request_body: request body: {}",
+                        String::from_utf8_lossy(&body_bytes)
+                    );
                     self.send_server_error(
                         ServerError::Deserialization(e),
                         Some(StatusCode::BAD_REQUEST),
@@ -336,16 +344,19 @@
             }
         }
 
-        debug!(
-            "provider: {:?}, model requested: {}, model selected: {:?}",
+        info!(
+            "on_http_request_body: provider: {}, model requested: {}, model selected: {}",
             self.llm_provider().name,
             model_requested,
-            model_name,
+            model_name.unwrap_or(&"None".to_string()),
         );
 
         let chat_completion_request_str = serde_json::to_string(&deserialized_body).unwrap();
-        trace!("request body: {}", chat_completion_request_str);
+        debug!(
+            "on_http_request_body: request body: {}",
+            chat_completion_request_str
+        );
 
         if deserialized_body.stream {
             self.streaming_response = true;
@@ -380,10 +391,9 @@
     }
 
     fn on_http_response_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
-        trace!(
+        debug!(
             "on_http_response_headers [S={}] end_stream={}",
-            self.context_id,
-            _end_of_stream
+            self.context_id, _end_of_stream
         );
 
         self.set_property(
@@ -395,15 +405,18 @@ impl HttpContext for StreamContext {
     }
 
     fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
-        trace!(
+        debug!(
             "on_http_response_body [S={}] bytes={} end_stream={}",
-            self.context_id,
-            body_size,
-            end_of_stream
+            self.context_id, body_size, end_of_stream
         );
 
+        if self.request_body_sent_time.is_none() {
+            debug!("on_http_response_body: request body not sent, not doing any processing in llm filter");
+            return Action::Continue;
+        }
+
         if !self.is_chat_completions_request {
-            debug!("non-chatcompletion request");
+            info!("on_http_response_body: non-chatcompletion request");
             return Action::Continue;
         }
@@ -415,7 +428,7 @@
             Ok(duration) => {
                 // Convert the duration to milliseconds
                 let duration_ms = duration.as_millis();
-                debug!("request latency: {}ms", duration_ms);
+                info!("on_http_response_body: request latency: {}ms", duration_ms);
 
                 // Record the latency to the latency histogram
                 self.metrics.request_latency.record(duration_ms as u64);
@@ -426,7 +439,7 @@
                 // Record the time per output token
                 self.metrics.time_per_output_token.record(tpot);
 
-                trace!(
+                debug!(
                     "time per token: {}ms, tokens per second: {}",
                     tpot,
                     1000 / tpot
@@ -490,10 +503,9 @@
         let body = if self.streaming_response {
             let chunk_start = 0;
             let chunk_size = body_size;
-            trace!(
-                "streaming response reading, {}..{}",
-                chunk_start,
-                chunk_size
+            debug!(
+                "on_http_response_body: streaming response reading, {}..{}",
+                chunk_start, chunk_size
             );
             let streaming_chunk = match self.get_http_response_body(0, chunk_size) {
                 Some(chunk) => chunk,
@@ -515,7 +527,7 @@
             }
             streaming_chunk
         } else {
-            trace!("non streaming response bytes read: 0:{}", body_size);
+            debug!("non streaming response bytes read: 0:{}", body_size);
             match self.get_http_response_body(0, body_size) {
                 Some(body) => body,
                 None => {
@@ -528,7 +540,7 @@
         let body_utf8 = match String::from_utf8(body) {
             Ok(body_utf8) => body_utf8,
             Err(e) => {
-                debug!("could not convert to utf8: {}", e);
+                warn!("could not convert to utf8: {}", e);
                 return Action::Continue;
             }
         };
@@ -542,7 +554,7 @@
             match ChatCompletionStreamResponseServerEvents::try_from(body_utf8.as_str()) {
                 Ok(response) => response,
                 Err(e) => {
-                    debug!(
+                    warn!(
                         "invalid streaming response: body str: {}, {:?}",
                         body_utf8, e
                     );
@@ -551,8 +563,8 @@
         };
 
         if chat_completions_chunk_response_events.events.is_empty() {
-            debug!(
-                "cound't parse any streaming events: body str: {}",
+            warn!(
+                "couldn't parse any streaming events: body str: {}",
                 body_utf8
             );
             return Action::Continue;
@@ -571,7 +583,7 @@
             {
                 Ok(token_count) => token_count,
                 Err(e) => {
-                    debug!("could not get token count: {:?}", e);
+                    warn!("could not get token count: {:?}", e);
                     return Action::Continue;
                 }
             };
@@ -585,7 +597,10 @@
             match current_time.duration_since(self.start_time) {
                 Ok(duration) => {
                     let duration_ms = duration.as_millis();
-                    debug!("time to first token: {}ms", duration_ms);
+                    info!(
+                        "on_http_response_body: time to first token: {}ms",
+                        duration_ms
+                    );
                     self.ttft_duration = Some(duration);
                     self.metrics.time_to_first_token.record(duration_ms as u64);
                 }
@@ -595,12 +610,12 @@
                 }
             }
         } else {
-            trace!("non streaming response");
+            debug!("non streaming response");
            let chat_completions_response: ChatCompletionsResponse =
                match serde_json::from_str(body_utf8.as_str()) {
                    Ok(de) => de,
                    Err(err) => {
-                        debug!(
+                        info!(
                            "non chat-completion compliant response received err: {}, body: {}",
                            err, body_utf8
                        );
@@ -617,11 +632,9 @@
             }
         }
 
-        trace!(
+        debug!(
             "recv [S={}] total_tokens={} end_stream={}",
-            self.context_id,
-            self.response_tokens,
-            end_of_stream
+            self.context_id, self.response_tokens, end_of_stream
        );
 
        Action::Continue
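
An aside, not part of the diff: whether the promoted debug!/info! lines actually surface still depends on the logger's maximum level, which the proxy-wasm host controls for the deployed filter. A standalone sketch of that gating using only the `log` crate:

```rust
use log::{debug, log_enabled, Level, LevelFilter};

fn main() {
    // With the global cap at Info, debug! calls compile in but are skipped
    // at the macro's cheap level check; raising the cap to Debug (as an
    // operator would via the host) makes the promoted lines visible.
    log::set_max_level(LevelFilter::Info);
    debug!("recv [S={}] total_tokens={} end_stream={}", 1, 987, true); // skipped
    assert!(!log_enabled!(Level::Debug));
}
```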