mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
fixed test cases and added more structured logs
This commit is contained in:
parent
e218b1c380
commit
ee52c608f7
2 changed files with 154 additions and 66 deletions
|
|
@ -109,10 +109,11 @@ impl StreamContext {
|
|||
provider_hint,
|
||||
));
|
||||
|
||||
debug!(
|
||||
"request received: llm provider hint: {}, selected provider: {}",
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] PROVIDER_SELECTION: hint='{}' -> selected='{}'",
|
||||
self.context_id,
|
||||
self.get_http_request_header(ARCH_PROVIDER_HINT_HEADER)
|
||||
.unwrap_or_default(),
|
||||
.unwrap_or("none".to_string()),
|
||||
self.llm_provider.as_ref().unwrap().name
|
||||
);
|
||||
}
|
||||
|
|
@ -183,7 +184,11 @@ impl StreamContext {
|
|||
// Tokenize and record token count.
|
||||
let token_count = tokenizer::token_count(model, json_string).unwrap_or(0);
|
||||
|
||||
debug!("Recorded input token count: {}", token_count);
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] TOKEN_COUNT: model='{}' input_tokens={}",
|
||||
self.context_id, model, token_count
|
||||
);
|
||||
|
||||
// Record the token count to metrics.
|
||||
self.metrics
|
||||
.input_sequence_length
|
||||
|
|
@ -191,14 +196,20 @@ impl StreamContext {
|
|||
|
||||
// Check if rate limiting needs to be applied.
|
||||
if let Some(selector) = self.ratelimit_selector.take() {
|
||||
log::debug!("Applying ratelimit for model: {}", model);
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] RATELIMIT_CHECK: model='{}' selector='{}:{}'",
|
||||
self.context_id, model, selector.key, selector.value
|
||||
);
|
||||
ratelimit::ratelimits(None).read().unwrap().check_limit(
|
||||
model.to_owned(),
|
||||
selector,
|
||||
NonZero::new(token_count as u32).unwrap(),
|
||||
)?;
|
||||
} else {
|
||||
debug!("No rate limit applied for model: {}", model);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] RATELIMIT_SKIP: model='{}' (no selector)",
|
||||
self.context_id, model
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -214,14 +225,17 @@ impl StreamContext {
|
|||
Ok(duration) => {
|
||||
let duration_ms = duration.as_millis();
|
||||
info!(
|
||||
"on_http_response_body: time to first token: {}ms",
|
||||
duration_ms
|
||||
"[ARCHGW_REQ_ID:{}] TIME_TO_FIRST_TOKEN: {}ms",
|
||||
self.context_id, duration_ms
|
||||
);
|
||||
self.ttft_duration = Some(duration);
|
||||
self.metrics.time_to_first_token.record(duration_ms as u64);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("SystemTime error: {:?}", e);
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] TIME_MEASUREMENT_ERROR: {:?}",
|
||||
self.context_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -233,7 +247,10 @@ impl StreamContext {
|
|||
Ok(duration) => {
|
||||
// Convert the duration to milliseconds
|
||||
let duration_ms = duration.as_millis();
|
||||
info!("on_http_response_body: request latency: {}ms", duration_ms);
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] REQUEST_COMPLETE: latency={}ms tokens={}",
|
||||
self.context_id, duration_ms, self.response_tokens
|
||||
);
|
||||
// Record the latency to the latency histogram
|
||||
self.metrics.request_latency.record(duration_ms as u64);
|
||||
|
||||
|
|
@ -244,8 +261,9 @@ impl StreamContext {
|
|||
// Record the time per output token
|
||||
self.metrics.time_per_output_token.record(tpot);
|
||||
|
||||
debug!(
|
||||
"time per token: {}ms, tokens per second: {}",
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] TOKEN_THROUGHPUT: time_per_token={}ms tokens_per_second={}",
|
||||
self.context_id,
|
||||
tpot,
|
||||
1000 / tpot
|
||||
);
|
||||
|
|
@ -301,18 +319,17 @@ impl StreamContext {
|
|||
|
||||
fn read_raw_response_body(&mut self, body_size: usize) -> Result<Vec<u8>, Action> {
|
||||
if self.streaming_response {
|
||||
let chunk_start = 0;
|
||||
let chunk_size = body_size;
|
||||
debug!(
|
||||
"on_http_response_body: streaming response reading, {}..{}",
|
||||
chunk_start, chunk_size
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_CHUNK: streaming=true chunk_size={}",
|
||||
self.context_id, chunk_size
|
||||
);
|
||||
let streaming_chunk = match self.get_http_response_body(0, chunk_size) {
|
||||
Some(chunk) => chunk,
|
||||
None => {
|
||||
warn!(
|
||||
"response body empty, chunk_start: {}, chunk_size: {}",
|
||||
chunk_start, chunk_size
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: empty chunk, size={}",
|
||||
self.context_id, chunk_size
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
|
|
@ -320,9 +337,10 @@ impl StreamContext {
|
|||
|
||||
if streaming_chunk.len() != chunk_size {
|
||||
warn!(
|
||||
"chunk size mismatch: read: {} != requested: {}",
|
||||
streaming_chunk.len(),
|
||||
chunk_size
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_MISMATCH: expected={} actual={}",
|
||||
self.context_id,
|
||||
chunk_size,
|
||||
streaming_chunk.len()
|
||||
);
|
||||
}
|
||||
Ok(streaming_chunk)
|
||||
|
|
@ -330,7 +348,10 @@ impl StreamContext {
|
|||
if body_size == 0 {
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
debug!("non streaming response bytes read: 0:{}", body_size);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_COMPLETE: streaming=false body_size={}",
|
||||
self.context_id, body_size
|
||||
);
|
||||
match self.get_http_response_body(0, body_size) {
|
||||
Some(body) => Ok(body),
|
||||
None => {
|
||||
|
|
@ -342,12 +363,12 @@ impl StreamContext {
|
|||
}
|
||||
|
||||
fn debug_log_body(&self, body: &[u8]) {
|
||||
if log::log_enabled!(log::Level::Debug) {
|
||||
debug!(
|
||||
"raw response data (converted to utf8): {}",
|
||||
String::from_utf8_lossy(body)
|
||||
);
|
||||
}
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}",
|
||||
self.context_id,
|
||||
body.len(),
|
||||
String::from_utf8_lossy(body)
|
||||
);
|
||||
}
|
||||
|
||||
fn handle_streaming_response(
|
||||
|
|
@ -355,7 +376,12 @@ impl StreamContext {
|
|||
body: &[u8],
|
||||
provider_id: ProviderId,
|
||||
) -> Result<Vec<u8>, Action> {
|
||||
debug!("processing streaming response");
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: provider_id={:?} chunk_size={}",
|
||||
self.context_id,
|
||||
provider_id,
|
||||
body.len()
|
||||
);
|
||||
match self.client_api.as_ref() {
|
||||
Some(client_api) => {
|
||||
let client_api = client_api.clone(); // Clone to avoid borrowing issues
|
||||
|
|
@ -392,16 +418,29 @@ impl StreamContext {
|
|||
self.record_ttft_if_needed();
|
||||
|
||||
if provider_response.is_final() {
|
||||
debug!("Received final streaming chunk");
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_FINAL_CHUNK: total_tokens={}",
|
||||
self.context_id, self.response_tokens
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(content) = provider_response.content_delta() {
|
||||
let estimated_tokens = content.len() / 4;
|
||||
self.response_tokens += estimated_tokens.max(1);
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_TOKEN_UPDATE: delta_chars={} estimated_tokens={} total_tokens={}",
|
||||
self.context_id,
|
||||
content.len(),
|
||||
estimated_tokens.max(1),
|
||||
self.response_tokens
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error processing streaming chunk: {}", e);
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] STREAMING_CHUNK_ERROR: {}",
|
||||
self.context_id, e
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
}
|
||||
|
|
@ -426,19 +465,22 @@ impl StreamContext {
|
|||
body: &[u8],
|
||||
provider_id: ProviderId,
|
||||
) -> Result<Vec<u8>, Action> {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] NON_STREAMING_PROCESS: provider_id={:?} body_size={}",
|
||||
self.context_id,
|
||||
provider_id,
|
||||
body.len()
|
||||
);
|
||||
|
||||
let response: ProviderResponseType = match self.client_api.as_ref() {
|
||||
Some(client_api) => {
|
||||
match ProviderResponseType::try_from((body, client_api, &provider_id)) {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"could not parse response: {}, body str: {}",
|
||||
e,
|
||||
String::from_utf8_lossy(body)
|
||||
);
|
||||
debug!(
|
||||
"on_http_response_body: S[{}], response body: {}",
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_PARSE_ERROR: {} | body: {}",
|
||||
self.context_id,
|
||||
e,
|
||||
String::from_utf8_lossy(body)
|
||||
);
|
||||
self.send_server_error(
|
||||
|
|
@ -450,7 +492,10 @@ impl StreamContext {
|
|||
}
|
||||
}
|
||||
None => {
|
||||
warn!("Missing client_api for non-streaming response");
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_RESPONSE_ERROR: missing client_api",
|
||||
self.context_id
|
||||
);
|
||||
return Err(Action::Continue);
|
||||
}
|
||||
};
|
||||
|
|
@ -459,22 +504,28 @@ impl StreamContext {
|
|||
if let Some((prompt_tokens, completion_tokens, total_tokens)) =
|
||||
response.extract_usage_counts()
|
||||
{
|
||||
debug!(
|
||||
"Response usage: prompt={}, completion={}, total={}",
|
||||
prompt_tokens, completion_tokens, total_tokens
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: prompt_tokens={} completion_tokens={} total_tokens={}",
|
||||
self.context_id,
|
||||
prompt_tokens,
|
||||
completion_tokens,
|
||||
total_tokens
|
||||
);
|
||||
self.response_tokens = completion_tokens;
|
||||
} else {
|
||||
warn!("No usage information found in response");
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_USAGE: no usage information found",
|
||||
self.context_id
|
||||
);
|
||||
}
|
||||
// Serialize the normalized response back to JSON bytes
|
||||
match serde_json::to_vec(&response) {
|
||||
Ok(bytes) => {
|
||||
debug!(
|
||||
"non streaming response data after serialization. length: {}, converted to utf8: {}",
|
||||
bytes.len(),
|
||||
String::from_utf8_lossy(&bytes)
|
||||
);
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_RESPONSE_PAYLOAD: {}",
|
||||
self.context_id,
|
||||
String::from_utf8_lossy(&bytes)
|
||||
);
|
||||
Ok(bytes)
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -576,7 +627,7 @@ impl HttpContext for StreamContext {
|
|||
|
||||
fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
|
||||
debug!(
|
||||
"on_http_request_body [S={}] bytes={} end_stream={}",
|
||||
"[ARCHGW_REQ_ID:{}] REQUEST_BODY_CHUNK: bytes={} end_stream={}",
|
||||
self.context_id, body_size, end_of_stream
|
||||
);
|
||||
|
||||
|
|
@ -612,11 +663,26 @@ impl HttpContext for StreamContext {
|
|||
//We need to deserialize the request body based on the resolved API
|
||||
let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() {
|
||||
Some(the_client_api) => {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
|
||||
self.context_id,
|
||||
the_client_api,
|
||||
body_bytes.len()
|
||||
);
|
||||
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PAYLOAD: {}",
|
||||
self.context_id,
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
);
|
||||
|
||||
match ProviderRequestType::try_from((&body_bytes[..], the_client_api)) {
|
||||
Ok(deserialized) => deserialized,
|
||||
Err(e) => {
|
||||
debug!(
|
||||
"on_http_request_body: request body: {}",
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_PARSE_ERROR: {} | body: {}",
|
||||
self.context_id,
|
||||
e,
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
);
|
||||
self.send_server_error(
|
||||
|
|
@ -656,6 +722,13 @@ impl HttpContext for StreamContext {
|
|||
if use_agent_orchestrator {
|
||||
"agent_orchestrator".to_string()
|
||||
} else {
|
||||
warn!(
|
||||
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION_ERROR: no model specified | req_model='{}' provider='{}' config_model={:?}",
|
||||
self.context_id,
|
||||
model_requested,
|
||||
self.llm_provider().name,
|
||||
self.llm_provider().model
|
||||
);
|
||||
self.send_server_error(
|
||||
ServerError::BadRequest {
|
||||
why: format!(
|
||||
|
|
@ -679,10 +752,12 @@ impl HttpContext for StreamContext {
|
|||
self.user_message = deserialized_client_request.get_recent_user_message();
|
||||
|
||||
info!(
|
||||
"on_http_request_body: provider: {}, model requested (in body): {}, model selected: {}",
|
||||
self.llm_provider().name,
|
||||
"[ARCHGW_REQ_ID:{}] MODEL_RESOLUTION: req_model='{}' -> resolved_model='{}' provider='{}' streaming={}",
|
||||
self.context_id,
|
||||
model_requested,
|
||||
model_name.unwrap_or(&"None".to_string()),
|
||||
resolved_model,
|
||||
self.llm_provider().name,
|
||||
deserialized_client_request.is_streaming()
|
||||
);
|
||||
|
||||
// Use provider interface for streaming detection and setup
|
||||
|
|
@ -703,21 +778,34 @@ impl HttpContext for StreamContext {
|
|||
// Convert chat completion request to llm provider specific request using provider interface
|
||||
let serialized_body_bytes_upstream = match self.resolved_api.as_ref() {
|
||||
Some(upstream) => {
|
||||
info!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_TRANSFORM: client_api={:?} -> upstream_api={:?}",
|
||||
self.context_id, self.client_api, upstream
|
||||
);
|
||||
|
||||
match ProviderRequestType::try_from((&deserialized_client_request, upstream)) {
|
||||
Ok(request) => match request.to_bytes() {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
warn!("Failed to serialize request body: {}", e);
|
||||
self.send_server_error(
|
||||
ServerError::LogicError(format!(
|
||||
"Request serialization error: {}",
|
||||
e
|
||||
)),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Pause;
|
||||
Ok(request) => {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] UPSTREAM_REQUEST_PAYLOAD: {}",
|
||||
self.context_id,
|
||||
String::from_utf8_lossy(&request.to_bytes().unwrap_or_default())
|
||||
);
|
||||
|
||||
match request.to_bytes() {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
warn!("Failed to serialize request body: {}", e);
|
||||
self.send_server_error(
|
||||
ServerError::LogicError(format!(
|
||||
"Request serialization error: {}",
|
||||
e
|
||||
)),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Pause;
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Failed to create provider request: {}", e);
|
||||
self.send_server_error(
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ POST http://localhost:12000/v1/chat/completions
|
|||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"model": "openai/gpt-4.1",
|
||||
"model": "openai/gpt-4o-mini",
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
|
|
@ -13,7 +13,7 @@ Content-Type: application/json
|
|||
HTTP 200
|
||||
[Asserts]
|
||||
header "content-type" == "application/json"
|
||||
jsonpath "$.model" matches /^gpt-4.1/
|
||||
jsonpath "$.model" matches /^gpt-4o-mini/
|
||||
jsonpath "$.usage" != null
|
||||
jsonpath "$.choices[0].message.content" != null
|
||||
jsonpath "$.choices[0].message.role" == "assistant"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue