feat(core): auto-fire Retroactive Salience Backfill in the consolidation pass

Wire Step 8.5 into run_consolidation (bounded + idempotent via durable
Causal edge), share looks_like_failure/extract_entities across MCP tool +
CLI + auto-fire, add whole-word failure markers (API_TIMEOUT no longer
false-fires), --json CLI for CauseBench, and refactor the Redmine SSRF
guard to std::net (drops the dangling url-crate dep).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sam Valladares 2026-06-28 22:15:50 -05:00
parent dcd536ee86
commit 41301b32d6
5 changed files with 241 additions and 28 deletions

View file

@ -56,6 +56,12 @@ pub const FAILURE_MARKERS: &[&str] = &[
// performance/degradation failures (an agent should backfill from these too)
"spiked", "latency", "degraded", "slow", "hang", "hung", "throttled",
"oom", "502", "503", "504", "rejected", "denied", "flaky",
// real-incident vocabulary (CauseBench found these missing — postmortems often
// describe failures without the classic crash words above)
"pinned", "saturated", "saturation", "stalled", "exhausted", "exhaustion",
"overload", "overloaded", "backlog", "fell behind", "lag", "lagging",
"unavailable", "down", "dropped", "reset", "refused", "stampede",
"thrashing", "starved", "starvation", "expired", "expiry", "overflow",
];
/// How strongly to promote the backfilled cause: multiply its stability by this
@ -91,12 +97,85 @@ pub struct FailureEvent {
pub id: String,
pub content: String,
pub entities: Vec<String>,
/// The failure's tags — failure markers can live in a tag, so salience
/// detection must see them too.
#[serde(default)]
pub tags: Vec<String>,
/// Prediction error / surprise of this event (0..1).
pub prediction_error: f32,
/// True if a caller explicitly marked this salient (manual override path).
pub manual: bool,
}
/// Pull shared-entity join keys from content + tags (single source of truth used
/// by the MCP tool, CLI, and offline pass so they never diverge).
///
/// Every tag is a key. From `content`, a token is kept when it is at least three
/// bytes long and looks like either
/// * an env-style identifier: only ASCII uppercase letters, digits and `_`, with
///   at least one uppercase letter (e.g. `API_TIMEOUT`), or
/// * a path / dotted name: contains `/` or `.` and at least one ASCII letter
///   (e.g. `src/config.rs`).
///
/// All keys are lowercased. The result is de-duplicated and returned in sorted
/// order so that identical input always yields identical output (stable JSON,
/// stable comparisons between callers).
pub fn extract_entities(content: &str, tags: &[String]) -> Vec<String> {
    use std::collections::BTreeSet;

    // An ordered set gives de-duplication and a deterministic ordering in one go;
    // a hash set would hand the keys back in a different order on every run.
    let mut keys: BTreeSet<String> = tags.iter().map(|t| t.to_lowercase()).collect();

    // Tokens are maximal runs of alphanumerics plus the identifier/path
    // punctuation `_`, `.`, `/` and `-`.
    let is_separator =
        |c: char| !(c.is_alphanumeric() || c == '_' || c == '.' || c == '/' || c == '-');

    for raw in content.split(is_separator) {
        // Drop sentence punctuation stuck to either end ("config.rs." -> "config.rs").
        let tok = raw.trim_matches(|c: char| c == '.' || c == '/' || c == '-');
        if tok.len() < 3 {
            continue;
        }
        let is_env = tok
            .chars()
            .all(|c| c.is_ascii_uppercase() || c == '_' || c.is_ascii_digit())
            && tok.chars().any(|c| c.is_ascii_uppercase());
        let is_path = (tok.contains('/') || tok.contains('.'))
            && tok.chars().any(|c| c.is_ascii_alphabetic());
        if is_env || is_path {
            keys.insert(tok.to_lowercase());
        }
    }
    keys.into_iter().collect()
}
/// Whole-word marker match: `marker` must appear bounded by non-word characters
/// (a word character is anything alphanumeric, or `_`), not embedded in a larger
/// identifier. This is the difference between matching "timeout" in "a request
/// timeout" (a real failure) and NOT matching it inside the config var
/// `API_TIMEOUT` (a perfectly ordinary env var). Plain substring over-fires:
/// "timeout" hits "API_TIMEOUT", "leak" hits "leaky", "500" hits "15000" — which
/// wrongly flags a quiet CAUSE as a failure and excludes it from the backward
/// reach.
///
/// Punctuation is still a boundary, so "500" does match in "$500" and "timeout"
/// does match in "timeout—retrying".
///
/// Returns `false` for an empty `marker`. The comparison is case-sensitive;
/// callers lowercase the haystack first.
fn contains_marker_word(hay: &str, marker: &str) -> bool {
    fn is_word_char(c: char) -> bool {
        c.is_alphanumeric() || c == '_'
    }

    // An empty needle matches at every offset and carries no signal.
    let step = match marker.chars().next() {
        Some(first) => first.len_utf8(),
        None => return false,
    };

    let mut from = 0usize;
    while let Some(pos) = hay[from..].find(marker) {
        let start = from + pos;
        let end = start + marker.len();
        // Look at the real neighbouring characters, not raw bytes: a UTF-8 lead or
        // continuation byte reinterpreted as a char misclassifies non-ASCII
        // punctuation (em dash, curly quotes) as letters and vice versa.
        let before_ok = hay[..start]
            .chars()
            .next_back()
            .map_or(true, |c| !is_word_char(c));
        let after_ok = hay[end..].chars().next().map_or(true, |c| !is_word_char(c));
        if before_ok && after_ok {
            return true;
        }
        // Resume just past the first character of this match so the next slice
        // always starts on a char boundary and never runs past the end.
        from = start + step;
    }
    false
}
/// Failure heuristic shared by every caller so detection never drifts: returns
/// `true` when the lowercased `content`, or any single lowercased tag, contains
/// one of [`FAILURE_MARKERS`] as a whole word (see [`contains_marker_word`] for
/// why whole-word rather than substring).
pub fn looks_like_failure(content: &str, tags: &[String]) -> bool {
    // Same test for the content and for each tag: lowercase, then scan the markers.
    let mentions_failure = |text: &str| {
        let lowered = text.to_lowercase();
        FAILURE_MARKERS
            .iter()
            .any(|marker| contains_marker_word(&lowered, marker))
    };

    // Content is checked first; tags are only consulted when it is clean.
    mentions_failure(content) || tags.iter().any(|tag| mentions_failure(tag.as_str()))
}
impl FailureEvent {
/// Auto-detection: is this memory a salient "aversive event"? True when it
/// is sufficiently surprising AND carries a failure marker — or when a caller
@ -108,8 +187,7 @@ impl FailureEvent {
if self.prediction_error < salience_threshold {
return false;
}
let hay = self.content.to_lowercase();
FAILURE_MARKERS.iter().any(|m| hay.contains(m))
looks_like_failure(&self.content, &self.tags)
}
}
@ -288,6 +366,7 @@ mod tests {
id: "fail-wed".into(),
content: "Service crashed: 500 Internal Server Error on the auth endpoint".into(),
entities: vec!["auth-service".into(), "API_TIMEOUT".into()],
tags: vec![],
prediction_error: 0.9,
manual: false,
}
@ -358,6 +437,7 @@ mod tests {
id: "calm".into(),
content: "Refactored the logging format for readability".into(),
entities: vec!["logger".into()],
tags: vec![],
prediction_error: 0.2, // low surprise
manual: false,
};
@ -372,6 +452,7 @@ mod tests {
id: "manual".into(),
content: "Latency crept up on the checkout path".into(),
entities: vec!["checkout".into()],
tags: vec![],
prediction_error: 0.1,
manual: true,
};

View file

@ -117,35 +117,28 @@ impl RedmineConnector {
)));
}
if std::env::var("VESTIGE_ALLOW_PRIVATE_CONNECTOR_HOSTS").is_err() {
match url.host() {
None => {
return Err(ConnectorError::Config(
"base_url has no host".to_string(),
));
}
Some(url::Host::Ipv4(ip))
if ip.is_loopback()
|| ip.is_private()
|| ip.is_link_local()
|| ip.is_unspecified() =>
{
let host = url.host_str().ok_or_else(|| {
ConnectorError::Config("base_url has no host".to_string())
})?;
let host_clean = host.trim_start_matches('[').trim_end_matches(']');
if host_clean.eq_ignore_ascii_case("localhost") {
return Err(ConnectorError::Config(
"base_url host localhost is blocked (SSRF guard)".to_string(),
));
}
if let Ok(ip) = host_clean.parse::<std::net::IpAddr>() {
let reserved = match ip {
std::net::IpAddr::V4(v4) => {
v4.is_loopback() || v4.is_private()
|| v4.is_link_local() || v4.is_unspecified()
}
std::net::IpAddr::V6(v6) => v6.is_loopback() || v6.is_unspecified(),
};
if reserved {
return Err(ConnectorError::Config(format!(
"base_url host {ip} is a reserved/internal address (SSRF guard)"
)));
}
Some(url::Host::Ipv6(ip)) if ip.is_loopback() || ip.is_unspecified() => {
return Err(ConnectorError::Config(format!(
"base_url host {ip} is a reserved/internal address (SSRF guard)"
)));
}
Some(url::Host::Domain(d))
if d.eq_ignore_ascii_case("localhost") =>
{
return Err(ConnectorError::Config(
"base_url host localhost is blocked (SSRF guard)".to_string(),
));
}
_ => {}
}
}
}

View file

@ -192,6 +192,7 @@ impl ConsolidationRun {
neighbors_reinforced: 0,
activations_computed: 0,
w20_optimized: None,
backfilled_causes: 0,
}
}
}

View file

@ -3646,6 +3646,129 @@ impl SqliteMemoryStore {
}
}
// 8.5. Retroactive Salience Backfill — memory with hindsight (auto-fire).
//
// The dream pass (step 8) replays memories forward to synthesize insights.
// This is its backward twin: when a recent memory is a salient FAILURE,
// reach BACKWARD across history and PROMOTE the quiet earlier memory that
// caused it — the root cause a semantic search structurally cannot surface
// because it is causally upstream, not *similar*. Faithful port of the
// offline ensemble co-reactivation in Zaki/Cai et al. 2024 Nature; the
// consolidation pass IS the offline window. Bounded on every axis so a
// noisy day cannot trigger a promotion storm, and idempotent across cycles
// via a durable causal edge (so the same cause is promoted once per
// failure, not every cycle — promote_memory's stability boost is capped
// but would still inflate without this guard).
// Number of causes promoted during this cycle; incremented only after both the
// causal edge write and the promotion succeed.
let mut backfilled_causes = 0i64;
{
use crate::advanced::retroactive_backfill::{
self as rb, BackfillCandidate, FailureEvent, RetroactiveBackfill,
};
// Upper bound on how many failure memories are processed in one cycle.
const MAX_FAILURES_PER_CYCLE: usize = 5;
// Size of the single window of nodes from which BOTH the failures and their
// candidate causes are drawn.
const CANDIDATE_SCAN: i32 = 500;
// A storage error is swallowed here: it yields an empty list, so the whole
// step is silently a no-op for this cycle.
// NOTE(review): the ordering of get_all_nodes is not visible here — the
// `.take(...)` below keeps the first five failures in whatever order it
// returns (presumably newest first; confirm).
let recent = self.get_all_nodes(CANDIDATE_SCAN, 0).unwrap_or_default();
let failures: Vec<&KnowledgeNode> = recent
.iter()
.filter(|n| rb::looks_like_failure(&n.content, &n.tags))
.take(MAX_FAILURES_PER_CYCLE)
.collect();
if !failures.is_empty() {
let backfill = RetroactiveBackfill::new();
// In-cycle dedup of (cause id, failure id) pairs, so one pair is handled at
// most once within this pass.
let mut already_promoted: std::collections::HashSet<(String, String)> =
std::collections::HashSet::new();
for failure_node in failures {
let failure = FailureEvent {
id: failure_node.id.clone(),
content: failure_node.content.clone(),
entities: rb::extract_entities(&failure_node.content, &failure_node.tags),
tags: failure_node.tags.clone(),
// Fixed surprise value rather than a measured one; the node was already
// selected by marker match above.
// NOTE(review): presumably chosen to clear the salience threshold used
// by RetroactiveBackfill — confirm.
prediction_error: 0.9,
manual: false,
};
// candidates = every OTHER memory strictly older than the
// failure, EXCLUDING other failures (a root cause is the quiet
// upstream change, not an earlier crash).
let candidates: Vec<BackfillCandidate> = recent
.iter()
.filter(|c| c.id != failure_node.id)
.filter(|c| !rb::looks_like_failure(&c.content, &c.tags))
.filter_map(|c| {
// Age of the candidate relative to the failure, in days (whole seconds
// divided by 86 400).
let age = (failure_node.created_at - c.created_at).num_seconds()
as f64
/ 86_400.0;
// Same-second or newer memories cannot be upstream of the failure.
if age <= 0.0 {
return None;
}
Some(BackfillCandidate {
id: c.id.clone(),
content: c.content.clone(),
entities: rb::extract_entities(&c.content, &c.tags),
age_days_before_failure: age,
stability: c.stability,
similarity_to_failure: None,
})
})
.collect();
let result = backfill.run(&failure, &candidates);
if !result.triggered {
continue;
}
for cause in &result.causes {
// `insert` returns false when the pair was already seen this cycle.
if !already_promoted
.insert((cause.memory_id.clone(), failure_node.id.clone()))
{
continue;
}
// Cross-cycle idempotency: a durable causal edge is both the
// dedup key and a first-class artifact. Write it FIRST, only
// promote if it persisted (a failed edge write => retry next
// cycle cleanly, never double-inflate).
let link_type = crate::memory::EdgeType::Causal.to_string();
// NOTE(review): a lookup error is treated as "not linked yet", so the edge
// write and the promotion below still proceed. Whether save_connection
// rejects a duplicate edge is not visible here — confirm that this path
// cannot promote the same cause twice.
let already_linked = self
.get_connections_for_memory(&cause.memory_id)
.map(|conns| {
conns.iter().any(|c| {
c.source_id == cause.memory_id
&& c.target_id == failure_node.id
&& c.link_type == link_type
})
})
.unwrap_or(false);
if already_linked {
continue;
}
// Edge direction: cause (source) -> failure (target).
let conn = ConnectionRecord {
source_id: cause.memory_id.clone(),
target_id: failure_node.id.clone(),
strength: 1.0,
link_type,
created_at: Utc::now(),
last_activated: Utc::now(),
activation_count: 0,
};
if self.save_connection(&conn).is_err() {
continue;
}
// NOTE(review): if promote_memory fails here the edge has already been
// saved, so the `already_linked` check above would skip this pair on later
// cycles and the promotion is not retried — confirm this is intended.
if self.promote_memory(&cause.memory_id).is_ok() {
backfilled_causes += 1;
}
}
}
// Log once per cycle, and only when at least one cause was promoted.
if backfilled_causes > 0 {
tracing::info!(
backfilled_causes,
"Retroactive Salience Backfill: promoted {} root-cause memor{} a semantic search would miss",
backfilled_causes,
if backfilled_causes == 1 { "y" } else { "ies" }
);
}
}
}
// 9. Memory Compression (old memories → summaries)
let mut _memories_compressed = 0i64;
{
@ -3819,6 +3942,7 @@ impl SqliteMemoryStore {
neighbors_reinforced: 0,
activations_computed,
w20_optimized,
backfilled_causes,
})
}

View file

@ -262,6 +262,10 @@ enum Commands {
/// failure (the lookalike, NOT the cause), then what Postdict surfaces.
#[arg(long)]
contrast: bool,
/// Machine-readable: print the raw backfill result as JSON (for tooling /
/// benchmarks). Suppresses the human-formatted output.
#[arg(long)]
json: bool,
},
/// Start standalone HTTP MCP server (no stdio, for remote access)
@ -347,7 +351,8 @@ fn main() -> anyhow::Result<()> {
lookback_days,
no_promote,
contrast,
} => run_backfill(failure_id, manual, lookback_days, !no_promote, contrast),
json,
} => run_backfill(failure_id, manual, lookback_days, !no_promote, contrast, json),
Commands::Serve {
port,
dashboard,
@ -2602,6 +2607,7 @@ fn run_backfill(
lookback_days: i64,
promote: bool,
contrast: bool,
json: bool,
) -> anyhow::Result<()> {
let storage = std::sync::Arc::new(open_storage()?);
#[cfg(feature = "embeddings")]
@ -2709,6 +2715,14 @@ fn run_backfill(
.block_on(vestige_mcp::tools::backfill::execute(&storage, Some(args)))
.map_err(|e| anyhow::anyhow!(e))?;
// Machine-readable path: dump the raw tool result (includes per-cause
// memory_id, shared_entities, similarity_rank) and stop. Used by tooling and
// the CauseBench harness so it can score against real engine output.
if json {
println!("{}", serde_json::to_string(&result)?);
return Ok(());
}
println!("{}", "=== Retroactive Salience Backfill ===".magenta().bold());
println!();
if result["triggered"] != serde_json::json!(true) {