fix(autopilot): supervisor + dedup race + opt-out env var

Three blockers from the 5-agent v2.0.9 audit, all in autopilot.rs.

1. Supervisor loops around both tokio tasks (event subscriber + prospective
   poller). Previously, if a cognitive hook panicked on a single bad memory,
   the spawned task died permanently and silently — every future event lost.
   Now the outer supervisor catches JoinError::is_panic(), logs the panic
   with full error detail, sleeps 5s, and respawns the inner task. Turns
   a permanent silent failure into a transient hiccup.

2. DedupSweepState struct replaces the bare Option<Instant> timestamp. It
   tracks the in-flight JoinHandle so the next Heartbeat skips spawning a
   second sweep while the first is still running. Previously, the cooldown
   timestamp was set BEFORE spawning the async sweep, which allowed two
   concurrent find_duplicates scans on 100k+ memory DBs where the sweep
   could exceed the 6h cooldown window. is_running() drops finished handles
   so a long-dead sweep doesn't block the next legitimate tick.

3. VESTIGE_AUTOPILOT_ENABLED=0 opt-out. v2.0.8 users updating in place
   can preserve the passive-library contract by setting the env var to
   any of {0, false, no, off}. Any other value (unset, 1, true, etc.)
   enables the default v2.0.9 Autopilot behavior. spawn() early-returns
   with an info! log before any task is spawned.

Audit breakdown:
- Agent 1 (internals): NO-GO → fixed (1, 2)
- Agent 2 (backward compat): NO-GO → fixed (3)
- Agent 3 (orphan cleanup): GO clean
- Agent 4 (runtime safety): GO clean
- Agent 5 (release prep): GO, procedural note logged

Verification:
- cargo check -p vestige-mcp: clean
- cargo test -p vestige-mcp --lib: 373 passed, 0 failed
- cargo clippy -p vestige-mcp --lib --bins -- -D warnings: clean
This commit is contained in:
Sam Valladares 2026-04-24 01:43:39 -05:00
parent fe7a68c96a
commit c12120b78a

View file

@ -63,39 +63,145 @@ const DUPLICATES_SWEEP_COOLDOWN_SECS: u64 = 6 * 3600; // 6 hours
/// Interval for polling `prospective_memory.check_triggers()`.
const PROSPECTIVE_POLL_SECS: u64 = 60;
/// Backoff between supervisor restarts after a panicked child task. Short
/// enough that a single bad memory doesn't meaningfully degrade the system,
/// long enough to avoid a tight crash loop if the panic source is persistent.
const SUPERVISOR_RESTART_BACKOFF_SECS: u64 = 5;
/// Tracks an in-flight Heartbeat-triggered dedup sweep so the next Heartbeat
/// can skip spawning a second sweep while the first is still running. The
/// previous implementation stored only the *start* time, which allowed two
/// concurrent scans on databases where `find_duplicates` exceeds the 6h
/// cooldown window.
struct DedupSweepState {
last_fired: Option<Instant>,
in_flight: Option<tokio::task::JoinHandle<()>>,
}
impl DedupSweepState {
fn new() -> Self {
Self {
last_fired: None,
in_flight: None,
}
}
/// True if a previous sweep is still running. Drops a finished handle so
/// a long-dead sweep doesn't keep us from firing the next one.
fn is_running(&mut self) -> bool {
match &self.in_flight {
Some(h) if !h.is_finished() => true,
_ => {
self.in_flight = None;
false
}
}
}
}
/// Launch the Autopilot event-subscriber task + prospective-memory poller.
///
/// Both tasks live for the entire process lifetime and gracefully handle
/// broadcast lag (warn + resume) and closure (exit). The event loop holds
/// the `CognitiveEngine` mutex only for the duration of a single handler,
/// and never inside an `await`, so it never starves MCP tool dispatch.
/// Both tasks are supervised: if the inner loop panics on a single bad
/// memory, the supervisor logs the panic and restarts it after a short
/// backoff. This turns a permanent silent-failure mode ("task dies, every
/// future cognitive event lost") into a transient hiccup ("one bad memory
/// skipped, subsystem resumes"). The event loop holds the `CognitiveEngine`
/// mutex only for the duration of a single handler, and never inside an
/// `await`, so it never starves MCP tool dispatch.
pub fn spawn(
cognitive: Arc<Mutex<CognitiveEngine>>,
storage: Arc<Storage>,
event_tx: broadcast::Sender<VestigeEvent>,
) {
let rx = event_tx.subscribe();
// Opt-out: users upgrading in place from v2.0.8 may want to keep the
// "passive library" contract. Set VESTIGE_AUTOPILOT_ENABLED=0 to skip
// spawning both background tasks. Anything else (unset, "1", "true", etc.)
// enables the default v2.0.9 Autopilot behavior.
match std::env::var("VESTIGE_AUTOPILOT_ENABLED").as_deref() {
Ok("0") | Ok("false") | Ok("no") | Ok("off") => {
info!(
"Autopilot disabled via VESTIGE_AUTOPILOT_ENABLED — \
cognitive modules remain passive (v2.0.8 behavior)"
);
return;
}
_ => {}
}
// Event-subscriber task — routes every emitted event into cognitive hooks.
// Event-subscriber supervisor.
{
let cognitive = cognitive.clone();
let storage = storage.clone();
let event_tx = event_tx.clone();
tokio::spawn(async move {
run_event_subscriber(rx, cognitive, storage, event_tx).await;
loop {
let rx = event_tx.subscribe();
let cog = cognitive.clone();
let sto = storage.clone();
let etx = event_tx.clone();
let handle = tokio::spawn(async move {
run_event_subscriber(rx, cog, sto, etx).await;
});
match handle.await {
Ok(()) => {
info!("Autopilot event subscriber exited cleanly");
break;
}
Err(e) if e.is_panic() => {
warn!(
error = ?e,
backoff_secs = SUPERVISOR_RESTART_BACKOFF_SECS,
"Autopilot event subscriber panicked — supervisor restarting"
);
tokio::time::sleep(Duration::from_secs(
SUPERVISOR_RESTART_BACKOFF_SECS,
))
.await;
}
Err(e) => {
warn!(error = ?e, "Autopilot event subscriber join error — exiting");
break;
}
}
}
});
}
// Prospective memory poller — a separate task so a long-running event
// handler can't starve intention checks.
// Prospective-memory poller supervisor — symmetric restart semantics.
{
let cognitive = cognitive.clone();
tokio::spawn(async move {
run_prospective_poller(cognitive).await;
loop {
let cog = cognitive.clone();
let handle = tokio::spawn(async move {
run_prospective_poller(cog).await;
});
match handle.await {
Ok(()) => {
info!("Autopilot prospective poller exited cleanly");
break;
}
Err(e) if e.is_panic() => {
warn!(
error = ?e,
backoff_secs = SUPERVISOR_RESTART_BACKOFF_SECS,
"Autopilot prospective poller panicked — supervisor restarting"
);
tokio::time::sleep(Duration::from_secs(
SUPERVISOR_RESTART_BACKOFF_SECS,
))
.await;
}
Err(e) => {
warn!(error = ?e, "Autopilot prospective poller join error — exiting");
break;
}
}
}
});
}
info!("Autopilot spawned (event-subscriber + prospective poller)");
info!("Autopilot spawned (event-subscriber + prospective poller, supervised)");
}
async fn run_event_subscriber(
@ -104,9 +210,10 @@ async fn run_event_subscriber(
storage: Arc<Storage>,
event_tx: broadcast::Sender<VestigeEvent>,
) {
// Last-time cache for Heartbeat-triggered auto-sweeps — prevents the
// same 5-second heartbeat from firing a dedup sweep on every tick.
let mut last_dedup_sweep: Option<Instant> = None;
// Tracks Heartbeat-triggered auto-sweeps so the next Heartbeat skips
// spawning a second sweep while the first is still running — essential
// on large DBs where `find_duplicates` can outrun the cooldown.
let mut dedup_state = DedupSweepState::new();
loop {
match rx.recv().await {
@ -116,7 +223,7 @@ async fn run_event_subscriber(
&cognitive,
&storage,
&event_tx,
&mut last_dedup_sweep,
&mut dedup_state,
)
.await;
}
@ -136,7 +243,7 @@ async fn handle_event(
cognitive: &Arc<Mutex<CognitiveEngine>>,
storage: &Arc<Storage>,
event_tx: &broadcast::Sender<VestigeEvent>,
last_dedup_sweep: &mut Option<Instant>,
dedup_state: &mut DedupSweepState,
) {
match event {
VestigeEvent::MemoryCreated {
@ -273,21 +380,32 @@ async fn handle_event(
if memory_count <= DUPLICATES_THRESHOLD {
return;
}
// If a prior sweep is still running (possible on very large DBs
// where `find_duplicates` exceeds the 6h cooldown), skip this
// tick rather than spawn a concurrent second scan.
if dedup_state.is_running() {
debug!(
memory_count,
"Autopilot: dedup sweep already in flight — skipping Heartbeat tick"
);
return;
}
let now = Instant::now();
let cooldown_elapsed = last_dedup_sweep
let cooldown_elapsed = dedup_state
.last_fired
.map(|t| now.duration_since(t).as_secs() >= DUPLICATES_SWEEP_COOLDOWN_SECS)
.unwrap_or(true);
if !cooldown_elapsed {
return;
}
*last_dedup_sweep = Some(now);
dedup_state.last_fired = Some(now);
// Fire the find_duplicates tool with conservative defaults.
// Running on the heartbeat task keeps this off the critical
// MCP-dispatch path. Result is logged only — the user's client
// can still call the tool explicitly for an interactive run.
let storage = storage.clone();
tokio::spawn(async move {
let handle = tokio::spawn(async move {
let args = serde_json::json!({
"similarity_threshold": 0.85,
"limit": 50,
@ -316,6 +434,7 @@ async fn handle_event(
}
}
});
dedup_state.in_flight = Some(handle);
}
// Events that carry no autopilot work today. Explicit pass-through so