mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
adding support for behavior signals
This commit is contained in:
parent
33e90dd338
commit
aa78ac4791
7 changed files with 1957 additions and 2 deletions
1
crates/Cargo.lock
generated
1
crates/Cargo.lock
generated
|
|
@ -333,6 +333,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"serde_with",
|
||||
"serde_yaml",
|
||||
"strsim",
|
||||
"thiserror 2.0.12",
|
||||
"time",
|
||||
"tokio",
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ reqwest = { version = "0.12.15", features = ["stream"] }
|
|||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
serde_with = "3.13.0"
|
||||
strsim = "0.11"
|
||||
serde_yaml = "0.9.34"
|
||||
thiserror = "2.0.12"
|
||||
tokio = { version = "1.44.2", features = ["full"] }
|
||||
|
|
|
|||
|
|
@ -81,6 +81,9 @@ pub async fn llm_chat(
|
|||
let user_message_preview = client_request.get_recent_user_message()
|
||||
.map(|msg| truncate_message(&msg, 50));
|
||||
|
||||
// Extract messages for signal analysis (clone before moving client_request)
|
||||
let messages_for_signals = extract_messages_for_signals(&client_request);
|
||||
|
||||
client_request.set_model(resolved_model.clone());
|
||||
if client_request.remove_metadata_key("archgw_preference_config") {
|
||||
debug!("Removed archgw_preference_config from metadata");
|
||||
|
|
@ -174,13 +177,18 @@ pub async fn llm_chat(
|
|||
).await;
|
||||
|
||||
// Use PassthroughProcessor to track streaming metrics and finalize the span
|
||||
let processor = ObservableStreamProcessor::new(
|
||||
let mut processor = ObservableStreamProcessor::new(
|
||||
trace_collector,
|
||||
operation_component::LLM,
|
||||
llm_span,
|
||||
request_start_time,
|
||||
);
|
||||
|
||||
// Add messages for signal analysis if available
|
||||
if let Some(messages) = messages_for_signals {
|
||||
processor = processor.with_messages(messages);
|
||||
}
|
||||
|
||||
let streaming_response = create_streaming_response(byte_stream, processor, 16);
|
||||
|
||||
match response.body(streaming_response.body) {
|
||||
|
|
@ -343,3 +351,14 @@ async fn get_upstream_path(
|
|||
base_url_path_prefix.as_deref(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Extract messages from ProviderRequestType for signal analysis
|
||||
/// Returns None for non-ChatCompletions requests
|
||||
fn extract_messages_for_signals(request: &ProviderRequestType) -> Option<Vec<hermesllm::apis::openai::Message>> {
|
||||
match request {
|
||||
ProviderRequestType::ChatCompletionsRequest(chat_req) => {
|
||||
Some(chat_req.messages.clone())
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,8 +10,10 @@ use tokio_stream::wrappers::ReceiverStream;
|
|||
use tokio_stream::StreamExt;
|
||||
use tracing::warn;
|
||||
|
||||
// Import tracing constants
|
||||
// Import tracing constants and signals
|
||||
use crate::tracing::{llm, error};
|
||||
use crate::signals::signals::{SignalAnalyzer, InteractionQuality, FLAG_MARKER};
|
||||
use hermesllm::apis::openai::Message;
|
||||
|
||||
/// Trait for processing streaming chunks
|
||||
/// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging)
|
||||
|
|
@ -38,6 +40,7 @@ pub struct ObservableStreamProcessor {
|
|||
chunk_count: usize,
|
||||
start_time: Instant,
|
||||
time_to_first_token: Option<u128>,
|
||||
messages: Option<Vec<Message>>,
|
||||
}
|
||||
|
||||
impl ObservableStreamProcessor {
|
||||
|
|
@ -62,8 +65,15 @@ impl ObservableStreamProcessor {
|
|||
chunk_count: 0,
|
||||
start_time,
|
||||
time_to_first_token: None,
|
||||
messages: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the conversation messages for signal analysis
|
||||
pub fn with_messages(mut self, messages: Vec<Message>) -> Self {
|
||||
self.messages = Some(messages);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamProcessor for ObservableStreamProcessor {
|
||||
|
|
@ -136,6 +146,77 @@ impl StreamProcessor for ObservableStreamProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
// Analyze signals if messages are available and add to span attributes
|
||||
if let Some(ref messages) = self.messages {
|
||||
let analyzer = SignalAnalyzer::new();
|
||||
let report = analyzer.analyze(messages);
|
||||
|
||||
// Add overall quality
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.quality".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some(format!("{:?}", report.overall_quality)),
|
||||
},
|
||||
});
|
||||
|
||||
// Add flag marker to operation name if any concerning signal is detected
|
||||
let should_flag = report.frustration.has_frustration
|
||||
|| report.repetition.has_looping
|
||||
|| report.escalation.escalation_requested
|
||||
|| matches!(
|
||||
report.overall_quality,
|
||||
InteractionQuality::Poor | InteractionQuality::Critical
|
||||
);
|
||||
|
||||
if should_flag {
|
||||
// Prepend flag marker to the operation name
|
||||
self.span.name = format!("{} {}", self.span.name, FLAG_MARKER);
|
||||
}
|
||||
|
||||
// Add key signal metrics
|
||||
if report.frustration.has_frustration {
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.frustration.count".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some(report.frustration.frustration_count.to_string()),
|
||||
},
|
||||
});
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.frustration.severity".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some(report.frustration.severity.to_string()),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if report.repetition.has_looping {
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.repetition.count".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some(report.repetition.repetition_count.to_string()),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if report.escalation.escalation_requested {
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.escalation.requested".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some("true".to_string()),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
if report.positive_feedback.has_positive_feedback {
|
||||
self.span.attributes.push(Attribute {
|
||||
key: "signals.positive_feedback.count".to_string(),
|
||||
value: AttributeValue {
|
||||
string_value: Some(report.positive_feedback.positive_count.to_string()),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Record the finalized span
|
||||
self.collector.record_span(&self.service_name, self.span.clone());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
pub mod handlers;
|
||||
pub mod router;
|
||||
pub mod signals;
|
||||
pub mod tracing;
|
||||
pub mod utils;
|
||||
|
|
|
|||
1
crates/brightstaff/src/signals/mod.rs
Normal file
1
crates/brightstaff/src/signals/mod.rs
Normal file
|
|
@ -0,0 +1 @@
|
|||
pub mod signals;
|
||||
1851
crates/brightstaff/src/signals/signals.rs
Normal file
1851
crates/brightstaff/src/signals/signals.rs
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Add table
Add a link
Reference in a new issue