mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
Merge branch 'main' into collect-stats-in-stream-context
This commit is contained in:
commit
d035b33faf
12 changed files with 368 additions and 9 deletions
7
crates/Cargo.lock
generated
7
crates/Cargo.lock
generated
|
|
@ -224,6 +224,7 @@ dependencies = [
|
|||
"derivative",
|
||||
"duration-string",
|
||||
"governor",
|
||||
"hex",
|
||||
"log",
|
||||
"pretty_assertions",
|
||||
"proxy-wasm",
|
||||
|
|
@ -764,6 +765,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hex"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||
|
||||
[[package]]
|
||||
name = "http"
|
||||
version = "1.1.0"
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ thiserror = "1.0.64"
|
|||
tiktoken-rs = "0.5.9"
|
||||
rand = "0.8.5"
|
||||
serde_json = "1.0"
|
||||
hex = "0.4.3"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.1"
|
||||
|
|
|
|||
|
|
@ -7,8 +7,9 @@ pub mod embeddings;
|
|||
pub mod errors;
|
||||
pub mod http;
|
||||
pub mod llm_providers;
|
||||
pub mod pii;
|
||||
pub mod ratelimit;
|
||||
pub mod routing;
|
||||
pub mod stats;
|
||||
pub mod tokenizer;
|
||||
pub mod pii;
|
||||
pub mod tracing;
|
||||
|
|
|
|||
177
crates/common/src/tracing.rs
Normal file
177
crates/common/src/tracing.rs
Normal file
|
|
@ -0,0 +1,177 @@
|
|||
use rand::RngCore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ResourceSpan {
|
||||
pub resource: Resource,
|
||||
#[serde(rename = "scopeSpans")]
|
||||
pub scope_spans: Vec<ScopeSpan>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Resource {
|
||||
pub attributes: Vec<Attribute>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct ScopeSpan {
|
||||
scope: Scope,
|
||||
spans: Vec<Span>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct Scope {
|
||||
name: String,
|
||||
version: String,
|
||||
attributes: Vec<Attribute>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Span {
|
||||
#[serde(rename = "traceId")]
|
||||
pub trace_id: String,
|
||||
#[serde(rename = "spanId")]
|
||||
pub span_id: String,
|
||||
#[serde(rename = "parentSpanId")]
|
||||
pub parent_span_id: Option<String>, // Optional in case there’s no parent span
|
||||
pub name: String,
|
||||
#[serde(rename = "startTimeUnixNano")]
|
||||
pub start_time_unix_nano: String,
|
||||
#[serde(rename = "endTimeUnixNano")]
|
||||
pub end_time_unix_nano: String,
|
||||
pub kind: u32,
|
||||
pub attributes: Vec<Attribute>,
|
||||
pub events: Option<Vec<Event>>,
|
||||
}
|
||||
|
||||
impl Span {
|
||||
pub fn new(
|
||||
name: String,
|
||||
parent_trace_id: String,
|
||||
parent_span_id: Option<String>,
|
||||
start_time_unix_nano: u128,
|
||||
end_time_unix_nano: u128,
|
||||
) -> Self {
|
||||
Span {
|
||||
trace_id: parent_trace_id,
|
||||
span_id: get_random_span_id(),
|
||||
parent_span_id,
|
||||
name,
|
||||
start_time_unix_nano: format!("{}", start_time_unix_nano),
|
||||
end_time_unix_nano: format!("{}", end_time_unix_nano),
|
||||
kind: 0,
|
||||
attributes: Vec::new(),
|
||||
events: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_attribute(&mut self, key: String, value: String) {
|
||||
self.attributes.push(Attribute {
|
||||
key,
|
||||
value: AttributeValue {
|
||||
string_value: Some(value),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
pub fn add_event(&mut self, event: Event) {
|
||||
if self.events.is_none() {
|
||||
self.events = Some(Vec::new());
|
||||
}
|
||||
self.events.as_mut().unwrap().push(event);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Event {
|
||||
#[serde(rename = "timeUnixNano")]
|
||||
pub time_unix_nano: String,
|
||||
pub name: String,
|
||||
pub attributes: Vec<Attribute>,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn new(name: String, time_unix_nano: u128) -> Self {
|
||||
Event {
|
||||
time_unix_nano: format!("{}", time_unix_nano),
|
||||
name,
|
||||
attributes: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_attribute(&mut self, key: String, value: String) {
|
||||
self.attributes.push(Attribute {
|
||||
key,
|
||||
value: AttributeValue {
|
||||
string_value: Some(value),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct Attribute {
|
||||
key: String,
|
||||
value: AttributeValue,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct AttributeValue {
|
||||
#[serde(rename = "stringValue")]
|
||||
string_value: Option<String>, // Use Option to handle different value types
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TraceData {
|
||||
#[serde(rename = "resourceSpans")]
|
||||
resource_spans: Vec<ResourceSpan>,
|
||||
}
|
||||
|
||||
impl Default for TraceData {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl TraceData {
|
||||
pub fn new() -> Self {
|
||||
TraceData {
|
||||
resource_spans: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_span(&mut self, span: Span) {
|
||||
if self.resource_spans.is_empty() {
|
||||
let resource = Resource {
|
||||
attributes: vec![Attribute {
|
||||
key: "service.name".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some("upstream-llm".to_string()),
|
||||
},
|
||||
}],
|
||||
};
|
||||
let scope_span = ScopeSpan {
|
||||
scope: Scope {
|
||||
name: "default".to_string(),
|
||||
version: "1.0".to_string(),
|
||||
attributes: Vec::new(),
|
||||
},
|
||||
spans: Vec::new(),
|
||||
};
|
||||
let resource_span = ResourceSpan {
|
||||
resource,
|
||||
scope_spans: vec![scope_span],
|
||||
};
|
||||
self.resource_spans.push(resource_span);
|
||||
}
|
||||
self.resource_spans[0].scope_spans[0].spans.push(span);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_random_span_id() -> String {
|
||||
let mut rng = rand::thread_rng();
|
||||
let mut random_bytes = [0u8; 8];
|
||||
rng.fill_bytes(&mut random_bytes);
|
||||
|
||||
hex::encode(random_bytes)
|
||||
}
|
||||
|
|
@ -1,4 +1,7 @@
|
|||
use std::{collections::HashMap, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use common::{
|
||||
common_types::{
|
||||
|
|
@ -13,7 +16,9 @@ use common::{
|
|||
HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, TRACE_PARENT_HEADER, USER_ROLE,
|
||||
},
|
||||
errors::ServerError,
|
||||
http::{CallArgs, Client}, pii::obfuscate_auth_header,
|
||||
http::{CallArgs, Client},
|
||||
pii::obfuscate_auth_header,
|
||||
tracing::{Event, Span},
|
||||
};
|
||||
use http::StatusCode;
|
||||
use log::{debug, trace, warn};
|
||||
|
|
@ -239,7 +244,7 @@ impl HttpContext for StreamContext {
|
|||
|
||||
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
|
||||
trace!(
|
||||
"recv [S={}] bytes={} end_stream={}",
|
||||
"on_http_response_body: recv [S={}] bytes={} end_stream={}",
|
||||
self.context_id,
|
||||
body_size,
|
||||
end_of_stream
|
||||
|
|
@ -250,6 +255,55 @@ impl HttpContext for StreamContext {
|
|||
return Action::Continue;
|
||||
}
|
||||
|
||||
if self.time_to_first_token.is_none() {
|
||||
self.time_to_first_token = Some(
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos(),
|
||||
);
|
||||
}
|
||||
|
||||
if end_of_stream && body_size == 0 {
|
||||
if let Some(traceparent) = self.traceparent.as_ref() {
|
||||
let since_the_epoch_ns = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos();
|
||||
|
||||
let traceparent_tokens = traceparent.split("-").collect::<Vec<&str>>();
|
||||
if traceparent_tokens.len() != 4 {
|
||||
warn!("traceparent header is invalid: {}", traceparent);
|
||||
return Action::Continue;
|
||||
}
|
||||
let parent_trace_id = traceparent_tokens[1];
|
||||
let parent_span_id = traceparent_tokens[2];
|
||||
let mut trace_data = common::tracing::TraceData::new();
|
||||
let mut llm_span = Span::new(
|
||||
"upstream_llm_time".to_string(),
|
||||
parent_trace_id.to_string(),
|
||||
Some(parent_span_id.to_string()),
|
||||
self.start_upstream_llm_request_time,
|
||||
since_the_epoch_ns,
|
||||
);
|
||||
if let Some(prompt) = self.user_prompt.as_ref() {
|
||||
if let Some(content) = prompt.content.as_ref() {
|
||||
llm_span.add_attribute("user_prompt".to_string(), content.to_string());
|
||||
}
|
||||
}
|
||||
llm_span.add_event(Event::new(
|
||||
"time_to_first_token".to_string(),
|
||||
self.time_to_first_token.unwrap(),
|
||||
));
|
||||
trace_data.add_span(llm_span);
|
||||
|
||||
let trace_data_str = serde_json::to_string(&trace_data).unwrap();
|
||||
debug!("upstream_llm trace details: {}", trace_data_str);
|
||||
// send trace_data to http tracing endpoint
|
||||
}
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
let body = if self.streaming_response {
|
||||
let streaming_chunk = match self.get_http_response_body(0, body_size) {
|
||||
Some(chunk) => chunk,
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ use std::cell::RefCell;
|
|||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ResponseHandlerType {
|
||||
|
|
@ -77,6 +77,8 @@ pub struct StreamContext {
|
|||
pub chat_completions_request: Option<ChatCompletionsRequest>,
|
||||
pub prompt_guards: Rc<PromptGuards>,
|
||||
pub request_id: Option<String>,
|
||||
pub start_upstream_llm_request_time: u128,
|
||||
pub time_to_first_token: Option<u128>,
|
||||
pub traceparent: Option<String>,
|
||||
pub tracing: Rc<Option<Tracing>>,
|
||||
}
|
||||
|
|
@ -113,6 +115,8 @@ impl StreamContext {
|
|||
request_id: None,
|
||||
traceparent: None,
|
||||
tracing,
|
||||
start_upstream_llm_request_time: 0,
|
||||
time_to_first_token: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1003,6 +1007,11 @@ impl StreamContext {
|
|||
};
|
||||
debug!("archgw => llm request: {}", llm_request_str);
|
||||
|
||||
self.start_upstream_llm_request_time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos();
|
||||
|
||||
self.set_http_request_body(0, self.request_body_size, &llm_request_str.into_bytes());
|
||||
self.resume_http_request();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue