mirror of
https://github.com/katanemo/plano.git
synced 2026-05-10 08:12:48 +02:00
planoai obs: live LLM observability TUI (#891)
This commit is contained in:
parent
1f701258cb
commit
0f67b2c806
19 changed files with 1766 additions and 5 deletions
|
|
@ -33,7 +33,8 @@ use crate::streaming::{
|
|||
ObservableStreamProcessor, StreamProcessor,
|
||||
};
|
||||
use crate::tracing::{
|
||||
collect_custom_trace_attributes, llm as tracing_llm, operation_component, set_service_name,
|
||||
collect_custom_trace_attributes, llm as tracing_llm, operation_component,
|
||||
plano as tracing_plano, set_service_name,
|
||||
};
|
||||
use model_selection::router_chat_get_upstream_model;
|
||||
|
||||
|
|
@ -102,15 +103,36 @@ async fn llm_chat_inner(
|
|||
.and_then(|hdr| request_headers.get(hdr))
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
let pinned_model: Option<String> = if let Some(ref sid) = session_id {
|
||||
let cached_route = if let Some(ref sid) = session_id {
|
||||
state
|
||||
.orchestrator_service
|
||||
.get_cached_route(sid, tenant_id.as_deref())
|
||||
.await
|
||||
.map(|c| c.model_name)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let (pinned_model, pinned_route_name): (Option<String>, Option<String>) = match cached_route {
|
||||
Some(c) => (Some(c.model_name), c.route_name),
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
// Record session id on the LLM span for the observability console.
|
||||
if let Some(ref sid) = session_id {
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
tracing_plano::SESSION_ID,
|
||||
sid.clone(),
|
||||
));
|
||||
});
|
||||
}
|
||||
if let Some(ref route_name) = pinned_route_name {
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
tracing_plano::ROUTE_NAME,
|
||||
route_name.clone(),
|
||||
));
|
||||
});
|
||||
}
|
||||
|
||||
let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path);
|
||||
|
||||
|
|
@ -311,6 +333,18 @@ async fn llm_chat_inner(
|
|||
alias_resolved_model.clone()
|
||||
};
|
||||
|
||||
// Record route name on the LLM span (only when the orchestrator produced one).
|
||||
if let Some(ref rn) = route_name {
|
||||
if !rn.is_empty() && rn != "none" {
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
tracing_plano::ROUTE_NAME,
|
||||
rn.clone(),
|
||||
));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref sid) = session_id {
|
||||
state
|
||||
.orchestrator_service
|
||||
|
|
@ -671,6 +705,36 @@ async fn send_upstream(
|
|||
// Propagate upstream headers and status
|
||||
let response_headers = llm_response.headers().clone();
|
||||
let upstream_status = llm_response.status();
|
||||
|
||||
// Upstream routers (e.g. DigitalOcean Gradient) may return an
|
||||
// `x-model-router-selected-route` header indicating which task-level
|
||||
// route the request was classified into (e.g. "Code Generation"). Surface
|
||||
// it as `plano.route.name` so the obs console's Route hit % panel can
|
||||
// show the breakdown even when Plano's own orchestrator wasn't in the
|
||||
// routing path. Any value from Plano's orchestrator already set earlier
|
||||
// takes precedence — this only fires when the span doesn't already have
|
||||
// a route name.
|
||||
if let Some(upstream_route) = response_headers
|
||||
.get("x-model-router-selected-route")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
{
|
||||
if !upstream_route.is_empty() {
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
crate::tracing::plano::ROUTE_NAME,
|
||||
upstream_route.to_string(),
|
||||
));
|
||||
});
|
||||
}
|
||||
}
|
||||
// Record the upstream HTTP status on the span for the obs console.
|
||||
get_active_span(|span| {
|
||||
span.set_attribute(opentelemetry::KeyValue::new(
|
||||
crate::tracing::http::STATUS_CODE,
|
||||
upstream_status.as_u16() as i64,
|
||||
));
|
||||
});
|
||||
|
||||
let mut response = Response::builder().status(upstream_status);
|
||||
if let Some(headers) = response.headers_mut() {
|
||||
for (name, value) in response_headers.iter() {
|
||||
|
|
|
|||
|
|
@ -16,10 +16,131 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;
|
|||
use crate::handlers::agents::pipeline::{PipelineError, PipelineProcessor};
|
||||
|
||||
/// Buffer size for the streaming pipeline.
// NOTE(review): the usage site is not visible in this chunk — presumably a
// channel/chunk capacity; confirm against the rest of this module.
const STREAM_BUFFER_SIZE: usize = 16;
/// Cap on accumulated response bytes kept for usage extraction.
/// Most chat responses are well under this; pathological ones are dropped without
/// affecting pass-through streaming to the client.
const USAGE_BUFFER_MAX: usize = 2 * 1024 * 1024;
|
||||
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
|
||||
use crate::tracing::{llm, set_service_name, signals as signal_constants};
|
||||
use hermesllm::apis::openai::Message;
|
||||
|
||||
/// Parsed usage + resolved-model details from a provider response.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct ExtractedUsage {
|
||||
prompt_tokens: Option<i64>,
|
||||
completion_tokens: Option<i64>,
|
||||
total_tokens: Option<i64>,
|
||||
cached_input_tokens: Option<i64>,
|
||||
cache_creation_tokens: Option<i64>,
|
||||
reasoning_tokens: Option<i64>,
|
||||
/// The model the upstream actually used. For router aliases (e.g.
|
||||
/// `router:software-engineering`), this differs from the request model.
|
||||
resolved_model: Option<String>,
|
||||
}
|
||||
|
||||
impl ExtractedUsage {
|
||||
fn is_empty(&self) -> bool {
|
||||
self.prompt_tokens.is_none()
|
||||
&& self.completion_tokens.is_none()
|
||||
&& self.total_tokens.is_none()
|
||||
&& self.resolved_model.is_none()
|
||||
}
|
||||
|
||||
fn from_json(value: &serde_json::Value) -> Self {
|
||||
let mut out = Self::default();
|
||||
if let Some(model) = value.get("model").and_then(|v| v.as_str()) {
|
||||
if !model.is_empty() {
|
||||
out.resolved_model = Some(model.to_string());
|
||||
}
|
||||
}
|
||||
if let Some(u) = value.get("usage") {
|
||||
// OpenAI-shape usage
|
||||
out.prompt_tokens = u.get("prompt_tokens").and_then(|v| v.as_i64());
|
||||
out.completion_tokens = u.get("completion_tokens").and_then(|v| v.as_i64());
|
||||
out.total_tokens = u.get("total_tokens").and_then(|v| v.as_i64());
|
||||
out.cached_input_tokens = u
|
||||
.get("prompt_tokens_details")
|
||||
.and_then(|d| d.get("cached_tokens"))
|
||||
.and_then(|v| v.as_i64());
|
||||
out.reasoning_tokens = u
|
||||
.get("completion_tokens_details")
|
||||
.and_then(|d| d.get("reasoning_tokens"))
|
||||
.and_then(|v| v.as_i64());
|
||||
|
||||
// Anthropic-shape fallbacks
|
||||
if out.prompt_tokens.is_none() {
|
||||
out.prompt_tokens = u.get("input_tokens").and_then(|v| v.as_i64());
|
||||
}
|
||||
if out.completion_tokens.is_none() {
|
||||
out.completion_tokens = u.get("output_tokens").and_then(|v| v.as_i64());
|
||||
}
|
||||
if out.total_tokens.is_none() {
|
||||
if let (Some(p), Some(c)) = (out.prompt_tokens, out.completion_tokens) {
|
||||
out.total_tokens = Some(p + c);
|
||||
}
|
||||
}
|
||||
if out.cached_input_tokens.is_none() {
|
||||
out.cached_input_tokens = u.get("cache_read_input_tokens").and_then(|v| v.as_i64());
|
||||
}
|
||||
if out.cached_input_tokens.is_none() {
|
||||
out.cached_input_tokens =
|
||||
u.get("cached_content_token_count").and_then(|v| v.as_i64());
|
||||
}
|
||||
out.cache_creation_tokens = u
|
||||
.get("cache_creation_input_tokens")
|
||||
.and_then(|v| v.as_i64());
|
||||
if out.reasoning_tokens.is_none() {
|
||||
out.reasoning_tokens = u.get("thoughts_token_count").and_then(|v| v.as_i64());
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to pull usage out of an accumulated response body.
|
||||
/// Handles both a single JSON object (non-streaming) and SSE streams where the
|
||||
/// final `data: {...}` event carries the `usage` field.
|
||||
fn extract_usage_from_bytes(buf: &[u8]) -> ExtractedUsage {
|
||||
if buf.is_empty() {
|
||||
return ExtractedUsage::default();
|
||||
}
|
||||
|
||||
// Fast path: full-body JSON (non-streaming).
|
||||
if let Ok(value) = serde_json::from_slice::<serde_json::Value>(buf) {
|
||||
let u = ExtractedUsage::from_json(&value);
|
||||
if !u.is_empty() {
|
||||
return u;
|
||||
}
|
||||
}
|
||||
|
||||
// SSE path: scan from the end for a `data:` line containing a usage object.
|
||||
let text = match std::str::from_utf8(buf) {
|
||||
Ok(t) => t,
|
||||
Err(_) => return ExtractedUsage::default(),
|
||||
};
|
||||
for line in text.lines().rev() {
|
||||
let trimmed = line.trim_start();
|
||||
let payload = match trimmed.strip_prefix("data:") {
|
||||
Some(p) => p.trim_start(),
|
||||
None => continue,
|
||||
};
|
||||
if payload == "[DONE]" || payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if !payload.contains("\"usage\"") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload) {
|
||||
let u = ExtractedUsage::from_json(&value);
|
||||
if !u.is_empty() {
|
||||
return u;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExtractedUsage::default()
|
||||
}
|
||||
|
||||
/// Trait for processing streaming chunks
|
||||
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
|
||||
pub trait StreamProcessor: Send + 'static {
|
||||
|
|
@ -60,6 +181,10 @@ pub struct ObservableStreamProcessor {
|
|||
start_time: Instant,
|
||||
time_to_first_token: Option<u128>,
|
||||
messages: Option<Vec<Message>>,
|
||||
/// Accumulated response bytes used only for best-effort usage extraction
|
||||
/// on `on_complete`. Capped at `USAGE_BUFFER_MAX`; excess chunks are dropped
|
||||
/// from the buffer (they still pass through to the client).
|
||||
response_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl ObservableStreamProcessor {
|
||||
|
|
@ -93,6 +218,7 @@ impl ObservableStreamProcessor {
|
|||
start_time,
|
||||
time_to_first_token: None,
|
||||
messages,
|
||||
response_buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -101,6 +227,13 @@ impl StreamProcessor for ObservableStreamProcessor {
|
|||
fn process_chunk(&mut self, chunk: Bytes) -> Result<Option<Bytes>, String> {
|
||||
self.total_bytes += chunk.len();
|
||||
self.chunk_count += 1;
|
||||
// Accumulate for best-effort usage extraction; drop further chunks once
|
||||
// the cap is reached so we don't retain huge response bodies in memory.
|
||||
if self.response_buffer.len() < USAGE_BUFFER_MAX {
|
||||
let remaining = USAGE_BUFFER_MAX - self.response_buffer.len();
|
||||
let take = chunk.len().min(remaining);
|
||||
self.response_buffer.extend_from_slice(&chunk[..take]);
|
||||
}
|
||||
Ok(Some(chunk))
|
||||
}
|
||||
|
||||
|
|
@ -124,6 +257,52 @@ impl StreamProcessor for ObservableStreamProcessor {
|
|||
);
|
||||
}
|
||||
|
||||
// Record total duration on the span for the observability console.
|
||||
let duration_ms = self.start_time.elapsed().as_millis() as i64;
|
||||
{
|
||||
let span = tracing::Span::current();
|
||||
let otel_context = span.context();
|
||||
let otel_span = otel_context.span();
|
||||
otel_span.set_attribute(KeyValue::new(llm::DURATION_MS, duration_ms));
|
||||
otel_span.set_attribute(KeyValue::new(llm::RESPONSE_BYTES, self.total_bytes as i64));
|
||||
}
|
||||
|
||||
// Best-effort usage extraction + emission (works for both streaming
|
||||
// SSE and non-streaming JSON responses that include a `usage` object).
|
||||
let usage = extract_usage_from_bytes(&self.response_buffer);
|
||||
if !usage.is_empty() {
|
||||
let span = tracing::Span::current();
|
||||
let otel_context = span.context();
|
||||
let otel_span = otel_context.span();
|
||||
if let Some(v) = usage.prompt_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::PROMPT_TOKENS, v));
|
||||
}
|
||||
if let Some(v) = usage.completion_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::COMPLETION_TOKENS, v));
|
||||
}
|
||||
if let Some(v) = usage.total_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::TOTAL_TOKENS, v));
|
||||
}
|
||||
if let Some(v) = usage.cached_input_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::CACHED_INPUT_TOKENS, v));
|
||||
}
|
||||
if let Some(v) = usage.cache_creation_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::CACHE_CREATION_TOKENS, v));
|
||||
}
|
||||
if let Some(v) = usage.reasoning_tokens {
|
||||
otel_span.set_attribute(KeyValue::new(llm::REASONING_TOKENS, v));
|
||||
}
|
||||
// Override `llm.model` with the model the upstream actually ran
|
||||
// (e.g. `openai-gpt-5.4` resolved from `router:software-engineering`).
|
||||
// Cost lookup keys off the real model, not the alias.
|
||||
if let Some(resolved) = usage.resolved_model.clone() {
|
||||
otel_span.set_attribute(KeyValue::new(llm::MODEL_NAME, resolved));
|
||||
}
|
||||
}
|
||||
// Release the buffered bytes early; nothing downstream needs them.
|
||||
self.response_buffer.clear();
|
||||
self.response_buffer.shrink_to_fit();
|
||||
|
||||
// Analyze signals if messages are available and record as span attributes
|
||||
if let Some(ref messages) = self.messages {
|
||||
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
|
||||
|
|
@ -404,3 +583,55 @@ pub fn truncate_message(message: &str, max_length: usize) -> String {
|
|||
message.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod usage_extraction_tests {
    use super::*;

    // Non-streaming OpenAI body: top-level usage plus cached-token details.
    #[test]
    fn non_streaming_openai_with_cached() {
        let body = br#"{"id":"x","model":"gpt-4o","choices":[],"usage":{"prompt_tokens":12,"completion_tokens":34,"total_tokens":46,"prompt_tokens_details":{"cached_tokens":5}}}"#;
        let u = extract_usage_from_bytes(body);
        assert_eq!(u.prompt_tokens, Some(12));
        assert_eq!(u.completion_tokens, Some(34));
        assert_eq!(u.total_tokens, Some(46));
        assert_eq!(u.cached_input_tokens, Some(5));
        assert_eq!(u.reasoning_tokens, None);
    }

    // Anthropic body: input/output token names, synthesized total, cache fields.
    #[test]
    fn non_streaming_anthropic_with_cache_creation() {
        let body = br#"{"id":"x","model":"claude","usage":{"input_tokens":100,"output_tokens":50,"cache_creation_input_tokens":20,"cache_read_input_tokens":30}}"#;
        let u = extract_usage_from_bytes(body);
        assert_eq!(u.prompt_tokens, Some(100));
        assert_eq!(u.completion_tokens, Some(50));
        assert_eq!(u.total_tokens, Some(150));
        assert_eq!(u.cached_input_tokens, Some(30));
        assert_eq!(u.cache_creation_tokens, Some(20));
    }

    // SSE stream: usage arrives on the final data event before [DONE].
    #[test]
    fn streaming_openai_final_chunk_has_usage() {
        let sse = b"data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}

data: {\"choices\":[{\"delta\":{}, \"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":3,\"total_tokens\":10}}

data: [DONE]

";
        let u = extract_usage_from_bytes(sse);
        assert_eq!(u.prompt_tokens, Some(7));
        assert_eq!(u.completion_tokens, Some(3));
        assert_eq!(u.total_tokens, Some(10));
    }

    // Empty buffer short-circuits to the default (empty) extraction.
    #[test]
    fn empty_returns_default() {
        assert!(extract_usage_from_bytes(b"").is_empty());
    }

    // Valid JSON without a usage object or model yields an empty extraction.
    #[test]
    fn no_usage_in_body_returns_default() {
        assert!(extract_usage_from_bytes(br#"{"ok":true}"#).is_empty());
    }
}
|
||||
|
|
|
|||
|
|
@ -80,6 +80,18 @@ pub mod llm {
|
|||
/// Total tokens used (prompt + completion)
|
||||
pub const TOTAL_TOKENS: &str = "llm.usage.total_tokens";
|
||||
|
||||
/// Tokens served from a prompt cache read
|
||||
/// (OpenAI `prompt_tokens_details.cached_tokens`, Anthropic `cache_read_input_tokens`,
|
||||
/// Google `cached_content_token_count`)
|
||||
pub const CACHED_INPUT_TOKENS: &str = "llm.usage.cached_input_tokens";
|
||||
|
||||
/// Tokens used to write a prompt cache entry (Anthropic `cache_creation_input_tokens`)
|
||||
pub const CACHE_CREATION_TOKENS: &str = "llm.usage.cache_creation_tokens";
|
||||
|
||||
/// Reasoning tokens for reasoning models
|
||||
/// (OpenAI `completion_tokens_details.reasoning_tokens`, Google `thoughts_token_count`)
|
||||
pub const REASONING_TOKENS: &str = "llm.usage.reasoning_tokens";
|
||||
|
||||
/// Temperature parameter used
|
||||
pub const TEMPERATURE: &str = "llm.temperature";
|
||||
|
||||
|
|
@ -119,6 +131,22 @@ pub mod routing {
|
|||
pub const SELECTION_REASON: &str = "routing.selection_reason";
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// Span Attributes - Plano-specific
|
||||
// =============================================================================
|
||||
|
||||
/// Attributes specific to Plano (session affinity, routing decisions).
pub mod plano {
    /// Session identifier propagated via the `x-model-affinity` header.
    /// Absent when the client did not send the header.
    pub const SESSION_ID: &str = "plano.session_id";

    /// Matched route name from routing (e.g. "code", "summarization",
    /// "software-engineering"). Absent when the client routed directly
    /// to a concrete model.
    pub const ROUTE_NAME: &str = "plano.route.name";
}
|
||||
|
||||
// =============================================================================
|
||||
// Span Attributes - Error Handling
|
||||
// =============================================================================
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ mod init;
|
|||
mod service_name_exporter;
|
||||
|
||||
pub use constants::{
|
||||
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
|
||||
error, http, llm, operation_component, plano, routing, signals, OperationNameBuilder,
|
||||
};
|
||||
pub use custom_attributes::collect_custom_trace_attributes;
|
||||
pub use init::init_tracer;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue