diff --git a/cli/test/test_config_generator.py b/cli/test/test_config_generator.py index 0c5dc969..2f9834ca 100644 --- a/cli/test/test_config_generator.py +++ b/cli/test/test_config_generator.py @@ -386,6 +386,33 @@ model_providers: access_key: $OPENAI_API_KEY default: true +""", + }, + { + "id": "valid_tracing_posthog_exporter", + "expected_error": None, + "plano_config": """ +version: v0.4.0 + +listeners: + - name: llm + type: model + port: 12000 + +model_providers: + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + +tracing: + random_sampling: 100 + exporters: + - type: posthog + url: https://us.i.posthog.com + api_key: $POSTHOG_API_KEY + distinct_id_header: x-user-id + capture_messages: false + """, }, ] diff --git a/config/plano_config_schema.yaml b/config/plano_config_schema.yaml index 5e4afc77..f356b573 100644 --- a/config/plano_config_schema.yaml +++ b/config/plano_config_schema.yaml @@ -447,6 +447,28 @@ properties: additionalProperties: type: string additionalProperties: false + exporters: + type: array + items: + oneOf: + - type: object + properties: + type: + type: string + const: posthog + url: + type: string + api_key: + type: string + distinct_id_header: + type: string + capture_messages: + type: boolean + additionalProperties: false + required: + - type + - url + - api_key additionalProperties: false mode: type: string diff --git a/crates/brightstaff/src/app_state.rs b/crates/brightstaff/src/app_state.rs index 1d534e89..de4393f3 100644 --- a/crates/brightstaff/src/app_state.rs +++ b/crates/brightstaff/src/app_state.rs @@ -21,6 +21,10 @@ pub struct AppState { pub state_storage: Option>, pub llm_provider_url: String, pub span_attributes: Option, + /// Request header whose value populates the observability `distinct_id` + /// (e.g. PostHog). Sourced from `tracing.exporters[].distinct_id_header`. + /// `None` means LLM events are captured anonymously. 
+ pub distinct_id_header: Option, /// Shared HTTP client for upstream LLM requests (connection pooling / keep-alive). pub http_client: reqwest::Client, pub filter_pipeline: Arc, diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 3336209f..a1d3ce97 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -93,6 +93,25 @@ async fn llm_chat_inner( } }); + // Stamp the caller identity for downstream exporters (e.g. PostHog + // `distinct_id`). Sourced from the configured `distinct_id_header`; when the + // header is absent the event is exported anonymously. + if let Some(header_name) = state.distinct_id_header.as_deref() { + if let Some(distinct_id) = request_headers + .get(header_name) + .and_then(|v| v.to_str().ok()) + .map(str::trim) + .filter(|s| !s.is_empty()) + { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + tracing_plano::DISTINCT_ID, + distinct_id.to_string(), + )); + }); + } + } + // Session pinning: extract session ID and check cache before routing let session_id: Option = request_headers .get(MODEL_AFFINITY_HEADER) @@ -366,6 +385,19 @@ async fn llm_chat_inner( }; tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str()); + // Record the provider (derived from the `provider/model` prefix) so + // observability exporters can populate provider fields (e.g. PostHog + // `$ai_provider`). 
+ let (resolved_provider, _) = bs_metrics::split_provider_model(&resolved_model); + if resolved_provider != "unknown" { + get_active_span(|span| { + span.set_attribute(opentelemetry::KeyValue::new( + tracing_llm::PROVIDER, + resolved_provider.to_string(), + )); + }); + } + // --- Phase 4: Forward to upstream and stream back --- send_upstream( &state.http_client, diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 90ed84c3..c9e8b9bc 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -327,6 +327,20 @@ async fn init_app_state( .as_ref() .and_then(|tracing| tracing.span_attributes.clone()); + // Resolve the distinct_id header from the first PostHog exporter that + // declares one, so the LLM handler can stamp `plano.distinct_id` on spans. + let distinct_id_header = config + .tracing + .as_ref() + .and_then(|tracing| tracing.exporters.as_ref()) + .and_then(|exporters| { + exporters.iter().find_map(|exporter| match exporter { + common::configuration::Exporter::Posthog(posthog) => { + posthog.distinct_id_header.clone() + } + }) + }); + let signals_enabled = !overrides.disable_signals.unwrap_or(false); Ok(AppState { @@ -338,6 +352,7 @@ async fn init_app_state( state_storage, llm_provider_url, span_attributes, + distinct_id_header, http_client: reqwest::Client::new(), filter_pipeline, signals_enabled, diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs index 1c85fd50..f88cf99c 100644 --- a/crates/brightstaff/src/tracing/constants.rs +++ b/crates/brightstaff/src/tracing/constants.rs @@ -145,6 +145,11 @@ pub mod plano { /// "software-engineering"). Absent when the client routed directly /// to a concrete model. pub const ROUTE_NAME: &str = "plano.route.name"; + + /// Caller identity used to populate downstream observability `distinct_id` + /// fields (e.g. PostHog). Sourced from the configured + /// `tracing.exporters[].distinct_id_header`. 
Absent for anonymous calls. + pub const DISTINCT_ID: &str = "plano.distinct_id"; } // ============================================================================= diff --git a/crates/brightstaff/src/tracing/init.rs b/crates/brightstaff/src/tracing/init.rs index ed351148..b9560423 100644 --- a/crates/brightstaff/src/tracing/init.rs +++ b/crates/brightstaff/src/tracing/init.rs @@ -11,8 +11,8 @@ use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use super::ServiceNameOverrideExporter; -use common::configuration::Tracing; +use super::{PostHogExporter, ServiceNameOverrideExporter}; +use common::configuration::{Exporter, PosthogExporter, Tracing}; struct BracketedTime; @@ -90,26 +90,53 @@ pub fn init_tracer(tracing_config: Option<&Tracing>) -> &'static SdkTracerProvid let random_sampling = tracing_config.and_then(|t| t.random_sampling).unwrap_or(0); - let tracing_enabled = random_sampling > 0 && otel_endpoint.is_some(); + // Collect PostHog export destinations from `tracing.exporters`. + let posthog_exporters: Vec = tracing_config + .and_then(|t| t.exporters.as_ref()) + .map(|exporters| { + exporters + .iter() + .map(|Exporter::Posthog(posthog)| posthog.clone()) + .collect() + }) + .unwrap_or_default(); + + // Tracing is enabled when sampling is on and there is at least one + // destination — an OTLP collector and/or a configured exporter. + let has_destination = otel_endpoint.is_some() || !posthog_exporters.is_empty(); + let tracing_enabled = random_sampling > 0 && has_destination; eprintln!( - "initializing tracing: tracing_enabled={}, otel_endpoint={:?}, random_sampling={}", - tracing_enabled, otel_endpoint, random_sampling + "initializing tracing: tracing_enabled={}, otel_endpoint={:?}, random_sampling={}, posthog_exporters={}", + tracing_enabled, otel_endpoint, random_sampling, posthog_exporters.len() ); - // Create OTLP exporter to send spans to collector. 
- // Use `if let` to destructure the endpoint, avoiding an unwrap. - if let Some(endpoint) = otel_endpoint.as_deref().filter(|_| tracing_enabled) { + if tracing_enabled { if std::env::var("OTEL_SERVICE_NAME").is_err() { std::env::set_var("OTEL_SERVICE_NAME", "plano"); } + + // Compose the tracer provider from all configured destinations. Each + // `with_batch_exporter` registers an independent span processor, so + // every span fans out to the OTLP collector and every exporter. + let mut builder = SdkTracerProvider::builder(); + // Create ServiceNameOverrideExporter to support per-span service names // This allows spans to have different service names (e.g., plano(orchestrator), // plano(filter), plano(llm)) by setting the "service.name.override" attribute - let exporter = ServiceNameOverrideExporter::new(endpoint); + if let Some(endpoint) = otel_endpoint.as_deref() { + builder = builder.with_batch_exporter(ServiceNameOverrideExporter::new(endpoint)); + } - let provider = SdkTracerProvider::builder() - .with_batch_exporter(exporter) - .build(); + // PostHog exporters translate LLM spans into `$ai_generation` events. 
+ for posthog in &posthog_exporters { + builder = builder.with_batch_exporter(PostHogExporter::new( + &posthog.url, + &posthog.api_key, + posthog.capture_messages.unwrap_or(false), + )); + } + + let provider = builder.build(); global::set_tracer_provider(provider.clone()); diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs index 8e09a21c..dac26232 100644 --- a/crates/brightstaff/src/tracing/mod.rs +++ b/crates/brightstaff/src/tracing/mod.rs @@ -1,6 +1,7 @@ mod constants; mod custom_attributes; mod init; +mod posthog_exporter; mod service_name_exporter; pub use constants::{ @@ -8,6 +9,7 @@ pub use constants::{ }; pub use custom_attributes::collect_custom_trace_attributes; pub use init::init_tracer; +pub use posthog_exporter::PostHogExporter; pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY}; use opentelemetry::trace::get_active_span; diff --git a/crates/brightstaff/src/tracing/posthog_exporter.rs b/crates/brightstaff/src/tracing/posthog_exporter.rs new file mode 100644 index 00000000..53d3ccef --- /dev/null +++ b/crates/brightstaff/src/tracing/posthog_exporter.rs @@ -0,0 +1,402 @@ +//! PostHog Span Exporter +//! +//! A custom [`SpanExporter`] that translates Plano's LLM spans into PostHog +//! [`$ai_generation`](https://posthog.com/docs/ai-observability/generations) +//! events and POSTs them to PostHog's capture API (`{url}/batch/`). +//! +//! This makes PostHog a first-class, provider-agnostic export target: a user +//! only points `tracing.exporters` at their PostHog URL + project token and +//! every LLM call is captured — mirroring LiteLLM's `posthog` callback. +//! +//! # Behaviour +//! +//! - Receives every span in the provider (like all batch exporters do) and +//! keeps only LLM generation spans, identified by the presence of the +//! [`llm::MODEL_NAME`] (`llm.model`) attribute. +//! - Maps span attributes onto `$ai_*` PostHog properties (model, provider, +//! 
latency, tokens, http status, ...). +//! - `distinct_id` is read from the [`plano::DISTINCT_ID`] span attribute (set +//! by the LLM handler from the configured `distinct_id_header`). When absent +//! the event is captured anonymously (`$process_person_profile = false`). +//! - Network failures are logged and dropped — telemetry export never blocks or +//! fails request processing. + +use std::time::Duration; + +use opentelemetry::{Array, Value}; +use opentelemetry_sdk::error::OTelSdkResult; +use opentelemetry_sdk::trace::{SpanData, SpanExporter}; +use opentelemetry_sdk::Resource; +use serde_json::{json, Map, Value as JsonValue}; +use time::format_description::well_known::Rfc3339; +use time::OffsetDateTime; + +use super::{http, llm, plano}; + +/// PostHog event name for an individual LLM call. +const AI_GENERATION_EVENT: &str = "$ai_generation"; + +/// PostHog capture path appended to the configured host. +const CAPTURE_PATH: &str = "batch/"; + +/// A [`SpanExporter`] that ships LLM spans to PostHog as `$ai_generation` events. +pub struct PostHogExporter { + client: reqwest::Client, + /// Fully-qualified capture endpoint, e.g. `https://us.i.posthog.com/batch/`. + endpoint: String, + /// PostHog project API key (token). + api_key: String, + /// Whether to attach the truncated user message preview as `$ai_input`. + capture_messages: bool, +} + +impl std::fmt::Debug for PostHogExporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PostHogExporter") + .field("endpoint", &self.endpoint) + .field("capture_messages", &self.capture_messages) + .finish() + } +} + +impl PostHogExporter { + /// Create a new PostHog exporter. + /// + /// # Arguments + /// * `url` – PostHog host (e.g. `https://us.i.posthog.com`). The `/batch/` + /// capture path is appended automatically. + /// * `api_key` – PostHog project API key (token). + /// * `capture_messages` – when true, send the user message preview as + /// `$ai_input`. 
+ pub fn new(url: &str, api_key: &str, capture_messages: bool) -> Self { + let endpoint = format!("{}/{}", url.trim_end_matches('/'), CAPTURE_PATH); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .unwrap_or_default(); + Self { + client, + endpoint, + api_key: api_key.to_string(), + capture_messages, + } + } + + /// Build the PostHog `batch` payload from a batch of spans, keeping only LLM + /// generation spans. Returns `None` when no LLM spans are present. + fn build_payload(&self, batch: &[SpanData]) -> Option { + let events: Vec = batch + .iter() + .filter_map(|span| self.build_generation_event(span)) + .collect(); + + if events.is_empty() { + return None; + } + + Some(json!({ + "api_key": self.api_key, + "batch": events, + })) + } + + /// Translate a single span into a PostHog `$ai_generation` event, or `None` + /// if the span is not an LLM generation span. + fn build_generation_event(&self, span: &SpanData) -> Option { + // Only LLM generation spans carry `llm.model`. + let model = find_attr(span, llm::MODEL_NAME)?; + + let mut props = Map::new(); + props.insert("$ai_model".to_string(), otel_value_to_json(model)); + props.insert( + "$ai_trace_id".to_string(), + json!(span.span_context.trace_id().to_string()), + ); + if span.parent_span_id != opentelemetry::trace::SpanId::INVALID { + props.insert( + "$ai_parent_id".to_string(), + json!(span.parent_span_id.to_string()), + ); + } + + if let Some(provider) = find_attr(span, llm::PROVIDER) { + props.insert("$ai_provider".to_string(), otel_value_to_json(provider)); + } + + // Latency / TTFT are stored in milliseconds; PostHog wants seconds. 
+ if let Some(ms) = find_i64(span, llm::DURATION_MS) { + props.insert("$ai_latency".to_string(), json!(ms as f64 / 1000.0)); + } + if let Some(ms) = find_i64(span, llm::TIME_TO_FIRST_TOKEN_MS) { + props.insert( + "$ai_time_to_first_token".to_string(), + json!(ms as f64 / 1000.0), + ); + props.insert("$ai_stream".to_string(), json!(true)); + } + + if let Some(tokens) = find_i64(span, llm::PROMPT_TOKENS) { + props.insert("$ai_input_tokens".to_string(), json!(tokens)); + } + if let Some(tokens) = find_i64(span, llm::COMPLETION_TOKENS) { + props.insert("$ai_output_tokens".to_string(), json!(tokens)); + } + + if let Some(status) = find_i64(span, http::STATUS_CODE) { + props.insert("$ai_http_status".to_string(), json!(status)); + if status >= 400 { + props.insert("$ai_is_error".to_string(), json!(true)); + } + } + + if self.capture_messages { + if let Some(preview) = find_attr(span, llm::USER_MESSAGE_PREVIEW) { + props.insert( + "$ai_input".to_string(), + json!([{ "role": "user", "content": value_to_string(preview) }]), + ); + } + } + + // distinct_id: identified when the configured header was present, + // otherwise anonymous (do not create/update a person profile). + match find_attr(span, plano::DISTINCT_ID) { + Some(id) => { + props.insert("distinct_id".to_string(), otel_value_to_json(id)); + } + None => { + props.insert( + "distinct_id".to_string(), + json!(span.span_context.trace_id().to_string()), + ); + props.insert("$process_person_profile".to_string(), json!(false)); + } + } + + // Pass through any other non-reserved attributes (custom span attributes + // such as static tags or header-derived tenant ids) as plain properties. 
+ for kv in span.attributes.iter() { + let key = kv.key.as_str(); + if is_reserved_attr(key) { + continue; + } + props + .entry(key.to_string()) + .or_insert_with(|| otel_value_to_json(&kv.value)); + } + + let mut event = Map::new(); + event.insert("event".to_string(), json!(AI_GENERATION_EVENT)); + event.insert("properties".to_string(), JsonValue::Object(props)); + if let Ok(ts) = OffsetDateTime::from(span.end_time).format(&Rfc3339) { + event.insert("timestamp".to_string(), json!(ts)); + } + + Some(JsonValue::Object(event)) + } +} + +impl SpanExporter for PostHogExporter { + fn export( + &self, + batch: Vec, + ) -> impl std::future::Future + Send { + let payload = self.build_payload(&batch); + let client = self.client.clone(); + let endpoint = self.endpoint.clone(); + async move { + let Some(payload) = payload else { + return Ok(()); + }; + match client.post(&endpoint).json(&payload).send().await { + Ok(resp) if resp.status().is_success() => {} + Ok(resp) => { + tracing::warn!( + status = %resp.status(), + endpoint = %endpoint, + "PostHog exporter: non-success response" + ); + } + Err(e) => { + tracing::warn!(error = ?e, endpoint = %endpoint, "PostHog exporter: request failed"); + } + } + Ok(()) + } + } + + fn shutdown_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult { + Ok(()) + } + + fn set_resource(&mut self, _resource: &Resource) {} +} + +/// Span attributes that are mapped to dedicated `$ai_*` properties (or are +/// internal plumbing) and should not be duplicated as raw properties. 
+fn is_reserved_attr(key: &str) -> bool { + matches!( + key, + k if k == llm::MODEL_NAME + || k == llm::PROVIDER + || k == llm::DURATION_MS + || k == llm::TIME_TO_FIRST_TOKEN_MS + || k == llm::PROMPT_TOKENS + || k == llm::COMPLETION_TOKENS + || k == llm::USER_MESSAGE_PREVIEW + || k == http::STATUS_CODE + || k == plano::DISTINCT_ID + || k == super::SERVICE_NAME_OVERRIDE_KEY + ) +} + +fn find_attr<'a>(span: &'a SpanData, key: &str) -> Option<&'a Value> { + span.attributes + .iter() + .find(|kv| kv.key.as_str() == key) + .map(|kv| &kv.value) +} + +fn find_i64(span: &SpanData, key: &str) -> Option { + match find_attr(span, key)? { + Value::I64(i) => Some(*i), + _ => None, + } +} + +fn value_to_string(value: &Value) -> String { + match value { + Value::String(s) => s.as_str().to_string(), + other => other.to_string(), + } +} + +fn otel_value_to_json(value: &Value) -> JsonValue { + match value { + Value::Bool(b) => json!(b), + Value::I64(i) => json!(i), + Value::F64(f) => json!(f), + Value::String(s) => json!(s.as_str()), + Value::Array(arr) => match arr { + Array::Bool(v) => json!(v), + Array::I64(v) => json!(v), + Array::F64(v) => json!(v), + Array::String(v) => json!(v.iter().map(|s| s.as_str()).collect::>()), + _ => JsonValue::Null, + }, + _ => json!(value.to_string()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState, + }; + use opentelemetry::KeyValue; + use opentelemetry_sdk::trace::{SpanData, SpanEvents, SpanLinks}; + use std::borrow::Cow; + use std::time::SystemTime; + + fn span_with_attrs(attrs: Vec) -> SpanData { + SpanData { + span_context: SpanContext::new( + TraceId::from_bytes([ + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x12, 0x34, 0x56, 0x78, 0x9a, + 0xbc, 0xde, 0xf0, + ]), + SpanId::from_bytes([0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88]), + TraceFlags::SAMPLED, + false, + TraceState::default(), + ), + parent_span_id: SpanId::INVALID, + 
parent_span_is_remote: false, + span_kind: SpanKind::Client, + name: Cow::Borrowed("llm"), + start_time: SystemTime::UNIX_EPOCH, + end_time: SystemTime::UNIX_EPOCH, + attributes: attrs, + dropped_attributes_count: 0, + events: SpanEvents::default(), + links: SpanLinks::default(), + status: Status::Unset, + instrumentation_scope: Default::default(), + } + } + + fn props(event: &JsonValue) -> &Map { + event["properties"].as_object().unwrap() + } + + #[test] + fn non_llm_span_is_skipped() { + let exporter = PostHogExporter::new("https://us.i.posthog.com", "phc_x", false); + let span = span_with_attrs(vec![KeyValue::new("routing.strategy", "least-latency")]); + assert!(exporter.build_generation_event(&span).is_none()); + } + + #[test] + fn maps_llm_attributes_to_ai_properties() { + let exporter = PostHogExporter::new("https://us.i.posthog.com/", "phc_x", false); + let span = span_with_attrs(vec![ + KeyValue::new(llm::MODEL_NAME, "gpt-5-mini"), + KeyValue::new(llm::PROVIDER, "openai"), + KeyValue::new(llm::DURATION_MS, 1500_i64), + KeyValue::new(llm::TIME_TO_FIRST_TOKEN_MS, 250_i64), + KeyValue::new(llm::PROMPT_TOKENS, 10_i64), + KeyValue::new(llm::COMPLETION_TOKENS, 20_i64), + KeyValue::new(http::STATUS_CODE, 200_i64), + KeyValue::new("tenant.id", "acme"), + ]); + + let event = exporter.build_generation_event(&span).unwrap(); + assert_eq!(event["event"], json!("$ai_generation")); + let p = props(&event); + assert_eq!(p["$ai_model"], json!("gpt-5-mini")); + assert_eq!(p["$ai_provider"], json!("openai")); + assert_eq!(p["$ai_latency"], json!(1.5)); + assert_eq!(p["$ai_time_to_first_token"], json!(0.25)); + assert_eq!(p["$ai_stream"], json!(true)); + assert_eq!(p["$ai_input_tokens"], json!(10)); + assert_eq!(p["$ai_output_tokens"], json!(20)); + assert_eq!(p["$ai_http_status"], json!(200)); + // Anonymous (no distinct id header captured). + assert_eq!(p["$process_person_profile"], json!(false)); + // Custom passthrough attribute preserved. 
+ assert_eq!(p["tenant.id"], json!("acme")); + // No $ai_input unless capture_messages is enabled. + assert!(!p.contains_key("$ai_input")); + } + + #[test] + fn uses_distinct_id_and_flags_errors() { + let exporter = PostHogExporter::new("https://us.i.posthog.com", "phc_x", true); + let span = span_with_attrs(vec![ + KeyValue::new(llm::MODEL_NAME, "gpt-5-mini"), + KeyValue::new(plano::DISTINCT_ID, "user_123"), + KeyValue::new(llm::USER_MESSAGE_PREVIEW, "hello"), + KeyValue::new(http::STATUS_CODE, 500_i64), + ]); + + let event = exporter.build_generation_event(&span).unwrap(); + let p = props(&event); + assert_eq!(p["distinct_id"], json!("user_123")); + assert!(!p.contains_key("$process_person_profile")); + assert_eq!(p["$ai_is_error"], json!(true)); + assert_eq!( + p["$ai_input"], + json!([{ "role": "user", "content": "hello" }]) + ); + } + + #[test] + fn payload_wraps_events_with_api_key() { + let exporter = PostHogExporter::new("https://us.i.posthog.com", "phc_secret", false); + let span = span_with_attrs(vec![KeyValue::new(llm::MODEL_NAME, "gpt-5-mini")]); + let payload = exporter.build_payload(&[span]).unwrap(); + assert_eq!(payload["api_key"], json!("phc_secret")); + assert_eq!(payload["batch"].as_array().unwrap().len(), 1); + } +} diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 924c9b03..ceea57b3 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -251,6 +251,11 @@ pub struct Tracing { pub random_sampling: Option, pub opentracing_grpc_endpoint: Option, pub span_attributes: Option, + /// Provider-agnostic telemetry export destinations. Each entry is tagged by + /// its `type` (e.g. `posthog`) so new backends can be added without breaking + /// existing configs. LLM spans are translated into each backend's native + /// event format and streamed in addition to any `opentracing_grpc_endpoint`. 
+ pub exporters: Option>, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -260,6 +265,36 @@ pub struct SpanAttributes { pub static_attributes: Option>, } +/// A telemetry export destination configured under `tracing.exporters`. +/// +/// The list is provider-agnostic; each variant is internally tagged by its +/// `type` field (e.g. `type: posthog`). Additional backends (datadog, raw +/// otlp, ...) can be added as new variants without breaking existing configs. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum Exporter { + /// PostHog AI observability. LLM spans are converted into PostHog + /// `$ai_generation` events and POSTed to the configured `url`. + Posthog(PosthogExporter), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PosthogExporter { + /// PostHog host, e.g. `https://us.i.posthog.com`. The `/batch/` capture + /// path is appended automatically. + pub url: String, + /// PostHog project API key (token). Supports `$ENV_VAR` expansion at render + /// time, e.g. `$POSTHOG_API_KEY`. + pub api_key: String, + /// Optional request header whose value is used as the PostHog `distinct_id`. + /// When unset (or the header is missing on a request) events are captured + /// anonymously. + pub distinct_id_header: Option, + /// When true, include the truncated user message preview as `$ai_input`. + /// Defaults to `false` to avoid sending prompt content off-box. 
+ pub capture_messages: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)] pub enum GatewayMode { #[serde(rename = "llm")] @@ -865,4 +900,47 @@ disable_signals: false let overrides: super::Overrides = serde_yaml::from_str(yaml_missing).unwrap(); assert_eq!(overrides.disable_signals, None); } + + #[test] + fn test_tracing_posthog_exporter_deserialize() { + let yaml = r#" +random_sampling: 100 +exporters: + - type: posthog + url: https://us.i.posthog.com + api_key: phc_secret + distinct_id_header: x-user-id + capture_messages: true +"#; + let tracing: super::Tracing = serde_yaml::from_str(yaml).unwrap(); + let exporters = tracing.exporters.expect("exporters should be parsed"); + assert_eq!(exporters.len(), 1); + match &exporters[0] { + super::Exporter::Posthog(posthog) => { + assert_eq!(posthog.url, "https://us.i.posthog.com"); + assert_eq!(posthog.api_key, "phc_secret"); + assert_eq!(posthog.distinct_id_header.as_deref(), Some("x-user-id")); + assert_eq!(posthog.capture_messages, Some(true)); + } + } + } + + #[test] + fn test_tracing_posthog_exporter_minimal() { + let yaml = r#" +exporters: + - type: posthog + url: https://eu.i.posthog.com + api_key: phc_eu +"#; + let tracing: super::Tracing = serde_yaml::from_str(yaml).unwrap(); + let exporters = tracing.exporters.unwrap(); + match &exporters[0] { + super::Exporter::Posthog(posthog) => { + assert_eq!(posthog.url, "https://eu.i.posthog.com"); + assert_eq!(posthog.distinct_id_header, None); + assert_eq!(posthog.capture_messages, None); + } + } + } } diff --git a/docs/source/guides/observability/tracing.rst b/docs/source/guides/observability/tracing.rst index bff4257e..6e0ee23b 100644 --- a/docs/source/guides/observability/tracing.rst +++ b/docs/source/guides/observability/tracing.rst @@ -259,6 +259,86 @@ Request headers:: Result: no attributes are captured from ``X-Other-User-Id``. 
+Exporting Telemetry Anywhere +---------------------------- + +Beyond the OTLP/gRPC collector, Plano can stream LLM telemetry directly to +third-party observability backends through ``tracing.exporters``. The list is +provider-agnostic: each entry is tagged by its ``type`` and points at a URL, so +new destinations can be added without changing anything else. Exporters run in +addition to ``opentracing_grpc_endpoint`` — you can use one, the other, or both. + +PostHog +~~~~~~~ + +PostHog is supported as a first-class integration. Every LLM call is captured as +a PostHog `$ai_generation <https://posthog.com/docs/ai-observability/generations>`_ +event and POSTed to PostHog's capture API. Setup is intentionally minimal — +point at your PostHog URL and project token:: + + tracing: + random_sampling: 100 + exporters: + - type: posthog + url: https://us.i.posthog.com # /batch/ is appended automatically + api_key: $POSTHOG_API_KEY # PostHog project token (env expansion supported) + distinct_id_header: x-user-id # optional; omit for anonymous capture + capture_messages: false # optional; send user message as $ai_input + +That's all that's required. When ``random_sampling`` is greater than ``0`` and at +least one exporter (or ``opentracing_grpc_endpoint``) is configured, tracing is +enabled and ``$ai_generation`` events begin flowing. They appear under PostHog's +**AI Observability** in the Traces and Generations tabs. + +**Captured properties** + +Plano maps span data onto PostHog ``$ai_*`` properties: + +.. 
list-table:: + :header-rows: 1 + :widths: 30 70 + + * - PostHog property + - Source + * - ``$ai_model`` + - Resolved upstream model (``llm.model``) + * - ``$ai_provider`` + - Provider derived from the resolved model (``llm.provider``) + * - ``$ai_latency`` + - Total call duration in seconds (``llm.duration_ms``) + * - ``$ai_time_to_first_token`` + - Time to first token in seconds, streaming only + * - ``$ai_input_tokens`` / ``$ai_output_tokens`` + - Prompt / completion token usage + * - ``$ai_http_status`` / ``$ai_is_error`` + - Upstream HTTP status and error flag + * - ``$ai_trace_id`` / ``$ai_parent_id`` + - Trace and parent span identifiers + * - ``distinct_id`` + - Value of the ``distinct_id_header`` request header (else the trace id, captured anonymously) + +**Identifying users** + +Set ``distinct_id_header`` to the request header carrying your user identity +(for example ``x-user-id``). When present, Plano stamps the value as the PostHog +``distinct_id``. When the header is missing — or ``distinct_id_header`` is not +configured — the event is captured anonymously (``$process_person_profile`` is +set to ``false``), matching PostHog's anonymous vs. identified semantics. + +**Capturing message content** + +By default Plano does not send prompt content off-box. Set +``capture_messages: true`` to include the (truncated) user message preview as +``$ai_input``. Leave it ``false`` when prompt content must not leave your data +plane. + +**Multiple destinations** + +``exporters`` is a list, so you can fan out to several backends (and combine +with an OTLP collector). A common use is shipping to multiple PostHog instances +(for example separate EU and US projects for data-residency). 
+ + Benefits of Using ``Traceparent`` Headers ----------------------------------------- diff --git a/docs/source/resources/includes/plano_config_full_reference.yaml b/docs/source/resources/includes/plano_config_full_reference.yaml index 17c8161d..e370aeda 100644 --- a/docs/source/resources/includes/plano_config_full_reference.yaml +++ b/docs/source/resources/includes/plano_config_full_reference.yaml @@ -261,3 +261,16 @@ tracing: static: environment: production service.team: platform + # Provider-agnostic export destinations. LLM spans are streamed to each of + # these in addition to any opentracing_grpc_endpoint above. + exporters: + # PostHog AI observability: each LLM call is captured as an $ai_generation event. + - type: posthog + # PostHog host. The /batch/ capture path is appended automatically. + url: https://us.i.posthog.com + # PostHog project API key (token). Supports $ENV_VAR expansion. + api_key: $POSTHOG_API_KEY + # Optional: request header used as the PostHog distinct_id. Omit for anonymous capture. + distinct_id_header: x-user-id + # Optional: include the (truncated) user message as $ai_input. Defaults to false. + capture_messages: false diff --git a/docs/source/resources/includes/plano_config_full_reference_rendered.yaml b/docs/source/resources/includes/plano_config_full_reference_rendered.yaml index 3afa4404..9cabf183 100644 --- a/docs/source/resources/includes/plano_config_full_reference_rendered.yaml +++ b/docs/source/resources/includes/plano_config_full_reference_rendered.yaml @@ -266,6 +266,12 @@ system_prompt: 'You are a helpful assistant. Always respond concisely and accura ' tracing: + exporters: + - api_key: $POSTHOG_API_KEY + capture_messages: false + distinct_id_header: x-user-id + type: posthog + url: https://us.i.posthog.com opentracing_grpc_endpoint: http://localhost:4317 random_sampling: 100 span_attributes: