mirror of
https://github.com/katanemo/plano.git
synced 2026-05-07 23:02:43 +02:00
fixed bug in Bedrock translation code and dramatically improved tracing for outbound LLM traffic (#601)
* dramatically improve LLM traces and fixed bug with Bedrock translation from claude code * addressing comments --------- Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-288.local>
This commit is contained in:
parent
0ee0912a73
commit
566e7b9c09
8 changed files with 149 additions and 61 deletions
|
|
@ -54,6 +54,8 @@ pub struct StreamContext {
|
|||
user_message: Option<String>,
|
||||
upstream_status_code: Option<StatusCode>,
|
||||
binary_frame_decoder: Option<BedrockBinaryFrameDecoder<bytes::BytesMut>>,
|
||||
http_method: Option<String>,
|
||||
http_protocol: Option<String>,
|
||||
}
|
||||
|
||||
impl StreamContext {
|
||||
|
|
@ -83,6 +85,8 @@ impl StreamContext {
|
|||
user_message: None,
|
||||
upstream_status_code: None,
|
||||
binary_frame_decoder: None,
|
||||
http_method: None,
|
||||
http_protocol: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -282,7 +286,7 @@ impl StreamContext {
|
|||
}
|
||||
}
|
||||
}
|
||||
fn handle_end_of_stream_metrics_and_traces(&mut self, current_time: SystemTime) {
|
||||
fn handle_end_of_request_metrics_and_traces(&mut self, current_time: SystemTime) {
|
||||
// All streaming responses end with bytes=0 and end_stream=true
|
||||
// Record the latency for the request
|
||||
match current_time.duration_since(self.start_time) {
|
||||
|
|
@ -332,9 +336,18 @@ impl StreamContext {
|
|||
warn!("traceparent header is invalid: {}", e);
|
||||
}
|
||||
Ok(traceparent) => {
|
||||
let mut trace_data = common::tracing::TraceData::new();
|
||||
let service_name = match &self.resolved_api {
|
||||
Some(api) => {
|
||||
let api_display = api.to_string();
|
||||
format!("archgw.{}", api_display)
|
||||
}
|
||||
None => "archgw".to_string(),
|
||||
};
|
||||
|
||||
let mut trace_data =
|
||||
common::tracing::TraceData::new_with_service_name(service_name);
|
||||
let mut llm_span = Span::new(
|
||||
"egress_traffic".to_string(),
|
||||
self.llm_provider().name.to_string(),
|
||||
Some(traceparent.trace_id),
|
||||
Some(traceparent.parent_id),
|
||||
self.request_body_sent_time.unwrap(),
|
||||
|
|
@ -344,17 +357,34 @@ impl StreamContext {
|
|||
.add_attribute("model".to_string(), self.llm_provider().name.to_string());
|
||||
|
||||
if let Some(user_message) = &self.user_message {
|
||||
llm_span.add_attribute("user_message".to_string(), user_message.clone());
|
||||
llm_span.add_attribute("message".to_string(), user_message.clone());
|
||||
}
|
||||
|
||||
// Add HTTP attributes
|
||||
if let Some(method) = &self.http_method {
|
||||
llm_span.add_attribute("http.method".to_string(), method.clone());
|
||||
}
|
||||
if let Some(protocol) = &self.http_protocol {
|
||||
llm_span.add_attribute("http.protocol".to_string(), protocol.clone());
|
||||
}
|
||||
if let Some(status_code) = &self.upstream_status_code {
|
||||
llm_span.add_attribute(
|
||||
"http.status_code".to_string(),
|
||||
status_code.as_u16().to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
// Add request ID attribute
|
||||
llm_span
|
||||
.add_attribute("http.request_id".to_string(), self.request_identifier());
|
||||
|
||||
if self.ttft_time.is_some() {
|
||||
llm_span.add_event(Event::new(
|
||||
"time_to_first_token".to_string(),
|
||||
self.ttft_time.unwrap(),
|
||||
));
|
||||
trace_data.add_span(llm_span);
|
||||
}
|
||||
|
||||
trace_data.add_span(llm_span);
|
||||
self.traces_queue.lock().unwrap().push_back(trace_data);
|
||||
}
|
||||
};
|
||||
|
|
@ -460,7 +490,7 @@ impl StreamContext {
|
|||
};
|
||||
|
||||
// Extract ProviderStreamResponse for processing (token counting, etc.)
|
||||
if !transformed_event.is_done() {
|
||||
if !transformed_event.is_done() && !transformed_event.is_event_only() {
|
||||
match transformed_event.provider_response() {
|
||||
Ok(provider_response) => {
|
||||
self.record_ttft_if_needed();
|
||||
|
|
@ -722,6 +752,10 @@ impl HttpContext for StreamContext {
|
|||
return Action::Continue;
|
||||
}
|
||||
|
||||
// Capture HTTP method and protocol for tracing
|
||||
self.http_method = self.get_http_request_header(":method");
|
||||
self.http_protocol = self.get_http_request_header(":scheme");
|
||||
|
||||
self.streaming_response = self
|
||||
.get_http_request_header(ARCH_IS_STREAMING_HEADER)
|
||||
.map(|val| val == "true")
|
||||
|
|
@ -1058,6 +1092,17 @@ impl HttpContext for StreamContext {
|
|||
return Action::Continue;
|
||||
}
|
||||
|
||||
let current_time = get_current_time().unwrap();
|
||||
if end_of_stream && body_size == 0 {
|
||||
debug!(
|
||||
"[ARCHGW_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}",
|
||||
self.request_identifier(),
|
||||
body_size
|
||||
);
|
||||
self.handle_end_of_request_metrics_and_traces(current_time);
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
// Check if this is an error response from upstream
|
||||
if let Some(status_code) = &self.upstream_status_code {
|
||||
if status_code.is_client_error() || status_code.is_server_error() {
|
||||
|
|
@ -1101,12 +1146,6 @@ impl HttpContext for StreamContext {
|
|||
}
|
||||
}
|
||||
|
||||
let current_time = get_current_time().unwrap();
|
||||
if end_of_stream && body_size == 0 {
|
||||
self.handle_end_of_stream_metrics_and_traces(current_time);
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
let body = match self.read_raw_response_body(body_size) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(action) => return action,
|
||||
|
|
@ -1135,6 +1174,7 @@ impl HttpContext for StreamContext {
|
|||
Err(action) => return action,
|
||||
}
|
||||
}
|
||||
|
||||
Action::Continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue