From 509976dec6a936a746cd0cae113c762cdb39513b Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Sat, 7 Feb 2026 13:51:15 -0800 Subject: [PATCH] fix signal propagation --- crates/brightstaff/src/handlers/llm.rs | 19 +++--- crates/brightstaff/src/handlers/utils.rs | 87 ++++++++++++++++++++++-- 2 files changed, 92 insertions(+), 14 deletions(-) diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 8cf42568..12456af1 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -314,16 +314,16 @@ async fn llm_chat_inner( // Record the routed model in span tracing::Span::current().record("model.routing_resolved", resolved_model.as_str()); + let span_name = if model_from_request == resolved_model { + format!("POST {} {}", request_path, resolved_model) + } else { + format!( + "POST {} {} -> {}", + request_path, model_from_request, resolved_model + ) + }; get_active_span(|span| { - let span_name = if model_from_request == resolved_model { - format!("POST {} {}", request_path, resolved_model) - } else { - format!( - "POST {} {} -> {}", - request_path, model_from_request, resolved_model - ) - }; - span.update_name(span_name); + span.update_name(span_name.clone()); }); debug!( @@ -380,6 +380,7 @@ async fn llm_chat_inner( // Create base processor for metrics and tracing let base_processor = ObservableStreamProcessor::new( operation_component::LLM, + span_name, request_start_time, Some(messages_for_signals), ); diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index 3d11b770..5d5ced8a 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -2,14 +2,17 @@ use bytes::Bytes; use http_body_util::combinators::BoxBody; use http_body_util::StreamBody; use hyper::body::Frame; +use opentelemetry::trace::TraceContextExt; +use opentelemetry::KeyValue; use std::time::Instant; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::{info, warn, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer}; -use crate::tracing::set_service_name; +use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER}; +use crate::tracing::{set_service_name, signals as signal_constants}; use hermesllm::apis::openai::Message; /// Trait for processing streaming chunks @@ -31,6 +34,7 @@ pub trait StreamProcessor: Send + 'static { /// A processor that tracks streaming metrics pub struct ObservableStreamProcessor { service_name: String, + operation_name: String, total_bytes: usize, chunk_count: usize, start_time: Instant, @@ -45,10 +49,13 @@ impl ObservableStreamProcessor { /// * `service_name` - The service name for this span (e.g., "plano(llm)") /// This will be set as the `service.name.override` attribute on the current span, /// allowing the ServiceNameOverrideExporter to route spans to different services. + /// * `operation_name` - The current span operation name (e.g., "POST /v1/chat/completions gpt-4") + /// Used to append the flag marker when concerning signals are detected. /// * `start_time` - When the request started (for duration calculation) /// * `messages` - Optional conversation messages for signal analysis pub fn new( service_name: impl Into, + operation_name: impl Into, start_time: Instant, messages: Option>, ) -> Self { @@ -60,6 +67,7 @@ impl ObservableStreamProcessor { Self { service_name, + operation_name: operation_name.into(), total_bytes: 0, chunk_count: 0, start_time, @@ -84,11 +92,80 @@ impl StreamProcessor for ObservableStreamProcessor { } fn on_complete(&mut self) { - // Analyze signals if messages are available + // Analyze signals if messages are available and record as span attributes if let Some(ref messages) = self.messages { let analyzer: Box = Box::new(TextBasedSignalAnalyzer::new()); - let _report = analyzer.analyze(messages); - // Signal analysis complete - OpenTelemetry automatic instrumentation handles span attributes + let report = analyzer.analyze(messages); + + // Get the current OTel span to set signal attributes + let span = tracing::Span::current(); + let otel_context = span.context(); + let otel_span = otel_context.span(); + + // Add overall quality + otel_span.set_attribute(KeyValue::new( + signal_constants::QUALITY, + format!("{:?}", report.overall_quality), + )); + + // Add repair/follow-up metrics if concerning + if report.follow_up.is_concerning || report.follow_up.repair_count > 0 { + otel_span.set_attribute(KeyValue::new( + signal_constants::REPAIR_COUNT, + report.follow_up.repair_count as i64, + )); + otel_span.set_attribute(KeyValue::new( + signal_constants::REPAIR_RATIO, + format!("{:.3}", report.follow_up.repair_ratio), + )); + } + + // Add frustration metrics + if report.frustration.has_frustration { + otel_span.set_attribute(KeyValue::new( + signal_constants::FRUSTRATION_COUNT, + report.frustration.frustration_count as i64, + )); + otel_span.set_attribute(KeyValue::new( + signal_constants::FRUSTRATION_SEVERITY, + report.frustration.severity as i64, + )); + } + + // Add repetition metrics + if report.repetition.has_looping { + otel_span.set_attribute(KeyValue::new( + signal_constants::REPETITION_COUNT, + report.repetition.repetition_count as i64, + )); + } + + // Add escalation metrics + if report.escalation.escalation_requested { + otel_span + .set_attribute(KeyValue::new(signal_constants::ESCALATION_REQUESTED, true)); + } + + // Add positive feedback metrics + if report.positive_feedback.has_positive_feedback { + otel_span.set_attribute(KeyValue::new( + signal_constants::POSITIVE_FEEDBACK_COUNT, + report.positive_feedback.positive_count as i64, + )); + } + + // Flag the span name if any concerning signal is detected + let should_flag = report.frustration.has_frustration + || report.repetition.has_looping + || report.escalation.escalation_requested + || matches!( + report.overall_quality, + InteractionQuality::Poor | InteractionQuality::Severe + ); + + if should_flag { + otel_span.update_name(format!("{} {}", self.operation_name, FLAG_MARKER)); + } } info!(