adding support for signals

This commit is contained in:
Salman Paracha 2025-12-17 13:09:09 -08:00
parent a56bb9d190
commit ec9ec7b6bd
7 changed files with 1957 additions and 12 deletions

1
crates/Cargo.lock generated
View file

@ -335,6 +335,7 @@ dependencies = [
"serde_json",
"serde_with",
"serde_yaml",
"strsim",
"thiserror 2.0.12",
"time",
"tokio",

View file

@ -30,6 +30,7 @@ reqwest = { version = "0.12.15", features = ["stream"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_with = "3.13.0"
strsim = "0.11"
serde_yaml = "0.9.34"
thiserror = "2.0.12"
tokio = { version = "1.44.2", features = ["full"] }

View file

@ -99,6 +99,9 @@ pub async fn llm_chat(
let user_message_preview = client_request.get_recent_user_message()
.map(|msg| truncate_message(&msg, 50));
// Extract messages for signal analysis (clone before moving client_request)
let messages_for_signals = extract_messages_for_signals(&client_request);
client_request.set_model(resolved_model.clone());
if client_request.remove_metadata_key("archgw_preference_config") {
debug!("[PLANO_REQ_ID:{}] Removed archgw_preference_config from metadata", request_id);
@ -260,13 +263,18 @@ pub async fn llm_chat(
).await;
// Create base processor for metrics and tracing
let base_processor = ObservableStreamProcessor::new(
let mut base_processor = ObservableStreamProcessor::new(
trace_collector,
operation_component::LLM,
llm_span,
request_start_time,
);
// Add messages for signal analysis if available
if let Some(messages) = messages_for_signals {
base_processor = base_processor.with_messages(messages);
}
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
let streaming_response = if should_manage_state && !original_input_items.is_empty() && state_storage.is_some() {
@ -444,19 +452,20 @@ async fn get_provider_info(
let provider_id = provider.provider_interface.to_provider_id();
let prefix = provider.base_url_path_prefix.clone();
return (provider_id, prefix);
}
let default_provider = providers_lock.iter().find(|p| {
p.default.unwrap_or(false)
});
if let Some(provider) = default_provider {
let provider_id = provider.provider_interface.to_provider_id();
let prefix = provider.base_url_path_prefix.clone();
(provider_id, prefix)
} else {
// Last resort: use OpenAI as hardcoded fallback
warn!("No default provider found, falling back to OpenAI");
(hermesllm::ProviderId::OpenAI, None)
}
}
/// Pull the conversation messages out of a provider request so they can be
/// fed to signal analysis.
///
/// Only the `ChatCompletionsRequest` variant yields messages (cloned from the
/// request); every other request type produces `None`.
fn extract_messages_for_signals(
    request: &ProviderRequestType,
) -> Option<Vec<hermesllm::apis::openai::Message>> {
    // Guard-style: handle the one supported variant and bail out with its messages.
    if let ProviderRequestType::ChatCompletionsRequest(chat_req) = request {
        return Some(chat_req.messages.clone());
    }

    // Any non-ChatCompletions request carries nothing we can analyze here.
    None
}

View file

@ -10,8 +10,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::warn;
// Import tracing constants
// Import tracing constants and signals
use crate::tracing::{llm, error};
use crate::signals::signals::{SignalAnalyzer, InteractionQuality, FLAG_MARKER};
use hermesllm::apis::openai::Message;
/// Trait for processing streaming chunks
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
@ -38,6 +40,7 @@ pub struct ObservableStreamProcessor {
chunk_count: usize,
start_time: Instant,
time_to_first_token: Option<u128>,
messages: Option<Vec<Message>>,
}
impl ObservableStreamProcessor {
@ -62,8 +65,15 @@ impl ObservableStreamProcessor {
chunk_count: 0,
start_time,
time_to_first_token: None,
messages: None,
}
}
/// Builder-style setter: attach the conversation messages that signal
/// analysis will inspect, returning the updated processor.
///
/// Any messages stored previously are replaced.
pub fn with_messages(self, messages: Vec<Message>) -> Self {
    let mut processor = self;
    processor.messages = Some(messages);
    processor
}
}
impl StreamProcessor for ObservableStreamProcessor {
@ -136,6 +146,77 @@ impl StreamProcessor for ObservableStreamProcessor {
}
}
// Analyze signals if messages are available and add to span attributes.
// When `self.messages` is `None` this whole section is skipped and the span
// is left untouched.
if let Some(ref messages) = self.messages {
let analyzer = SignalAnalyzer::new();
let report = analyzer.analyze(messages);
// Add overall quality (always recorded; rendered via the enum's `Debug` output)
self.span.attributes.push(Attribute {
key: "signals.quality".to_string(),
value: AttributeValue {
string_value: Some(format!("{:?}", report.overall_quality)),
},
});
// Add flag marker to operation name if any concerning signal is detected:
// frustration, looping, an escalation request, or a Poor/Critical overall
// quality. Positive feedback does not contribute to flagging.
let should_flag = report.frustration.has_frustration
|| report.repetition.has_looping
|| report.escalation.escalation_requested
|| matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Critical
);
if should_flag {
// Append the flag marker after the existing span name, separated by a space.
// NOTE(review): the marker is placed after the name, not before it —
// confirm this is the intended position.
self.span.name = format!("{} {}", self.span.name, FLAG_MARKER);
}
// Add key signal metrics. Each attribute is emitted only when its signal
// fired, and every value is stored as a string.
if report.frustration.has_frustration {
self.span.attributes.push(Attribute {
key: "signals.frustration.count".to_string(),
value: AttributeValue {
string_value: Some(report.frustration.frustration_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: "signals.frustration.severity".to_string(),
value: AttributeValue {
string_value: Some(report.frustration.severity.to_string()),
},
});
}
// Repetition: record how many repetitions were counted when looping was detected.
if report.repetition.has_looping {
self.span.attributes.push(Attribute {
key: "signals.repetition.count".to_string(),
value: AttributeValue {
string_value: Some(report.repetition.repetition_count.to_string()),
},
});
}
// Escalation: a presence-only marker; the value is the literal string "true".
if report.escalation.escalation_requested {
self.span.attributes.push(Attribute {
key: "signals.escalation.requested".to_string(),
value: AttributeValue {
string_value: Some("true".to_string()),
},
});
}
// Positive feedback: recorded for visibility only; it never flags the span.
if report.positive_feedback.has_positive_feedback {
self.span.attributes.push(Attribute {
key: "signals.positive_feedback.count".to_string(),
value: AttributeValue {
string_value: Some(report.positive_feedback.positive_count.to_string()),
},
});
}
}
// Record the finalized span
self.collector.record_span(&self.service_name, self.span.clone());
}

View file

@ -1,5 +1,6 @@
pub mod handlers;
pub mod router;
pub mod state;
pub mod signals;
pub mod tracing;
pub mod utils;

View file

@ -0,0 +1 @@
pub mod signals;

File diff suppressed because it is too large Load diff