signals v2 phase 1: introduce SignalEvent stream and emit as OTel span events

Made-with: Cursor
This commit is contained in:
Syed Hashmi 2026-04-17 16:34:26 -07:00
parent 743d074184
commit 66819df153
8 changed files with 1078 additions and 16 deletions

View file

@ -428,6 +428,27 @@ def _anyvalue_to_python(value_obj: Any) -> Any:
return None
def _kv_to_attr_dict(kv: Any) -> dict[str, Any] | None:
"""Convert a protobuf KeyValue into the {"key", "value"} dict shape used
internally. Returns None if the value type is unsupported."""
py_val = _anyvalue_to_python(kv.value)
if py_val is None:
return None
value_dict: dict[str, Any] = {}
if isinstance(py_val, bool):
# bool check must come before int, since bool is a subclass of int.
value_dict["boolValue"] = py_val
elif isinstance(py_val, str):
value_dict["stringValue"] = py_val
elif isinstance(py_val, int):
value_dict["intValue"] = str(py_val)
elif isinstance(py_val, float):
value_dict["doubleValue"] = py_val
else:
return None
return {"key": kv.key, "value": value_dict}
def _proto_span_to_dict(span: Any, service_name: str) -> dict[str, Any]:
"""Convert a protobuf Span message to the dict format used internally."""
span_dict: dict[str, Any] = {
@ -439,20 +460,29 @@ def _proto_span_to_dict(span: Any, service_name: str) -> dict[str, Any]:
"endTimeUnixNano": str(span.end_time_unix_nano),
"service": service_name,
"attributes": [],
"events": [],
}
for kv in span.attributes:
py_val = _anyvalue_to_python(kv.value)
if py_val is not None:
value_dict: dict[str, Any] = {}
if isinstance(py_val, str):
value_dict["stringValue"] = py_val
elif isinstance(py_val, bool):
value_dict["boolValue"] = py_val
elif isinstance(py_val, int):
value_dict["intValue"] = str(py_val)
elif isinstance(py_val, float):
value_dict["doubleValue"] = py_val
span_dict["attributes"].append({"key": kv.key, "value": value_dict})
attr = _kv_to_attr_dict(kv)
if attr is not None:
span_dict["attributes"].append(attr)
# Preserve span events (name, timestamp, attributes). OTel span events are
# the drill-down channel for granular signals that aggregate span
# attributes summarize. Dropping them here would make every
# per-detection `SignalEvent` emitted by brightstaff invisible to
# `planoai trace`.
for event in span.events:
event_dict: dict[str, Any] = {
"name": event.name,
"timeUnixNano": str(event.time_unix_nano),
"attributes": [],
}
for kv in event.attributes:
attr = _kv_to_attr_dict(kv)
if attr is not None:
event_dict["attributes"].append(attr)
span_dict["events"].append(event_dict)
return span_dict

View file

@ -75,6 +75,89 @@ class _FakeGrpcServer:
return None
def test_proto_span_to_dict_preserves_span_events():
"""Span events are the drill-down channel for granular signals. The OTLP
store must preserve them so `planoai trace` can surface per-detection
SignalEvent payloads alongside aggregate signals.* attributes."""
from opentelemetry.proto.common.v1 import common_pb2
from opentelemetry.proto.trace.v1 import trace_pb2
span = trace_pb2.Span(
trace_id=bytes.fromhex("0123456789abcdef0123456789abcdef"),
span_id=bytes.fromhex("1111111111111111"),
parent_span_id=b"",
name="POST /v1/chat/completions gpt-4",
start_time_unix_nano=1_700_000_000_000_000_000,
end_time_unix_nano=1_700_000_000_500_000_000,
attributes=[
common_pb2.KeyValue(
key="signals.quality",
value=common_pb2.AnyValue(string_value="Neutral"),
),
],
events=[
trace_pb2.Span.Event(
time_unix_nano=1_700_000_000_100_000_000,
name="signal.interaction.frustration",
attributes=[
common_pb2.KeyValue(
key="signal.event_id",
value=common_pb2.AnyValue(
string_value="01HF4ZABCDEF0123456789ABCD"
),
),
common_pb2.KeyValue(
key="signal.source_message_idx",
value=common_pb2.AnyValue(int_value=3),
),
common_pb2.KeyValue(
key="signal.evidence.snippet",
value=common_pb2.AnyValue(string_value="WHY"),
),
],
),
],
)
span_dict = trace_cmd._proto_span_to_dict(span, "plano(llm)")
assert span_dict["name"] == "POST /v1/chat/completions gpt-4"
assert span_dict["attributes"] == [
{"key": "signals.quality", "value": {"stringValue": "Neutral"}}
]
assert "events" in span_dict, "span events must be preserved"
assert len(span_dict["events"]) == 1
event = span_dict["events"][0]
assert event["name"] == "signal.interaction.frustration"
assert event["timeUnixNano"] == "1700000000100000000"
event_attrs = {a["key"]: a["value"] for a in event["attributes"]}
assert event_attrs["signal.event_id"] == {
"stringValue": "01HF4ZABCDEF0123456789ABCD"
}
assert event_attrs["signal.source_message_idx"] == {"intValue": "3"}
assert event_attrs["signal.evidence.snippet"] == {"stringValue": "WHY"}
def test_proto_span_to_dict_no_events_yields_empty_list():
"""Spans without events should still produce an `events: []` key so
downstream code can treat it as always present."""
from opentelemetry.proto.trace.v1 import trace_pb2
span = trace_pb2.Span(
trace_id=bytes.fromhex("0123456789abcdef0123456789abcdef"),
span_id=bytes.fromhex("2222222222222222"),
parent_span_id=b"",
name="POST /v1/chat/completions",
start_time_unix_nano=1_700_000_000_000_000_000,
end_time_unix_nano=1_700_000_000_100_000_000,
)
span_dict = trace_cmd._proto_span_to_dict(span, "plano(llm)")
assert span_dict["events"] == []
def test_start_trace_server_raises_bind_error(monkeypatch):
monkeypatch.setattr(
trace_cmd.grpc, "server", lambda *_args, **_kwargs: _FakeGrpcServer()

12
crates/Cargo.lock generated
View file

@ -339,6 +339,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"ulid",
"uuid",
]
@ -3659,6 +3660,17 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]]
name = "ulid"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe"
dependencies = [
"rand 0.9.4",
"serde",
"web-time",
]
[[package]]
name = "unicase"
version = "2.9.0"

View file

@ -7,7 +7,7 @@ edition = "2021"
async-openai = "0.30.1"
async-trait = "0.1"
bytes = "1.10.1"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
common = { version = "0.1.0", path = "../common" }
eventsource-client = "0.15.0"
eventsource-stream = "0.2.3"
@ -42,6 +42,7 @@ time = { version = "0.3", features = ["formatting", "macros"] }
tracing = "0.1"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ulid = { version = "1.1", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
[dev-dependencies]

View file

@ -940,8 +940,26 @@ pub struct FollowUpSignal {
pub repair_ratio: f64,
/// Whether repair ratio is concerning (> 0.3)
pub is_concerning: bool,
/// List of detected repair phrases
/// List of detected repair phrases (human-readable)
pub repair_phrases: Vec<String>,
/// Structured per-message repair indicators, one per detection
#[serde(default)]
pub repair_indicators: Vec<FollowUpIndicator>,
}
/// Individual repair / follow-up indicator
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FollowUpIndicator {
/// Message index where the repair was detected (relative to the analyzer's
/// normalized message slice; absolute indices are attached when projecting
/// to `SignalEvent`).
pub message_index: usize,
/// Relevant text snippet — either the matched lexical pattern or a marker
/// string for the semantic-rephrase path.
pub snippet: String,
/// True when the repair was detected by semantic rephrase of the previous
/// user turn rather than a lexical repair pattern.
pub similar_to_prev_user_turn: bool,
}
/// User frustration indicators
@ -1102,6 +1120,13 @@ pub enum EscalationType {
pub trait SignalAnalyzer {
/// Analyze a conversation and generate a complete signal report
fn analyze(&self, messages: &[Message]) -> SignalReport;
/// Analyze a conversation and return both the aggregate report and the
/// underlying event stream. The default implementation returns an empty
/// event list; implementors that support events should override.
fn analyze_with_events(&self, messages: &[Message]) -> (SignalReport, Vec<super::SignalEvent>) {
(self.analyze(messages), Vec::new())
}
}
/// Text-based implementation of signal analyzer that computes all signals from a message array
@ -1248,6 +1273,7 @@ impl TextBasedSignalAnalyzer {
) -> FollowUpSignal {
let mut repair_count = 0;
let mut repair_phrases = Vec::new();
let mut repair_indicators = Vec::new();
let mut user_turn_count = 0;
for (i, role, norm_msg) in normalized_messages {
@ -1269,6 +1295,11 @@ impl TextBasedSignalAnalyzer {
) {
repair_count += 1;
repair_phrases.push(format!("Turn {}: '{}'", i + 1, pattern.raw));
repair_indicators.push(FollowUpIndicator {
message_index: *i,
snippet: pattern.raw.clone(),
similar_to_prev_user_turn: false,
});
found_in_turn = true;
break;
}
@ -1284,6 +1315,11 @@ impl TextBasedSignalAnalyzer {
repair_count += 1;
repair_phrases
.push(format!("Turn {}: Similar rephrase detected", i + 1));
repair_indicators.push(FollowUpIndicator {
message_index: *i,
snippet: "Similar rephrase detected".to_string(),
similar_to_prev_user_turn: true,
});
}
break;
}
@ -1304,6 +1340,7 @@ impl TextBasedSignalAnalyzer {
repair_ratio,
is_concerning,
repair_phrases,
repair_indicators,
}
}
@ -1865,6 +1902,10 @@ impl TextBasedSignalAnalyzer {
impl SignalAnalyzer for TextBasedSignalAnalyzer {
fn analyze(&self, messages: &[Message]) -> SignalReport {
self.analyze_with_events(messages).0
}
fn analyze_with_events(&self, messages: &[Message]) -> (SignalReport, Vec<super::SignalEvent>) {
// Limit the number of messages to process (take most recent messages)
let messages_to_process = if messages.len() > self.max_messages {
&messages[messages.len() - self.max_messages..]
@ -1914,7 +1955,7 @@ impl SignalAnalyzer for TextBasedSignalAnalyzer {
&overall_quality,
);
SignalReport {
let report = SignalReport {
turn_count,
follow_up,
frustration,
@ -1923,7 +1964,88 @@ impl SignalAnalyzer for TextBasedSignalAnalyzer {
escalation,
overall_quality,
summary,
};
// Detector indicator indices are relative to the (possibly truncated)
// slice. Project them back to absolute indices in the original input
// so event consumers can reference the original conversation.
let abs_offset = messages.len().saturating_sub(self.max_messages);
let events = Self::project_events(&report, abs_offset);
(report, events)
}
}
impl TextBasedSignalAnalyzer {
/// Project the indicator collections inside a `SignalReport` into a
/// flat event stream. One event per detected indicator.
fn project_events(report: &SignalReport, abs_offset: usize) -> Vec<super::SignalEvent> {
use super::{SignalEvent, SignalEvidence, SignalSubtype};
let mut events: Vec<SignalEvent> = Vec::new();
for indicator in &report.follow_up.repair_indicators {
events.push(SignalEvent::new(
SignalSubtype::Repair,
indicator.message_index + abs_offset,
SignalEvidence::Repair {
snippet: indicator.snippet.clone(),
similar_to_prev_user_turn: indicator.similar_to_prev_user_turn,
},
));
}
for ind in &report.frustration.indicators {
events.push(SignalEvent::new(
SignalSubtype::Frustration,
ind.message_index + abs_offset,
SignalEvidence::Frustration {
indicator_type: ind.indicator_type.clone(),
snippet: ind.snippet.clone(),
},
));
}
for rep in &report.repetition.repetitions {
// `message_indices` always has exactly two entries: the earlier
// and later assistant messages in the detected pair.
if let [first, second] = rep.message_indices.as_slice() {
events.push(SignalEvent::new(
SignalSubtype::Repetition,
first + abs_offset,
SignalEvidence::Repetition {
other_message_idx: second + abs_offset,
similarity: rep.similarity,
repetition_type: rep.repetition_type.clone(),
},
));
}
}
for ind in &report.positive_feedback.indicators {
events.push(SignalEvent::new(
SignalSubtype::PositiveFeedback,
ind.message_index + abs_offset,
SignalEvidence::PositiveFeedback {
indicator_type: ind.indicator_type.clone(),
snippet: ind.snippet.clone(),
},
));
}
for req in &report.escalation.requests {
events.push(SignalEvent::new(
SignalSubtype::Escalation,
req.message_index + abs_offset,
SignalEvidence::Escalation {
escalation_type: req.escalation_type.clone(),
snippet: req.snippet.clone(),
},
));
}
events.sort_by_key(|e| e.source_message_idx);
events
}
}
@ -3187,4 +3309,385 @@ mod tests {
// Validate overall quality
assert_eq!(report.overall_quality, InteractionQuality::Severe, "Should be classified as Severe due to escalation + excessive frustration + looping + high repair ratio");
}
// ========================================================================
// Event-level tests (Phase 1 — SignalEvent stream)
// ========================================================================
use crate::signals::{SignalEvidence, SignalSubtype, SignalType};
use std::collections::HashSet;
fn sample_frustrated_conversation() -> Vec<Message> {
vec![
create_message(Role::User, "I can't log into my account"),
create_message(
Role::Assistant,
"Have you tried resetting your password from the login page?",
),
create_message(
Role::User,
"THAT DOESN'T WORK I ALREADY TRIED THAT!!! This is ridiculous!",
),
create_message(
Role::Assistant,
"I apologize for the frustration. Let me escalate this.",
),
create_message(Role::User, "Can I speak to a human agent please?"),
]
}
#[test]
fn empty_conversation_yields_no_events() {
let analyzer = TextBasedSignalAnalyzer::new();
let (_report, events) = analyzer.analyze_with_events(&[]);
assert!(events.is_empty());
}
#[test]
fn events_have_unique_ids() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
assert!(!events.is_empty(), "sample should produce events");
let ids: HashSet<_> = events.iter().map(|e| e.event_id).collect();
assert_eq!(
ids.len(),
events.len(),
"every SignalEvent must have a unique event_id"
);
}
#[test]
fn event_source_idx_within_message_bounds() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
for event in &events {
assert!(
event.source_message_idx < messages.len(),
"event source_message_idx {} out of bounds for conversation of len {} (subtype={:?})",
event.source_message_idx,
messages.len(),
event.signal_subtype,
);
}
}
#[test]
fn event_role_matches_subtype() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
for event in &events {
let role = &messages[event.source_message_idx].role;
match event.signal_subtype {
SignalSubtype::Repair
| SignalSubtype::Frustration
| SignalSubtype::PositiveFeedback
| SignalSubtype::Escalation => {
assert_eq!(
*role,
Role::User,
"subtype {:?} must reference a User message",
event.signal_subtype
);
}
SignalSubtype::Repetition => {
assert_eq!(
*role,
Role::Assistant,
"Repetition must reference an Assistant message"
);
}
SignalSubtype::ToolFailure
| SignalSubtype::ExecutionLoop
| SignalSubtype::Exhaustion => {
panic!(
"Phase 2 subtype {:?} should not be emitted in Phase 1",
event.signal_subtype
);
}
}
}
}
#[test]
fn event_evidence_variant_matches_subtype() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
for event in &events {
let ok = matches!(
(&event.signal_subtype, &event.evidence),
(SignalSubtype::Repair, SignalEvidence::Repair { .. })
| (
SignalSubtype::Frustration,
SignalEvidence::Frustration { .. }
)
| (SignalSubtype::Repetition, SignalEvidence::Repetition { .. })
| (
SignalSubtype::PositiveFeedback,
SignalEvidence::PositiveFeedback { .. }
)
| (SignalSubtype::Escalation, SignalEvidence::Escalation { .. })
);
assert!(
ok,
"evidence variant does not match subtype {:?}: {:?}",
event.signal_subtype, event.evidence
);
}
}
#[test]
fn event_counts_fold_to_report_aggregates() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (report, events) = analyzer.analyze_with_events(&messages);
let count = |subtype: SignalSubtype| -> usize {
events
.iter()
.filter(|e| e.signal_subtype == subtype)
.count()
};
assert_eq!(count(SignalSubtype::Repair), report.follow_up.repair_count);
assert_eq!(
count(SignalSubtype::Frustration),
report.frustration.frustration_count
);
assert_eq!(
count(SignalSubtype::Repetition),
report.repetition.repetition_count
);
assert_eq!(
count(SignalSubtype::PositiveFeedback),
report.positive_feedback.positive_count
);
assert_eq!(
count(SignalSubtype::Escalation),
report.escalation.escalation_count
);
}
#[test]
fn events_are_sorted_by_source_message_idx() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
let mut prev = 0usize;
for e in &events {
assert!(
e.source_message_idx >= prev,
"events must be sorted by source_message_idx"
);
prev = e.source_message_idx;
}
}
#[test]
fn events_all_map_to_interaction_layer_in_phase_1() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let (_report, events) = analyzer.analyze_with_events(&messages);
for event in &events {
assert_eq!(
event.signal_type,
SignalType::Interaction,
"Phase 1 should only emit Interaction-layer events; got {:?}",
event.signal_type
);
assert_eq!(event.signal_subtype.layer(), event.signal_type);
}
}
/// Structural snapshot gate: locks down the JSON shape (field names,
/// ordering, nesting) of `SignalReport` against the frustrated fixture.
/// Content values are asserted loosely — the goal is to catch silent
/// schema changes from future refactors, not to pin every count.
#[test]
fn signal_report_json_schema_is_stable() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let report = analyzer.analyze(&messages);
let value = serde_json::to_value(&report).expect("serialize report");
// Top-level keys must be exactly these, in any order.
let top_level: std::collections::BTreeSet<&str> = value
.as_object()
.unwrap()
.keys()
.map(|s| s.as_str())
.collect();
let expected_top: std::collections::BTreeSet<&str> = [
"turn_count",
"follow_up",
"frustration",
"repetition",
"positive_feedback",
"escalation",
"overall_quality",
"summary",
]
.into_iter()
.collect();
assert_eq!(
top_level, expected_top,
"SignalReport top-level schema drifted"
);
// Per-signal keys — these are the public contract for aggregators.
let keys = |path: &str| -> std::collections::BTreeSet<String> {
value
.pointer(path)
.and_then(|v| v.as_object())
.map(|o| o.keys().cloned().collect())
.unwrap_or_default()
};
assert_eq!(
keys("/turn_count"),
[
"total_turns",
"user_turns",
"assistant_turns",
"is_concerning",
"is_excessive",
"efficiency_score",
]
.iter()
.map(|s| s.to_string())
.collect()
);
assert_eq!(
keys("/follow_up"),
[
"repair_count",
"repair_ratio",
"is_concerning",
"repair_phrases",
"repair_indicators",
]
.iter()
.map(|s| s.to_string())
.collect(),
"FollowUpSignal schema drifted (repair_indicators must be present)"
);
assert_eq!(
keys("/frustration"),
[
"frustration_count",
"has_frustration",
"severity",
"indicators"
]
.iter()
.map(|s| s.to_string())
.collect()
);
assert_eq!(
keys("/repetition"),
["repetition_count", "has_looping", "severity", "repetitions"]
.iter()
.map(|s| s.to_string())
.collect()
);
assert_eq!(
keys("/positive_feedback"),
[
"positive_count",
"has_positive_feedback",
"confidence",
"indicators"
]
.iter()
.map(|s| s.to_string())
.collect()
);
assert_eq!(
keys("/escalation"),
["escalation_requested", "escalation_count", "requests"]
.iter()
.map(|s| s.to_string())
.collect()
);
}
/// Pins the aggregate numbers for the canonical frustrated fixture so a
/// future refactor catches any value drift in the fold/aggregate path.
#[test]
fn signal_report_aggregates_are_stable_for_frustrated_fixture() {
let analyzer = TextBasedSignalAnalyzer::new();
let messages = sample_frustrated_conversation();
let report = analyzer.analyze(&messages);
assert_eq!(report.turn_count.user_turns, 3);
assert_eq!(report.turn_count.assistant_turns, 2);
assert!(report.frustration.has_frustration);
assert!(report.frustration.frustration_count >= 2);
assert!(report.escalation.escalation_requested);
assert_eq!(report.escalation.escalation_count, 1);
assert!(
matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Severe
),
"frustrated+escalated fixture should be Poor or Severe, got {:?}",
report.overall_quality
);
}
#[test]
fn absolute_indices_survive_message_truncation() {
// Use a tight analyzer window so we can exercise the offset path
// without building a 100+ message fixture.
let analyzer = TextBasedSignalAnalyzer::with_full_settings(5, 0.50, 0.60, 2000, 4, 20);
// First two messages are filler; the last four contain detectable
// frustration and an escalation request. With max_messages=4 the
// analyzer will drop the filler and start at original index 2.
let messages = vec![
create_message(Role::User, "Hi there"),
create_message(Role::Assistant, "Hello! How can I help?"),
create_message(
Role::User,
"THIS IS NOT WORKING AT ALL!!! I've tried everything!",
),
create_message(Role::Assistant, "I'm sorry to hear that."),
create_message(Role::User, "Can I speak to a human agent please?"),
create_message(Role::Assistant, "Let me connect you."),
];
let (_report, events) = analyzer.analyze_with_events(&messages);
assert!(!events.is_empty(), "fixture should produce events");
for event in &events {
// Every event must point at one of the last 4 messages
// (indices 2..=5), proving the abs_offset was applied.
assert!(
event.source_message_idx >= 2 && event.source_message_idx < messages.len(),
"event idx {} not in expected absolute range [2,{}) for subtype {:?}",
event.source_message_idx,
messages.len(),
event.signal_subtype
);
// And it must align with the correct role.
let role = &messages[event.source_message_idx].role;
match event.signal_subtype {
SignalSubtype::Repetition => assert_eq!(*role, Role::Assistant),
SignalSubtype::Repair
| SignalSubtype::Frustration
| SignalSubtype::PositiveFeedback
| SignalSubtype::Escalation => assert_eq!(*role, Role::User),
_ => panic!("unexpected subtype in Phase 1"),
}
}
}
}

View file

@ -0,0 +1,423 @@
//! Signal event atoms — the foundational representation of detections.
//!
//! A `SignalEvent` is a single detected indicator in a conversation (one
//! frustration indicator, one repetition instance, etc.). Aggregate metrics
//! in `SignalReport` are unchanged; events are emitted alongside them so
//! downstream consumers can drill from an aggregate counter to the specific
//! message or tool call that triggered it.
//!
//! Phase 1 populates only the `Interaction` layer variants; the `Execution`
//! and `Environment` variants are declared here so callers can match
//! exhaustively, but they are not emitted until Phase 2.
use chrono::{DateTime, Utc};
use opentelemetry::KeyValue;
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use super::analyzer::{EscalationType, FrustrationType, PositiveType, RepetitionType};
/// OpenTelemetry attribute keys used on `SignalEvent` span events.
///
/// These are intentionally flat strings rather than constants pulled from
/// `tracing::constants` because they are only meaningful on span events, not
/// on the outer span attributes. Keeping them local to `event.rs` avoids
/// coupling the constants module to event-specific schema.
mod otel_keys {
pub const EVENT_ID: &str = "signal.event_id";
pub const TYPE: &str = "signal.type";
pub const SUBTYPE: &str = "signal.subtype";
pub const SOURCE_MESSAGE_IDX: &str = "signal.source_message_idx";
pub const EVIDENCE_SNIPPET: &str = "signal.evidence.snippet";
pub const EVIDENCE_INDICATOR_TYPE: &str = "signal.evidence.indicator_type";
pub const EVIDENCE_ESCALATION_TYPE: &str = "signal.evidence.escalation_type";
pub const EVIDENCE_REPETITION_TYPE: &str = "signal.evidence.repetition_type";
pub const EVIDENCE_SIMILARITY: &str = "signal.evidence.similarity";
pub const EVIDENCE_OTHER_MESSAGE_IDX: &str = "signal.evidence.other_message_idx";
pub const EVIDENCE_SIMILAR_TO_PREV_USER_TURN: &str =
"signal.evidence.similar_to_prev_user_turn";
}
/// Top-level signal layer from the Signals paper taxonomy.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum SignalType {
Interaction,
Execution,
Environment,
}
impl SignalType {
pub fn as_str(&self) -> &'static str {
match self {
Self::Interaction => "interaction",
Self::Execution => "execution",
Self::Environment => "environment",
}
}
}
impl std::fmt::Display for SignalType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
/// Specific signal subtype. Each subtype belongs to exactly one `SignalType`
/// layer (see [`SignalSubtype::layer`]).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum SignalSubtype {
// Interaction layer (Phase 1).
Repair,
Frustration,
Repetition,
PositiveFeedback,
Escalation,
// Execution layer (declared for Phase 2; not emitted yet).
ToolFailure,
ExecutionLoop,
// Environment layer (declared for Phase 2; not emitted yet).
Exhaustion,
}
impl SignalSubtype {
pub fn as_str(&self) -> &'static str {
match self {
Self::Repair => "repair",
Self::Frustration => "frustration",
Self::Repetition => "repetition",
Self::PositiveFeedback => "positive_feedback",
Self::Escalation => "escalation",
Self::ToolFailure => "tool_failure",
Self::ExecutionLoop => "execution_loop",
Self::Exhaustion => "exhaustion",
}
}
/// Returns the signal layer this subtype belongs to.
pub fn layer(&self) -> SignalType {
match self {
Self::Repair
| Self::Frustration
| Self::Repetition
| Self::PositiveFeedback
| Self::Escalation => SignalType::Interaction,
Self::ToolFailure | Self::ExecutionLoop => SignalType::Execution,
Self::Exhaustion => SignalType::Environment,
}
}
}
impl std::fmt::Display for SignalSubtype {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
/// Structured evidence payload. Variants mirror [`SignalSubtype`] — a
/// `Frustration` subtype always carries `Frustration` evidence, and so on.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignalEvidence {
Repair {
snippet: String,
/// True when the repair was detected by semantic rephrase of the
/// previous user turn (rather than a lexical repair pattern).
similar_to_prev_user_turn: bool,
},
Frustration {
indicator_type: FrustrationType,
snippet: String,
},
Repetition {
/// Absolute index of the other message in the detected pair.
other_message_idx: usize,
similarity: f64,
repetition_type: RepetitionType,
},
PositiveFeedback {
indicator_type: PositiveType,
snippet: String,
},
Escalation {
escalation_type: EscalationType,
snippet: String,
},
}
/// Single detected signal atom.
///
/// `source_message_idx` is an absolute index into the input `Vec<Message>`
/// the analyzer was called with — stable across the analyzer's internal
/// truncation window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SignalEvent {
pub event_id: Ulid,
pub signal_type: SignalType,
pub signal_subtype: SignalSubtype,
pub source_message_idx: usize,
pub timestamp: DateTime<Utc>,
pub evidence: SignalEvidence,
}
impl SignalEvent {
pub fn new(
signal_subtype: SignalSubtype,
source_message_idx: usize,
evidence: SignalEvidence,
) -> Self {
Self {
event_id: Ulid::new(),
signal_type: signal_subtype.layer(),
signal_subtype,
source_message_idx,
timestamp: Utc::now(),
evidence,
}
}
/// Canonical span-event name for this signal.
///
/// Shape: `signal.{type}.{subtype}` (e.g. `signal.interaction.frustration`).
/// External consumers can key off these stable names.
pub fn otel_event_name(&self) -> String {
format!("signal.{}.{}", self.signal_type, self.signal_subtype)
}
/// Flatten this event into OpenTelemetry key-value attributes suitable
/// for `span.add_event(name, attrs)`.
pub fn to_otel_attributes(&self) -> Vec<KeyValue> {
let mut attrs = vec![
KeyValue::new(otel_keys::EVENT_ID, self.event_id.to_string()),
KeyValue::new(otel_keys::TYPE, self.signal_type.as_str()),
KeyValue::new(otel_keys::SUBTYPE, self.signal_subtype.as_str()),
KeyValue::new(
otel_keys::SOURCE_MESSAGE_IDX,
self.source_message_idx as i64,
),
];
match &self.evidence {
SignalEvidence::Repair {
snippet,
similar_to_prev_user_turn,
} => {
attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_SIMILAR_TO_PREV_USER_TURN,
*similar_to_prev_user_turn,
));
}
SignalEvidence::Frustration {
indicator_type,
snippet,
} => {
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_INDICATOR_TYPE,
format!("{:?}", indicator_type),
));
attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
}
SignalEvidence::Repetition {
other_message_idx,
similarity,
repetition_type,
} => {
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_OTHER_MESSAGE_IDX,
*other_message_idx as i64,
));
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_SIMILARITY,
format!("{:.3}", similarity),
));
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_REPETITION_TYPE,
format!("{:?}", repetition_type),
));
}
SignalEvidence::PositiveFeedback {
indicator_type,
snippet,
} => {
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_INDICATOR_TYPE,
format!("{:?}", indicator_type),
));
attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
}
SignalEvidence::Escalation {
escalation_type,
snippet,
} => {
attrs.push(KeyValue::new(
otel_keys::EVIDENCE_ESCALATION_TYPE,
format!("{:?}", escalation_type),
));
attrs.push(KeyValue::new(otel_keys::EVIDENCE_SNIPPET, snippet.clone()));
}
}
attrs
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn subtype_layer_mapping() {
assert_eq!(SignalSubtype::Repair.layer(), SignalType::Interaction);
assert_eq!(SignalSubtype::Frustration.layer(), SignalType::Interaction);
assert_eq!(SignalSubtype::Repetition.layer(), SignalType::Interaction);
assert_eq!(
SignalSubtype::PositiveFeedback.layer(),
SignalType::Interaction
);
assert_eq!(SignalSubtype::Escalation.layer(), SignalType::Interaction);
assert_eq!(SignalSubtype::ToolFailure.layer(), SignalType::Execution);
assert_eq!(SignalSubtype::ExecutionLoop.layer(), SignalType::Execution);
assert_eq!(SignalSubtype::Exhaustion.layer(), SignalType::Environment);
}
#[test]
fn new_event_sets_layer_from_subtype() {
let event = SignalEvent::new(
SignalSubtype::Frustration,
7,
SignalEvidence::Frustration {
indicator_type: FrustrationType::AllCaps,
snippet: "WHY ISN'T THIS WORKING".to_string(),
},
);
assert_eq!(event.signal_type, SignalType::Interaction);
assert_eq!(event.signal_subtype, SignalSubtype::Frustration);
assert_eq!(event.source_message_idx, 7);
}
#[test]
fn otel_event_name_shape() {
let event = SignalEvent::new(
SignalSubtype::Frustration,
7,
SignalEvidence::Frustration {
indicator_type: FrustrationType::AllCaps,
snippet: "WHY".to_string(),
},
);
assert_eq!(event.otel_event_name(), "signal.interaction.frustration");
let tool_failure_event = SignalEvent {
event_id: Ulid::new(),
signal_type: SignalType::Execution,
signal_subtype: SignalSubtype::ToolFailure,
source_message_idx: 3,
timestamp: Utc::now(),
// Evidence variant is not yet defined for ToolFailure in Phase 1;
// use Frustration as a stand-in purely to exercise name formatting.
evidence: SignalEvidence::Frustration {
indicator_type: FrustrationType::AllCaps,
snippet: String::new(),
},
};
assert_eq!(
tool_failure_event.otel_event_name(),
"signal.execution.tool_failure"
);
}
#[test]
fn to_otel_attributes_includes_base_keys() {
let event = SignalEvent::new(
SignalSubtype::Frustration,
7,
SignalEvidence::Frustration {
indicator_type: FrustrationType::AllCaps,
snippet: "WHY".to_string(),
},
);
let attrs = event.to_otel_attributes();
let keys: std::collections::HashSet<String> =
attrs.iter().map(|kv| kv.key.as_str().to_string()).collect();
for required in [
"signal.event_id",
"signal.type",
"signal.subtype",
"signal.source_message_idx",
] {
assert!(
keys.contains(required),
"missing required attribute {}",
required
);
}
}
#[test]
fn to_otel_attributes_includes_evidence_fields_per_variant() {
let repair = SignalEvent::new(
SignalSubtype::Repair,
2,
SignalEvidence::Repair {
snippet: "that's not what i meant".to_string(),
similar_to_prev_user_turn: false,
},
);
let repetition = SignalEvent::new(
SignalSubtype::Repetition,
4,
SignalEvidence::Repetition {
other_message_idx: 2,
similarity: 0.91,
repetition_type: RepetitionType::Exact,
},
);
let escalation = SignalEvent::new(
SignalSubtype::Escalation,
5,
SignalEvidence::Escalation {
escalation_type: EscalationType::HumanAgent,
snippet: "speak to a human".to_string(),
},
);
let keyset = |e: &SignalEvent| -> std::collections::HashSet<String> {
e.to_otel_attributes()
.iter()
.map(|kv| kv.key.as_str().to_string())
.collect()
};
assert!(keyset(&repair).contains("signal.evidence.snippet"));
assert!(keyset(&repair).contains("signal.evidence.similar_to_prev_user_turn"));
let rep_keys = keyset(&repetition);
assert!(rep_keys.contains("signal.evidence.other_message_idx"));
assert!(rep_keys.contains("signal.evidence.similarity"));
assert!(rep_keys.contains("signal.evidence.repetition_type"));
let esc_keys = keyset(&escalation);
assert!(esc_keys.contains("signal.evidence.escalation_type"));
assert!(esc_keys.contains("signal.evidence.snippet"));
}
#[test]
fn event_serialization_round_trip() {
let event = SignalEvent::new(
SignalSubtype::Repetition,
12,
SignalEvidence::Repetition {
other_message_idx: 8,
similarity: 0.91,
repetition_type: RepetitionType::Exact,
},
);
let serialized = serde_json::to_string(&event).expect("serialize");
let deserialized: SignalEvent = serde_json::from_str(&serialized).expect("deserialize");
assert_eq!(deserialized.event_id, event.event_id);
assert_eq!(deserialized.signal_subtype, event.signal_subtype);
assert_eq!(deserialized.source_message_idx, event.source_message_idx);
}
}

View file

@ -1,3 +1,5 @@
mod analyzer;
mod event;
pub use analyzer::*;
pub use event::{SignalEvent, SignalEvidence, SignalSubtype, SignalType};

View file

@ -127,7 +127,7 @@ impl StreamProcessor for ObservableStreamProcessor {
// Analyze signals if messages are available and record as span attributes
if let Some(ref messages) = self.messages {
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
let report = analyzer.analyze(messages);
let (report, events) = analyzer.analyze_with_events(messages);
// Get the current OTel span to set signal attributes
let span = tracing::Span::current();
@ -198,6 +198,14 @@ impl StreamProcessor for ObservableStreamProcessor {
if should_flag {
otel_span.update_name(format!("{} {}", self.operation_name, FLAG_MARKER));
}
// Emit one OTel span event per detected SignalEvent, providing the
// drill-down path from aggregate attributes to the triggering
// message. Existing aggregate attributes above remain unchanged so
// dashboards keep working.
for event in &events {
otel_span.add_event(event.otel_event_name(), event.to_otel_attributes());
}
}
info!(