diff --git a/cli/planoai/trace_cmd.py b/cli/planoai/trace_cmd.py index 42538434..3c7eba3b 100644 --- a/cli/planoai/trace_cmd.py +++ b/cli/planoai/trace_cmd.py @@ -428,6 +428,27 @@ def _anyvalue_to_python(value_obj: Any) -> Any: return None +def _kv_to_attr_dict(kv: Any) -> dict[str, Any] | None: + """Convert a protobuf KeyValue into the {"key", "value"} dict shape used + internally. Returns None if the value type is unsupported.""" + py_val = _anyvalue_to_python(kv.value) + if py_val is None: + return None + value_dict: dict[str, Any] = {} + if isinstance(py_val, bool): + # bool check must come before int, since bool is a subclass of int. + value_dict["boolValue"] = py_val + elif isinstance(py_val, str): + value_dict["stringValue"] = py_val + elif isinstance(py_val, int): + value_dict["intValue"] = str(py_val) + elif isinstance(py_val, float): + value_dict["doubleValue"] = py_val + else: + return None + return {"key": kv.key, "value": value_dict} + + def _proto_span_to_dict(span: Any, service_name: str) -> dict[str, Any]: """Convert a protobuf Span message to the dict format used internally.""" span_dict: dict[str, Any] = { @@ -439,20 +460,29 @@ def _proto_span_to_dict(span: Any, service_name: str) -> dict[str, Any]: "endTimeUnixNano": str(span.end_time_unix_nano), "service": service_name, "attributes": [], + "events": [], } for kv in span.attributes: - py_val = _anyvalue_to_python(kv.value) - if py_val is not None: - value_dict: dict[str, Any] = {} - if isinstance(py_val, str): - value_dict["stringValue"] = py_val - elif isinstance(py_val, bool): - value_dict["boolValue"] = py_val - elif isinstance(py_val, int): - value_dict["intValue"] = str(py_val) - elif isinstance(py_val, float): - value_dict["doubleValue"] = py_val - span_dict["attributes"].append({"key": kv.key, "value": value_dict}) + attr = _kv_to_attr_dict(kv) + if attr is not None: + span_dict["attributes"].append(attr) + + # Preserve span events (name, timestamp, attributes). 
OTel span events are + # the drill-down channel for granular signals that aggregate span + # attributes summarize. Dropping them here would make every + # per-detection `SignalEvent` emitted by brightstaff invisible to + # `planoai trace`. + for event in span.events: + event_dict: dict[str, Any] = { + "name": event.name, + "timeUnixNano": str(event.time_unix_nano), + "attributes": [], + } + for kv in event.attributes: + attr = _kv_to_attr_dict(kv) + if attr is not None: + event_dict["attributes"].append(attr) + span_dict["events"].append(event_dict) return span_dict diff --git a/cli/test/test_trace_cmd.py b/cli/test/test_trace_cmd.py index fdcf8c3c..577488e9 100644 --- a/cli/test/test_trace_cmd.py +++ b/cli/test/test_trace_cmd.py @@ -75,6 +75,89 @@ class _FakeGrpcServer: return None +def test_proto_span_to_dict_preserves_span_events(): + """Span events are the drill-down channel for granular signals. The OTLP + store must preserve them so `planoai trace` can surface per-detection + SignalEvent payloads alongside aggregate signals.* attributes.""" + from opentelemetry.proto.common.v1 import common_pb2 + from opentelemetry.proto.trace.v1 import trace_pb2 + + span = trace_pb2.Span( + trace_id=bytes.fromhex("0123456789abcdef0123456789abcdef"), + span_id=bytes.fromhex("1111111111111111"), + parent_span_id=b"", + name="POST /v1/chat/completions gpt-4", + start_time_unix_nano=1_700_000_000_000_000_000, + end_time_unix_nano=1_700_000_000_500_000_000, + attributes=[ + common_pb2.KeyValue( + key="signals.quality", + value=common_pb2.AnyValue(string_value="Neutral"), + ), + ], + events=[ + trace_pb2.Span.Event( + time_unix_nano=1_700_000_000_100_000_000, + name="signal.interaction.frustration", + attributes=[ + common_pb2.KeyValue( + key="signal.event_id", + value=common_pb2.AnyValue( + string_value="01HF4ZABCDEF0123456789ABCD" + ), + ), + common_pb2.KeyValue( + key="signal.source_message_idx", + value=common_pb2.AnyValue(int_value=3), + ), + common_pb2.KeyValue( + 
key="signal.evidence.snippet", + value=common_pb2.AnyValue(string_value="WHY"), + ), + ], + ), + ], + ) + + span_dict = trace_cmd._proto_span_to_dict(span, "plano(llm)") + + assert span_dict["name"] == "POST /v1/chat/completions gpt-4" + assert span_dict["attributes"] == [ + {"key": "signals.quality", "value": {"stringValue": "Neutral"}} + ] + + assert "events" in span_dict, "span events must be preserved" + assert len(span_dict["events"]) == 1 + event = span_dict["events"][0] + assert event["name"] == "signal.interaction.frustration" + assert event["timeUnixNano"] == "1700000000100000000" + + event_attrs = {a["key"]: a["value"] for a in event["attributes"]} + assert event_attrs["signal.event_id"] == { + "stringValue": "01HF4ZABCDEF0123456789ABCD" + } + assert event_attrs["signal.source_message_idx"] == {"intValue": "3"} + assert event_attrs["signal.evidence.snippet"] == {"stringValue": "WHY"} + + +def test_proto_span_to_dict_no_events_yields_empty_list(): + """Spans without events should still produce an `events: []` key so + downstream code can treat it as always present.""" + from opentelemetry.proto.trace.v1 import trace_pb2 + + span = trace_pb2.Span( + trace_id=bytes.fromhex("0123456789abcdef0123456789abcdef"), + span_id=bytes.fromhex("2222222222222222"), + parent_span_id=b"", + name="POST /v1/chat/completions", + start_time_unix_nano=1_700_000_000_000_000_000, + end_time_unix_nano=1_700_000_000_100_000_000, + ) + + span_dict = trace_cmd._proto_span_to_dict(span, "plano(llm)") + assert span_dict["events"] == [] + + def test_start_trace_server_raises_bind_error(monkeypatch): monkeypatch.setattr( trace_cmd.grpc, "server", lambda *_args, **_kwargs: _FakeGrpcServer() diff --git a/crates/Cargo.lock b/crates/Cargo.lock index e07b47ee..401eb0b5 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -339,6 +339,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "tracing-subscriber", + "ulid", "uuid", ] @@ -3659,6 +3660,17 @@ version = "1.19.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "ulid" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" +dependencies = [ + "rand 0.9.4", + "serde", + "web-time", +] + [[package]] name = "unicase" version = "2.9.0" diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index f88ed918..f2685ecd 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" async-openai = "0.30.1" async-trait = "0.1" bytes = "1.10.1" -chrono = "0.4" +chrono = { version = "0.4", features = ["serde"] } common = { version = "0.1.0", path = "../common" } eventsource-client = "0.15.0" eventsource-stream = "0.2.3" @@ -42,6 +42,7 @@ time = { version = "0.3", features = ["formatting", "macros"] } tracing = "0.1" tracing-opentelemetry = "0.32.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +ulid = { version = "1.1", features = ["serde"] } uuid = { version = "1.0", features = ["v4", "serde"] } [dev-dependencies] diff --git a/crates/brightstaff/src/signals/analyzer.rs b/crates/brightstaff/src/signals/analyzer.rs index 5ee3c7d9..e01f6d87 100644 --- a/crates/brightstaff/src/signals/analyzer.rs +++ b/crates/brightstaff/src/signals/analyzer.rs @@ -940,8 +940,26 @@ pub struct FollowUpSignal { pub repair_ratio: f64, /// Whether repair ratio is concerning (> 0.3) pub is_concerning: bool, - /// List of detected repair phrases + /// List of detected repair phrases (human-readable) pub repair_phrases: Vec, + /// Structured per-message repair indicators, one per detection + #[serde(default)] + pub repair_indicators: Vec, +} + +/// Individual repair / follow-up indicator +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FollowUpIndicator { + /// Message index where the repair was 
detected (relative to the analyzer's + /// normalized message slice; absolute indices are attached when projecting + /// to `SignalEvent`). + pub message_index: usize, + /// Relevant text snippet — either the matched lexical pattern or a marker + /// string for the semantic-rephrase path. + pub snippet: String, + /// True when the repair was detected by semantic rephrase of the previous + /// user turn rather than a lexical repair pattern. + pub similar_to_prev_user_turn: bool, } /// User frustration indicators @@ -1102,6 +1120,13 @@ pub enum EscalationType { pub trait SignalAnalyzer { /// Analyze a conversation and generate a complete signal report fn analyze(&self, messages: &[Message]) -> SignalReport; + + /// Analyze a conversation and return both the aggregate report and the + /// underlying event stream. The default implementation returns an empty + /// event list; implementors that support events should override. + fn analyze_with_events(&self, messages: &[Message]) -> (SignalReport, Vec) { + (self.analyze(messages), Vec::new()) + } } /// Text-based implementation of signal analyzer that computes all signals from a message array @@ -1248,6 +1273,7 @@ impl TextBasedSignalAnalyzer { ) -> FollowUpSignal { let mut repair_count = 0; let mut repair_phrases = Vec::new(); + let mut repair_indicators = Vec::new(); let mut user_turn_count = 0; for (i, role, norm_msg) in normalized_messages { @@ -1269,6 +1295,11 @@ impl TextBasedSignalAnalyzer { ) { repair_count += 1; repair_phrases.push(format!("Turn {}: '{}'", i + 1, pattern.raw)); + repair_indicators.push(FollowUpIndicator { + message_index: *i, + snippet: pattern.raw.clone(), + similar_to_prev_user_turn: false, + }); found_in_turn = true; break; } @@ -1284,6 +1315,11 @@ impl TextBasedSignalAnalyzer { repair_count += 1; repair_phrases .push(format!("Turn {}: Similar rephrase detected", i + 1)); + repair_indicators.push(FollowUpIndicator { + message_index: *i, + snippet: "Similar rephrase detected".to_string(), + 
similar_to_prev_user_turn: true, + }); } break; } @@ -1304,6 +1340,7 @@ impl TextBasedSignalAnalyzer { repair_ratio, is_concerning, repair_phrases, + repair_indicators, } } @@ -1865,6 +1902,10 @@ impl TextBasedSignalAnalyzer { impl SignalAnalyzer for TextBasedSignalAnalyzer { fn analyze(&self, messages: &[Message]) -> SignalReport { + self.analyze_with_events(messages).0 + } + + fn analyze_with_events(&self, messages: &[Message]) -> (SignalReport, Vec) { // Limit the number of messages to process (take most recent messages) let messages_to_process = if messages.len() > self.max_messages { &messages[messages.len() - self.max_messages..] @@ -1914,7 +1955,7 @@ impl SignalAnalyzer for TextBasedSignalAnalyzer { &overall_quality, ); - SignalReport { + let report = SignalReport { turn_count, follow_up, frustration, @@ -1923,7 +1964,88 @@ impl SignalAnalyzer for TextBasedSignalAnalyzer { escalation, overall_quality, summary, + }; + + // Detector indicator indices are relative to the (possibly truncated) + // slice. Project them back to absolute indices in the original input + // so event consumers can reference the original conversation. + let abs_offset = messages.len().saturating_sub(self.max_messages); + let events = Self::project_events(&report, abs_offset); + + (report, events) + } +} + +impl TextBasedSignalAnalyzer { + /// Project the indicator collections inside a `SignalReport` into a + /// flat event stream. One event per detected indicator. 
+ fn project_events(report: &SignalReport, abs_offset: usize) -> Vec { + use super::{SignalEvent, SignalEvidence, SignalSubtype}; + + let mut events: Vec = Vec::new(); + + for indicator in &report.follow_up.repair_indicators { + events.push(SignalEvent::new( + SignalSubtype::Repair, + indicator.message_index + abs_offset, + SignalEvidence::Repair { + snippet: indicator.snippet.clone(), + similar_to_prev_user_turn: indicator.similar_to_prev_user_turn, + }, + )); } + + for ind in &report.frustration.indicators { + events.push(SignalEvent::new( + SignalSubtype::Frustration, + ind.message_index + abs_offset, + SignalEvidence::Frustration { + indicator_type: ind.indicator_type.clone(), + snippet: ind.snippet.clone(), + }, + )); + } + + for rep in &report.repetition.repetitions { + // `message_indices` always has exactly two entries: the earlier + // and later assistant messages in the detected pair. + if let [first, second] = rep.message_indices.as_slice() { + events.push(SignalEvent::new( + SignalSubtype::Repetition, + first + abs_offset, + SignalEvidence::Repetition { + other_message_idx: second + abs_offset, + similarity: rep.similarity, + repetition_type: rep.repetition_type.clone(), + }, + )); + } + } + + for ind in &report.positive_feedback.indicators { + events.push(SignalEvent::new( + SignalSubtype::PositiveFeedback, + ind.message_index + abs_offset, + SignalEvidence::PositiveFeedback { + indicator_type: ind.indicator_type.clone(), + snippet: ind.snippet.clone(), + }, + )); + } + + for req in &report.escalation.requests { + events.push(SignalEvent::new( + SignalSubtype::Escalation, + req.message_index + abs_offset, + SignalEvidence::Escalation { + escalation_type: req.escalation_type.clone(), + snippet: req.snippet.clone(), + }, + )); + } + + events.sort_by_key(|e| e.source_message_idx); + events } } @@ -3187,4 +3309,385 @@ mod tests { // Validate overall quality assert_eq!(report.overall_quality, InteractionQuality::Severe, "Should be classified as Severe due 
to escalation + excessive frustration + looping + high repair ratio"); } + + // ======================================================================== + // Event-level tests (Phase 1 — SignalEvent stream) + // ======================================================================== + + use crate::signals::{SignalEvidence, SignalSubtype, SignalType}; + use std::collections::HashSet; + + fn sample_frustrated_conversation() -> Vec { + vec![ + create_message(Role::User, "I can't log into my account"), + create_message( + Role::Assistant, + "Have you tried resetting your password from the login page?", + ), + create_message( + Role::User, + "THAT DOESN'T WORK I ALREADY TRIED THAT!!! This is ridiculous!", + ), + create_message( + Role::Assistant, + "I apologize for the frustration. Let me escalate this.", + ), + create_message(Role::User, "Can I speak to a human agent please?"), + ] + } + + #[test] + fn empty_conversation_yields_no_events() { + let analyzer = TextBasedSignalAnalyzer::new(); + let (_report, events) = analyzer.analyze_with_events(&[]); + assert!(events.is_empty()); + } + + #[test] + fn events_have_unique_ids() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + assert!(!events.is_empty(), "sample should produce events"); + + let ids: HashSet<_> = events.iter().map(|e| e.event_id).collect(); + assert_eq!( + ids.len(), + events.len(), + "every SignalEvent must have a unique event_id" + ); + } + + #[test] + fn event_source_idx_within_message_bounds() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + + for event in &events { + assert!( + event.source_message_idx < messages.len(), + "event source_message_idx {} out of bounds for conversation of len {} (subtype={:?})", + event.source_message_idx, + messages.len(), + 
event.signal_subtype, + ); + } + } + + #[test] + fn event_role_matches_subtype() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + + for event in &events { + let role = &messages[event.source_message_idx].role; + match event.signal_subtype { + SignalSubtype::Repair + | SignalSubtype::Frustration + | SignalSubtype::PositiveFeedback + | SignalSubtype::Escalation => { + assert_eq!( + *role, + Role::User, + "subtype {:?} must reference a User message", + event.signal_subtype + ); + } + SignalSubtype::Repetition => { + assert_eq!( + *role, + Role::Assistant, + "Repetition must reference an Assistant message" + ); + } + SignalSubtype::ToolFailure + | SignalSubtype::ExecutionLoop + | SignalSubtype::Exhaustion => { + panic!( + "Phase 2 subtype {:?} should not be emitted in Phase 1", + event.signal_subtype + ); + } + } + } + } + + #[test] + fn event_evidence_variant_matches_subtype() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + + for event in &events { + let ok = matches!( + (&event.signal_subtype, &event.evidence), + (SignalSubtype::Repair, SignalEvidence::Repair { .. }) + | ( + SignalSubtype::Frustration, + SignalEvidence::Frustration { .. } + ) + | (SignalSubtype::Repetition, SignalEvidence::Repetition { .. }) + | ( + SignalSubtype::PositiveFeedback, + SignalEvidence::PositiveFeedback { .. } + ) + | (SignalSubtype::Escalation, SignalEvidence::Escalation { .. 
}) + ); + assert!( + ok, + "evidence variant does not match subtype {:?}: {:?}", + event.signal_subtype, event.evidence + ); + } + } + + #[test] + fn event_counts_fold_to_report_aggregates() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (report, events) = analyzer.analyze_with_events(&messages); + + let count = |subtype: SignalSubtype| -> usize { + events + .iter() + .filter(|e| e.signal_subtype == subtype) + .count() + }; + + assert_eq!(count(SignalSubtype::Repair), report.follow_up.repair_count); + assert_eq!( + count(SignalSubtype::Frustration), + report.frustration.frustration_count + ); + assert_eq!( + count(SignalSubtype::Repetition), + report.repetition.repetition_count + ); + assert_eq!( + count(SignalSubtype::PositiveFeedback), + report.positive_feedback.positive_count + ); + assert_eq!( + count(SignalSubtype::Escalation), + report.escalation.escalation_count + ); + } + + #[test] + fn events_are_sorted_by_source_message_idx() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + + let mut prev = 0usize; + for e in &events { + assert!( + e.source_message_idx >= prev, + "events must be sorted by source_message_idx" + ); + prev = e.source_message_idx; + } + } + + #[test] + fn events_all_map_to_interaction_layer_in_phase_1() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let (_report, events) = analyzer.analyze_with_events(&messages); + + for event in &events { + assert_eq!( + event.signal_type, + SignalType::Interaction, + "Phase 1 should only emit Interaction-layer events; got {:?}", + event.signal_type + ); + assert_eq!(event.signal_subtype.layer(), event.signal_type); + } + } + + /// Structural snapshot gate: locks down the JSON shape (field names, + /// ordering, nesting) of `SignalReport` against the frustrated fixture. 
+ /// Content values are asserted loosely — the goal is to catch silent + /// schema changes from future refactors, not to pin every count. + #[test] + fn signal_report_json_schema_is_stable() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let report = analyzer.analyze(&messages); + let value = serde_json::to_value(&report).expect("serialize report"); + + // Top-level keys must be exactly these, in any order. + let top_level: std::collections::BTreeSet<&str> = value + .as_object() + .unwrap() + .keys() + .map(|s| s.as_str()) + .collect(); + let expected_top: std::collections::BTreeSet<&str> = [ + "turn_count", + "follow_up", + "frustration", + "repetition", + "positive_feedback", + "escalation", + "overall_quality", + "summary", + ] + .into_iter() + .collect(); + assert_eq!( + top_level, expected_top, + "SignalReport top-level schema drifted" + ); + + // Per-signal keys — these are the public contract for aggregators. + let keys = |path: &str| -> std::collections::BTreeSet { + value + .pointer(path) + .and_then(|v| v.as_object()) + .map(|o| o.keys().cloned().collect()) + .unwrap_or_default() + }; + + assert_eq!( + keys("/turn_count"), + [ + "total_turns", + "user_turns", + "assistant_turns", + "is_concerning", + "is_excessive", + "efficiency_score", + ] + .iter() + .map(|s| s.to_string()) + .collect() + ); + assert_eq!( + keys("/follow_up"), + [ + "repair_count", + "repair_ratio", + "is_concerning", + "repair_phrases", + "repair_indicators", + ] + .iter() + .map(|s| s.to_string()) + .collect(), + "FollowUpSignal schema drifted (repair_indicators must be present)" + ); + assert_eq!( + keys("/frustration"), + [ + "frustration_count", + "has_frustration", + "severity", + "indicators" + ] + .iter() + .map(|s| s.to_string()) + .collect() + ); + assert_eq!( + keys("/repetition"), + ["repetition_count", "has_looping", "severity", "repetitions"] + .iter() + .map(|s| s.to_string()) + .collect() + ); + assert_eq!( + 
keys("/positive_feedback"), + [ + "positive_count", + "has_positive_feedback", + "confidence", + "indicators" + ] + .iter() + .map(|s| s.to_string()) + .collect() + ); + assert_eq!( + keys("/escalation"), + ["escalation_requested", "escalation_count", "requests"] + .iter() + .map(|s| s.to_string()) + .collect() + ); + } + + /// Pins the aggregate numbers for the canonical frustrated fixture so a + /// future refactor catches any value drift in the fold/aggregate path. + #[test] + fn signal_report_aggregates_are_stable_for_frustrated_fixture() { + let analyzer = TextBasedSignalAnalyzer::new(); + let messages = sample_frustrated_conversation(); + let report = analyzer.analyze(&messages); + + assert_eq!(report.turn_count.user_turns, 3); + assert_eq!(report.turn_count.assistant_turns, 2); + assert!(report.frustration.has_frustration); + assert!(report.frustration.frustration_count >= 2); + assert!(report.escalation.escalation_requested); + assert_eq!(report.escalation.escalation_count, 1); + assert!( + matches!( + report.overall_quality, + InteractionQuality::Poor | InteractionQuality::Severe + ), + "frustrated+escalated fixture should be Poor or Severe, got {:?}", + report.overall_quality + ); + } + + #[test] + fn absolute_indices_survive_message_truncation() { + // Use a tight analyzer window so we can exercise the offset path + // without building a 100+ message fixture. + let analyzer = TextBasedSignalAnalyzer::with_full_settings(5, 0.50, 0.60, 2000, 4, 20); + + // First two messages are filler; the last four contain detectable + // frustration and an escalation request. With max_messages=4 the + // analyzer will drop the filler and start at original index 2. + let messages = vec![ + create_message(Role::User, "Hi there"), + create_message(Role::Assistant, "Hello! How can I help?"), + create_message( + Role::User, + "THIS IS NOT WORKING AT ALL!!! 
I've tried everything!", + ), + create_message(Role::Assistant, "I'm sorry to hear that."), + create_message(Role::User, "Can I speak to a human agent please?"), + create_message(Role::Assistant, "Let me connect you."), + ]; + + let (_report, events) = analyzer.analyze_with_events(&messages); + assert!(!events.is_empty(), "fixture should produce events"); + + for event in &events { + // Every event must point at one of the last 4 messages + // (indices 2..=5), proving the abs_offset was applied. + assert!( + event.source_message_idx >= 2 && event.source_message_idx < messages.len(), + "event idx {} not in expected absolute range [2,{}) for subtype {:?}", + event.source_message_idx, + messages.len(), + event.signal_subtype + ); + // And it must align with the correct role. + let role = &messages[event.source_message_idx].role; + match event.signal_subtype { + SignalSubtype::Repetition => assert_eq!(*role, Role::Assistant), + SignalSubtype::Repair + | SignalSubtype::Frustration + | SignalSubtype::PositiveFeedback + | SignalSubtype::Escalation => assert_eq!(*role, Role::User), + _ => panic!("unexpected subtype in Phase 1"), + } + } + } } diff --git a/crates/brightstaff/src/signals/event.rs b/crates/brightstaff/src/signals/event.rs new file mode 100644 index 00000000..577e89a9 --- /dev/null +++ b/crates/brightstaff/src/signals/event.rs @@ -0,0 +1,423 @@ +//! Signal event atoms — the foundational representation of detections. +//! +//! A `SignalEvent` is a single detected indicator in a conversation (one +//! frustration indicator, one repetition instance, etc.). Aggregate metrics +//! in `SignalReport` are unchanged; events are emitted alongside them so +//! downstream consumers can drill from an aggregate counter to the specific +//! message or tool call that triggered it. +//! +//! Phase 1 populates only the `Interaction` layer variants; the `Execution` +//! and `Environment` variants are declared here so callers can match +//! 
exhaustively, but they are not emitted until Phase 2. + +use chrono::{DateTime, Utc}; +use opentelemetry::KeyValue; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +use super::analyzer::{EscalationType, FrustrationType, PositiveType, RepetitionType}; + +/// OpenTelemetry attribute keys used on `SignalEvent` span events. +/// +/// These are intentionally flat strings rather than constants pulled from +/// `tracing::constants` because they are only meaningful on span events, not +/// on the outer span attributes. Keeping them local to `event.rs` avoids +/// coupling the constants module to event-specific schema. +mod otel_keys { + pub const EVENT_ID: &str = "signal.event_id"; + pub const TYPE: &str = "signal.type"; + pub const SUBTYPE: &str = "signal.subtype"; + pub const SOURCE_MESSAGE_IDX: &str = "signal.source_message_idx"; + pub const EVIDENCE_SNIPPET: &str = "signal.evidence.snippet"; + pub const EVIDENCE_INDICATOR_TYPE: &str = "signal.evidence.indicator_type"; + pub const EVIDENCE_ESCALATION_TYPE: &str = "signal.evidence.escalation_type"; + pub const EVIDENCE_REPETITION_TYPE: &str = "signal.evidence.repetition_type"; + pub const EVIDENCE_SIMILARITY: &str = "signal.evidence.similarity"; + pub const EVIDENCE_OTHER_MESSAGE_IDX: &str = "signal.evidence.other_message_idx"; + pub const EVIDENCE_SIMILAR_TO_PREV_USER_TURN: &str = + "signal.evidence.similar_to_prev_user_turn"; +} + +/// Top-level signal layer from the Signals paper taxonomy. 
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "snake_case")] +pub enum SignalType { + Interaction, + Execution, + Environment, +} + +impl SignalType { + pub fn as_str(&self) -> &'static str { + match self { + Self::Interaction => "interaction", + Self::Execution => "execution", + Self::Environment => "environment", + } + } +} + +impl std::fmt::Display for SignalType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Specific signal subtype. Each subtype belongs to exactly one `SignalType` +/// layer (see [`SignalSubtype::layer`]). +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "snake_case")] +pub enum SignalSubtype { + // Interaction layer (Phase 1). + Repair, + Frustration, + Repetition, + PositiveFeedback, + Escalation, + // Execution layer (declared for Phase 2; not emitted yet). + ToolFailure, + ExecutionLoop, + // Environment layer (declared for Phase 2; not emitted yet). + Exhaustion, +} + +impl SignalSubtype { + pub fn as_str(&self) -> &'static str { + match self { + Self::Repair => "repair", + Self::Frustration => "frustration", + Self::Repetition => "repetition", + Self::PositiveFeedback => "positive_feedback", + Self::Escalation => "escalation", + Self::ToolFailure => "tool_failure", + Self::ExecutionLoop => "execution_loop", + Self::Exhaustion => "exhaustion", + } + } + + /// Returns the signal layer this subtype belongs to. 
+ pub fn layer(&self) -> SignalType { + match self { + Self::Repair + | Self::Frustration + | Self::Repetition + | Self::PositiveFeedback + | Self::Escalation => SignalType::Interaction, + Self::ToolFailure | Self::ExecutionLoop => SignalType::Execution, + Self::Exhaustion => SignalType::Environment, + } + } +} + +impl std::fmt::Display for SignalSubtype { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.as_str()) + } +} + +/// Structured evidence payload. Variants mirror [`SignalSubtype`] — a +/// `Frustration` subtype always carries `Frustration` evidence, and so on. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum SignalEvidence { + Repair { + snippet: String, + /// True when the repair was detected by semantic rephrase of the + /// previous user turn (rather than a lexical repair pattern). + similar_to_prev_user_turn: bool, + }, + Frustration { + indicator_type: FrustrationType, + snippet: String, + }, + Repetition { + /// Absolute index of the other message in the detected pair. + other_message_idx: usize, + similarity: f64, + repetition_type: RepetitionType, + }, + PositiveFeedback { + indicator_type: PositiveType, + snippet: String, + }, + Escalation { + escalation_type: EscalationType, + snippet: String, + }, +} + +/// Single detected signal atom. +/// +/// `source_message_idx` is an absolute index into the input `Vec` +/// the analyzer was called with — stable across the analyzer's internal +/// truncation window. 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SignalEvent { // one detected signal occurrence, exportable as an OTel span event
+    pub event_id: Ulid, // minted fresh by `new()`
+    pub signal_type: SignalType, // layer; `new()` derives it from `signal_subtype.layer()`
+    pub signal_subtype: SignalSubtype,
+    pub source_message_idx: usize, // index of the message this signal was detected on
+    pub timestamp: DateTime<Utc>, // creation time; `new()` uses `Utc::now()`
+    pub evidence: SignalEvidence, // variant-specific payload, flattened by `to_otel_attributes()`
+}
+
+impl SignalEvent {
+    pub fn new( // builds an event with a fresh ULID, layer taken from the subtype, timestamp = now
+        signal_subtype: SignalSubtype,
+        source_message_idx: usize,
+        evidence: SignalEvidence,
+    ) -> Self {
+        Self {
+            event_id: Ulid::new(),
+            signal_type: signal_subtype.layer(), // keeps type and subtype consistent by construction
+            signal_subtype,
+            source_message_idx,
+            timestamp: Utc::now(),
+            evidence,
+        }
+    }
+
+    /// Canonical span-event name for this signal.
+    ///
+    /// Shape: `signal.{type}.{subtype}` (e.g. `signal.interaction.frustration`).
+    /// External consumers can key off these stable names.
+    pub fn otel_event_name(&self) -> String {
+        format!("signal.{}.{}", self.signal_type, self.signal_subtype)
+    }
+
+    /// Flatten this event into OpenTelemetry key-value attributes suitable
+    /// for `span.add_event(name, attrs)`.
+    pub fn to_otel_attributes(&self) -> Vec<KeyValue> {
+        let mut attrs = vec![ // base keys, emitted for every evidence variant
+            KeyValue::new(otel_keys::EVENT_ID, self.event_id.to_string()),
+            KeyValue::new(otel_keys::TYPE, self.signal_type.as_str()),
+            KeyValue::new(otel_keys::SUBTYPE, self.signal_subtype.as_str()),
+            KeyValue::new(
+                otel_keys::SOURCE_MESSAGE_IDX,
+                self.source_message_idx as i64, // exported as an integer attribute
+            ),
+        ];
+
+        match &self.evidence { // evidence keys differ per variant
+            SignalEvidence::Repair {
+                snippet,
+                similar_to_prev_user_turn,
+            } => {
+                attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_SIMILAR_TO_PREV_USER_TURN,
+                    *similar_to_prev_user_turn,
+                ));
+            }
+            SignalEvidence::Frustration {
+                indicator_type,
+                snippet,
+            } => {
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_INDICATOR_TYPE,
+                    format!("{:?}", indicator_type), // Debug rendering of the enum variant
+                ));
+                attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
+            }
+            SignalEvidence::Repetition {
+                other_message_idx,
+                similarity,
+                repetition_type,
+            } => {
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_OTHER_MESSAGE_IDX,
+                    *other_message_idx as i64,
+                ));
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_SIMILARITY,
+                    format!("{:.3}", similarity), // exported as a 3-decimal string, not a double
+                ));
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_REPETITION_TYPE,
+                    format!("{:?}", repetition_type),
+                ));
+            }
+            SignalEvidence::PositiveFeedback {
+                indicator_type,
+                snippet,
+            } => {
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_INDICATOR_TYPE,
+                    format!("{:?}", indicator_type),
+                ));
+                attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
+            }
+            SignalEvidence::Escalation {
+                escalation_type,
+                snippet,
+            } => {
+                attrs.push(KeyValue::new(
+                    otel_keys::EVIDENCE_ESCALATION_TYPE,
+                    format!("{:?}", escalation_type),
+                ));
+                attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
+            }
+        }
+
+        attrs
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn subtype_layer_mapping() { // pins the subtype -> layer table
+        assert_eq!(SignalSubtype::Repair.layer(), SignalType::Interaction);
+        assert_eq!(SignalSubtype::Frustration.layer(), SignalType::Interaction);
+        assert_eq!(SignalSubtype::Repetition.layer(), SignalType::Interaction);
+        assert_eq!(
+            SignalSubtype::PositiveFeedback.layer(),
+            SignalType::Interaction
+        );
+        assert_eq!(SignalSubtype::Escalation.layer(), SignalType::Interaction);
+        assert_eq!(SignalSubtype::ToolFailure.layer(), SignalType::Execution);
+        assert_eq!(SignalSubtype::ExecutionLoop.layer(), SignalType::Execution);
+        assert_eq!(SignalSubtype::Exhaustion.layer(), SignalType::Environment);
+    }
+
+    #[test]
+    fn new_event_sets_layer_from_subtype() { // `new()` must fill signal_type from the subtype
+        let event = SignalEvent::new(
+            SignalSubtype::Frustration,
+            7,
+            SignalEvidence::Frustration {
+                indicator_type: FrustrationType::AllCaps,
+                snippet: "WHY ISN'T THIS WORKING".to_string(),
+            },
+        );
+        assert_eq!(event.signal_type, SignalType::Interaction);
+        assert_eq!(event.signal_subtype, SignalSubtype::Frustration);
+        assert_eq!(event.source_message_idx, 7);
+    }
+
+    #[test]
+    fn otel_event_name_shape() { // event names follow `signal.{type}.{subtype}`
+        let event = SignalEvent::new(
+            SignalSubtype::Frustration,
+            7,
+            SignalEvidence::Frustration {
+                indicator_type: FrustrationType::AllCaps,
+                snippet: "WHY".to_string(),
+            },
+        );
+        assert_eq!(event.otel_event_name(), "signal.interaction.frustration");
+
+        let tool_failure_event = SignalEvent {
+            event_id: Ulid::new(),
+            signal_type: SignalType::Execution,
+            signal_subtype: SignalSubtype::ToolFailure,
+            source_message_idx: 3,
+            timestamp: Utc::now(),
+            // Evidence variant is not yet defined for ToolFailure in Phase 1;
+            // use Frustration as a stand-in purely to exercise name formatting.
+            evidence: SignalEvidence::Frustration {
+                indicator_type: FrustrationType::AllCaps,
+                snippet: String::new(),
+            },
+        };
+        assert_eq!(
+            tool_failure_event.otel_event_name(),
+            "signal.execution.tool_failure"
+        );
+    }
+
+    #[test]
+    fn to_otel_attributes_includes_base_keys() { // base keys are present regardless of evidence
+        let event = SignalEvent::new(
+            SignalSubtype::Frustration,
+            7,
+            SignalEvidence::Frustration {
+                indicator_type: FrustrationType::AllCaps,
+                snippet: "WHY".to_string(),
+            },
+        );
+        let attrs = event.to_otel_attributes();
+        let keys: std::collections::HashSet<String> =
+            attrs.iter().map(|kv| kv.key.as_str().to_string()).collect();
+        for required in [
+            "signal.event_id",
+            "signal.type",
+            "signal.subtype",
+            "signal.source_message_idx",
+        ] {
+            assert!(
+                keys.contains(required),
+                "missing required attribute {}",
+                required
+            );
+        }
+    }
+
+    #[test]
+    fn to_otel_attributes_includes_evidence_fields_per_variant() { // only keys are checked, not values
+        let repair = SignalEvent::new(
+            SignalSubtype::Repair,
+            2,
+            SignalEvidence::Repair {
+                snippet: "that's not what i meant".to_string(),
+                similar_to_prev_user_turn: false,
+            },
+        );
+        let repetition = SignalEvent::new(
+            SignalSubtype::Repetition,
+            4,
+            SignalEvidence::Repetition {
+                other_message_idx: 2,
+                similarity: 0.91,
+                repetition_type: RepetitionType::Exact,
+            },
+        );
+        let escalation = SignalEvent::new(
+            SignalSubtype::Escalation,
+            5,
+            SignalEvidence::Escalation {
+                escalation_type: EscalationType::HumanAgent,
+                snippet: "speak to a human".to_string(),
+            },
+        );
+
+        let keyset = |e: &SignalEvent| -> std::collections::HashSet<String> {
+            e.to_otel_attributes()
+                .iter()
+                .map(|kv| kv.key.as_str().to_string())
+                .collect()
+        };
+
+        assert!(keyset(&repair).contains("signal.evidence.snippet"));
+        assert!(keyset(&repair).contains("signal.evidence.similar_to_prev_user_turn"));
+
+        let rep_keys = keyset(&repetition);
+        assert!(rep_keys.contains("signal.evidence.other_message_idx"));
+        assert!(rep_keys.contains("signal.evidence.similarity"));
+        assert!(rep_keys.contains("signal.evidence.repetition_type"));
+
+        let esc_keys = keyset(&escalation);
+        assert!(esc_keys.contains("signal.evidence.escalation_type"));
+        assert!(esc_keys.contains("signal.evidence.snippet"));
+    }
+
+    #[test]
+    fn event_serialization_round_trip() { // compares id, subtype and index only
+        let event = SignalEvent::new(
+            SignalSubtype::Repetition,
+            12,
+            SignalEvidence::Repetition {
+                other_message_idx: 8,
+                similarity: 0.91,
+                repetition_type: RepetitionType::Exact,
+            },
+        );
+        let serialized = serde_json::to_string(&event).expect("serialize");
+        let deserialized: SignalEvent = serde_json::from_str(&serialized).expect("deserialize");
+        assert_eq!(deserialized.event_id, event.event_id);
+        assert_eq!(deserialized.signal_subtype, event.signal_subtype);
+        assert_eq!(deserialized.source_message_idx, event.source_message_idx);
+    }
+}
diff --git a/crates/brightstaff/src/signals/mod.rs b/crates/brightstaff/src/signals/mod.rs
index 83db943e..1e3a8291 100644
--- a/crates/brightstaff/src/signals/mod.rs
+++ b/crates/brightstaff/src/signals/mod.rs
@@ -1,3 +1,5 @@
 mod analyzer;
+mod event;
 
 pub use analyzer::*;
+pub use event::{SignalEvent, SignalEvidence, SignalSubtype, SignalType};
diff --git a/crates/brightstaff/src/streaming.rs b/crates/brightstaff/src/streaming.rs
index f7af8ae0..2aac7ef2 100644
--- a/crates/brightstaff/src/streaming.rs
+++ b/crates/brightstaff/src/streaming.rs
@@ 
-127,7 +127,7 @@ impl StreamProcessor for ObservableStreamProcessor { // Analyze signals if messages are available and record as span attributes if let Some(ref messages) = self.messages { let analyzer: Box = Box::new(TextBasedSignalAnalyzer::new()); - let report = analyzer.analyze(messages); + let (report, events) = analyzer.analyze_with_events(messages); // Get the current OTel span to set signal attributes let span = tracing::Span::current(); @@ -198,6 +198,14 @@ impl StreamProcessor for ObservableStreamProcessor { if should_flag { otel_span.update_name(format!("{} {}", self.operation_name, FLAG_MARKER)); } + + // Emit one OTel span event per detected SignalEvent, providing the + // drill-down path from aggregate attributes to the triggering + // message. Existing aggregate attributes above remain unchanged so + // dashboards keep working. + for event in &events { + otel_span.add_event(event.otel_event_name(), event.to_otel_attributes()); + } } info!(