fix signal propagation

This commit is contained in:
Adil Hafeez 2026-02-07 13:51:15 -08:00
parent 7d75b340b5
commit 509976dec6
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
2 changed files with 92 additions and 14 deletions

View file

@ -314,16 +314,16 @@ async fn llm_chat_inner(
// Record the routed model in span
tracing::Span::current().record("model.routing_resolved", resolved_model.as_str());
let span_name = if model_from_request == resolved_model {
format!("POST {} {}", request_path, resolved_model)
} else {
format!(
"POST {} {} -> {}",
request_path, model_from_request, resolved_model
)
};
get_active_span(|span| {
let span_name = if model_from_request == resolved_model {
format!("POST {} {}", request_path, resolved_model)
} else {
format!(
"POST {} {} -> {}",
request_path, model_from_request, resolved_model
)
};
span.update_name(span_name);
span.update_name(span_name.clone());
});
debug!(
@ -380,6 +380,7 @@ async fn llm_chat_inner(
// Create base processor for metrics and tracing
let base_processor = ObservableStreamProcessor::new(
operation_component::LLM,
span_name,
request_start_time,
Some(messages_for_signals),
);

View file

@ -2,14 +2,17 @@ use bytes::Bytes;
use http_body_util::combinators::BoxBody;
use http_body_util::StreamBody;
use hyper::body::Frame;
use opentelemetry::trace::TraceContextExt;
use opentelemetry::KeyValue;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{info, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer};
use crate::tracing::set_service_name;
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
use crate::tracing::{set_service_name, signals as signal_constants};
use hermesllm::apis::openai::Message;
/// Trait for processing streaming chunks
@ -31,6 +34,7 @@ pub trait StreamProcessor: Send + 'static {
/// A processor that tracks streaming metrics
pub struct ObservableStreamProcessor {
service_name: String,
operation_name: String,
total_bytes: usize,
chunk_count: usize,
start_time: Instant,
@ -45,10 +49,13 @@ impl ObservableStreamProcessor {
/// * `service_name` - The service name for this span (e.g., "plano(llm)")
/// This will be set as the `service.name.override` attribute on the current span,
/// allowing the ServiceNameOverrideExporter to route spans to different services.
/// * `operation_name` - The current span operation name (e.g., "POST /v1/chat/completions gpt-4")
/// Used to append the flag marker when concerning signals are detected.
/// * `start_time` - When the request started (for duration calculation)
/// * `messages` - Optional conversation messages for signal analysis
pub fn new(
service_name: impl Into<String>,
operation_name: impl Into<String>,
start_time: Instant,
messages: Option<Vec<Message>>,
) -> Self {
@ -60,6 +67,7 @@ impl ObservableStreamProcessor {
Self {
service_name,
operation_name: operation_name.into(),
total_bytes: 0,
chunk_count: 0,
start_time,
@ -84,11 +92,80 @@ impl StreamProcessor for ObservableStreamProcessor {
}
fn on_complete(&mut self) {
// Analyze signals if messages are available
// Analyze signals if messages are available and record as span attributes
if let Some(ref messages) = self.messages {
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
let _report = analyzer.analyze(messages);
// Signal analysis complete - OpenTelemetry automatic instrumentation handles span attributes
let report = analyzer.analyze(messages);
// Get the current OTel span to set signal attributes
let span = tracing::Span::current();
let otel_context = span.context();
let otel_span = otel_context.span();
// Add overall quality
otel_span.set_attribute(KeyValue::new(
signal_constants::QUALITY,
format!("{:?}", report.overall_quality),
));
// Add repair/follow-up metrics if concerning
if report.follow_up.is_concerning || report.follow_up.repair_count > 0 {
otel_span.set_attribute(KeyValue::new(
signal_constants::REPAIR_COUNT,
report.follow_up.repair_count as i64,
));
otel_span.set_attribute(KeyValue::new(
signal_constants::REPAIR_RATIO,
format!("{:.3}", report.follow_up.repair_ratio),
));
}
// Add frustration metrics
if report.frustration.has_frustration {
otel_span.set_attribute(KeyValue::new(
signal_constants::FRUSTRATION_COUNT,
report.frustration.frustration_count as i64,
));
otel_span.set_attribute(KeyValue::new(
signal_constants::FRUSTRATION_SEVERITY,
report.frustration.severity as i64,
));
}
// Add repetition metrics
if report.repetition.has_looping {
otel_span.set_attribute(KeyValue::new(
signal_constants::REPETITION_COUNT,
report.repetition.repetition_count as i64,
));
}
// Add escalation metrics
if report.escalation.escalation_requested {
otel_span
.set_attribute(KeyValue::new(signal_constants::ESCALATION_REQUESTED, true));
}
// Add positive feedback metrics
if report.positive_feedback.has_positive_feedback {
otel_span.set_attribute(KeyValue::new(
signal_constants::POSITIVE_FEEDBACK_COUNT,
report.positive_feedback.positive_count as i64,
));
}
// Flag the span name if any concerning signal is detected
let should_flag = report.frustration.has_frustration
|| report.repetition.has_looping
|| report.escalation.escalation_requested
|| matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Severe
);
if should_flag {
otel_span.update_name(format!("{} {}", self.operation_name, FLAG_MARKER));
}
}
info!(