Introduce signals change (#655)

* adding support for signals

* reducing false positives for signals like positive interaction

* adding docs. Still need to fix the messages list, but waiting on PR #621

* Improve frustration detection: normalize contractions and refine punctuation

* Further refine test cases with longer messages

* minor doc changes

* fixing echo statement for build

* fixing the messages construction and using the trait for signals

* update signals docs

* fixed some minor doc changes

* added more tests and fixed docuemtnation. PR 100% ready

* made fixes based on PR comments

* Optimize latency

1. replace sliding window approach with trigram containment check
2. add code to pre-compute ngrams for patterns

* removed some debug statements to make tests easier to read

* PR comments to make ObservableStreamProcessor accept optonal Vec<Messagges>

* fixed PR comments

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-342.local>
Co-authored-by: MeiyuZhong <mariazhong9612@gmail.com>
Co-authored-by: nehcgs <54548843+nehcgs@users.noreply.github.com>
This commit is contained in:
Salman Paracha 2026-01-07 11:20:44 -08:00 committed by GitHub
parent 57327ba667
commit b4543ba56c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 3972 additions and 191 deletions

View file

@ -111,6 +111,9 @@ pub async fn llm_chat(
.get_recent_user_message()
.map(|msg| truncate_message(&msg, 50));
// Extract messages for signal analysis (clone before moving client_request)
let messages_for_signals = client_request.get_messages();
client_request.set_model(resolved_model.clone());
if client_request.remove_metadata_key("archgw_preference_config") {
debug!(
@ -292,6 +295,7 @@ pub async fn llm_chat(
operation_component::LLM,
llm_span,
request_start_time,
Some(messages_for_signals),
);
// === v1/responses state management: Wrap with ResponsesStateProcessor ===

View file

@ -10,8 +10,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::warn;
// Import tracing constants
use crate::tracing::{error, llm};
// Import tracing constants and signals
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
use crate::tracing::{error, llm, signals as signal_constants};
use hermesllm::apis::openai::Message;
/// Trait for processing streaming chunks
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
@ -38,6 +40,7 @@ pub struct ObservableStreamProcessor {
chunk_count: usize,
start_time: Instant,
time_to_first_token: Option<u128>,
messages: Option<Vec<Message>>,
}
impl ObservableStreamProcessor {
@ -48,11 +51,13 @@ impl ObservableStreamProcessor {
/// * `service_name` - The service name for this span (e.g., "archgw(llm)")
/// * `span` - The span to finalize after streaming completes
/// * `start_time` - When the request started (for duration calculation)
/// * `messages` - Optional conversation messages for signal analysis
pub fn new(
collector: Arc<TraceCollector>,
service_name: impl Into<String>,
span: Span,
start_time: Instant,
messages: Option<Vec<Message>>,
) -> Self {
Self {
collector,
@ -62,6 +67,7 @@ impl ObservableStreamProcessor {
chunk_count: 0,
start_time,
time_to_first_token: None,
messages,
}
}
}
@ -133,6 +139,94 @@ impl StreamProcessor for ObservableStreamProcessor {
}
}
// Analyze signals if messages are available and add to span attributes
if let Some(ref messages) = self.messages {
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
let report = analyzer.analyze(messages);
// Add overall quality
self.span.attributes.push(Attribute {
key: signal_constants::QUALITY.to_string(),
value: AttributeValue {
string_value: Some(format!("{:?}", report.overall_quality)),
},
});
// Add repair/follow-up metrics if concerning
if report.follow_up.is_concerning || report.follow_up.repair_count > 0 {
self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.follow_up.repair_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_RATIO.to_string(),
value: AttributeValue {
string_value: Some(format!("{:.3}", report.follow_up.repair_ratio)),
},
});
}
// Add flag marker to operation name if any concerning signal is detected
let should_flag = report.frustration.has_frustration
|| report.repetition.has_looping
|| report.escalation.escalation_requested
|| matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Severe
);
if should_flag {
// Prepend flag marker to the operation name
self.span.name = format!("{} {}", self.span.name, FLAG_MARKER);
}
// Add key signal metrics
if report.frustration.has_frustration {
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.frustration_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_SEVERITY.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.severity.to_string()),
},
});
}
if report.repetition.has_looping {
self.span.attributes.push(Attribute {
key: signal_constants::REPETITION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.repetition.repetition_count.to_string()),
},
});
}
if report.escalation.escalation_requested {
self.span.attributes.push(Attribute {
key: signal_constants::ESCALATION_REQUESTED.to_string(),
value: AttributeValue {
string_value: Some("true".to_string()),
},
});
}
if report.positive_feedback.has_positive_feedback {
self.span.attributes.push(Attribute {
key: signal_constants::POSITIVE_FEEDBACK_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.positive_feedback.positive_count.to_string()),
},
});
}
}
// Record the finalized span
self.collector
.record_span(&self.service_name, self.span.clone());

View file

@ -1,5 +1,6 @@
pub mod handlers;
pub mod router;
pub mod signals;
pub mod state;
pub mod tracing;
pub mod utils;

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,3 @@
mod analyzer;
pub use analyzer::*;

View file

@ -139,6 +139,45 @@ pub mod error {
pub const STACK_TRACE: &str = "error.stack_trace";
}
// =============================================================================
// Span Attributes - Agentic Signals
// =============================================================================
/// Behavioral quality indicators for agent interactions
/// These signals are computed automatically from conversation patterns
pub mod signals {
/// Overall quality assessment
/// Values: "Excellent", "Good", "Neutral", "Poor", "Severe"
pub const QUALITY: &str = "signals.quality";
/// Total number of turns in the conversation
pub const TURN_COUNT: &str = "signals.turn_count";
/// Efficiency score (0.0-1.0)
pub const EFFICIENCY_SCORE: &str = "signals.efficiency_score";
/// Number of repair attempts detected
pub const REPAIR_COUNT: &str = "signals.follow_up.repair.count";
/// Ratio of repairs to user turns
pub const REPAIR_RATIO: &str = "signals.follow_up.repair.ratio";
/// Number of frustration indicators detected
pub const FRUSTRATION_COUNT: &str = "signals.frustration.count";
/// Frustration severity level (0-3)
pub const FRUSTRATION_SEVERITY: &str = "signals.frustration.severity";
/// Number of repetition instances detected
pub const REPETITION_COUNT: &str = "signals.repetition.count";
/// Whether escalation was requested (user asked for human help)
pub const ESCALATION_REQUESTED: &str = "signals.escalation.requested";
/// Number of positive feedback indicators detected
pub const POSITIVE_FEEDBACK_COUNT: &str = "signals.positive_feedback.count";
}
// =============================================================================
// Operation Names
// =============================================================================

View file

@ -1,3 +1,5 @@
mod constants;
pub use constants::{error, http, llm, operation_component, routing, OperationNameBuilder};
pub use constants::{
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
};