From c12120b78af6bf56dce9e14dbcd1db4b39b20bbf Mon Sep 17 00:00:00 2001 From: Sam Valladares Date: Fri, 24 Apr 2026 01:43:39 -0500 Subject: [PATCH] fix(autopilot): supervisor + dedup race + opt-out env var MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three blockers from the 5-agent v2.0.9 audit, all in autopilot.rs. 1. Supervisor loops around both tokio tasks (event subscriber + prospective poller). Previously, if a cognitive hook panicked on a single bad memory, the spawned task died permanently and silently — every future event lost. Now the outer supervisor catches JoinError::is_panic(), logs the panic with full error detail, sleeps 5s, and respawns the inner task. Turns a permanent silent failure into a transient hiccup. 2. DedupSweepState struct replaces the bare Option<Instant> timestamp. It tracks the in-flight JoinHandle so the next Heartbeat skips spawning a second sweep while the first is still running. Previously, the cooldown timestamp was set BEFORE spawning the async sweep, which allowed two concurrent find_duplicates scans on 100k+ memory DBs where the sweep could exceed the 6h cooldown window. is_running() drops finished handles so a long-dead sweep doesn't block the next legitimate tick. 3. VESTIGE_AUTOPILOT_ENABLED=0 opt-out. v2.0.8 users updating in place can preserve the passive-library contract by setting the env var to exactly one of {0, false, no, off}; the match is case-sensitive and untrimmed. Any other value (unset, 1, true, FALSE, etc.) enables the default v2.0.9 Autopilot behavior. spawn() early-returns with an info! log before any task is spawned. 
Audit breakdown: - Agent 1 (internals): NO-GO → fixed (1, 2) - Agent 2 (backward compat): NO-GO → fixed (3) - Agent 3 (orphan cleanup): GO clean - Agent 4 (runtime safety): GO clean - Agent 5 (release prep): GO, procedural note logged Verification: - cargo check -p vestige-mcp: clean - cargo test -p vestige-mcp --lib: 373 passed, 0 failed - cargo clippy -p vestige-mcp --lib --bins -- -D warnings: clean --- crates/vestige-mcp/src/autopilot.rs | 157 ++++++++++++++++++++++++---- 1 file changed, 138 insertions(+), 19 deletions(-) diff --git a/crates/vestige-mcp/src/autopilot.rs b/crates/vestige-mcp/src/autopilot.rs index f16bc58..809e802 100644 --- a/crates/vestige-mcp/src/autopilot.rs +++ b/crates/vestige-mcp/src/autopilot.rs @@ -63,39 +63,145 @@ const DUPLICATES_SWEEP_COOLDOWN_SECS: u64 = 6 * 3600; // 6 hours /// Interval for polling `prospective_memory.check_triggers()`. const PROSPECTIVE_POLL_SECS: u64 = 60; +/// Backoff between supervisor restarts after a panicked child task. Short +/// enough that a single bad memory doesn't meaningfully degrade the system, +/// long enough to avoid a tight crash loop if the panic source is persistent. +const SUPERVISOR_RESTART_BACKOFF_SECS: u64 = 5; + +/// Tracks an in-flight Heartbeat-triggered dedup sweep so the next Heartbeat +/// can skip spawning a second sweep while the first is still running. The +/// previous implementation stored only the *start* time, which allowed two +/// concurrent scans on databases where `find_duplicates` exceeds the 6h +/// cooldown window. +struct DedupSweepState { + last_fired: Option, + in_flight: Option>, +} + +impl DedupSweepState { + fn new() -> Self { + Self { + last_fired: None, + in_flight: None, + } + } + + /// True if a previous sweep is still running. Drops a finished handle so + /// a long-dead sweep doesn't keep us from firing the next one. 
+ fn is_running(&mut self) -> bool { + match &self.in_flight { + Some(h) if !h.is_finished() => true, + _ => { + self.in_flight = None; + false + } + } + } +} + /// Launch the Autopilot event-subscriber task + prospective-memory poller. /// -/// Both tasks live for the entire process lifetime and gracefully handle -/// broadcast lag (warn + resume) and closure (exit). The event loop holds -/// the `CognitiveEngine` mutex only for the duration of a single handler, -/// and never inside an `await`, so it never starves MCP tool dispatch. +/// Both tasks are supervised: if the inner loop panics on a single bad +/// memory, the supervisor logs the panic and restarts it after a short +/// backoff. This turns a permanent silent-failure mode ("task dies, every +/// future cognitive event lost") into a transient hiccup ("one bad memory +/// skipped, subsystem resumes"). The event loop holds the `CognitiveEngine` +/// mutex only for the duration of a single handler, and never inside an +/// `await`, so it never starves MCP tool dispatch. pub fn spawn( cognitive: Arc>, storage: Arc, event_tx: broadcast::Sender, ) { - let rx = event_tx.subscribe(); + // Opt-out: users upgrading in place from v2.0.8 may want to keep the + // "passive library" contract. Set VESTIGE_AUTOPILOT_ENABLED=0 to skip + // spawning both background tasks. Anything else (unset, "1", "true", etc.) + // enables the default v2.0.9 Autopilot behavior. + match std::env::var("VESTIGE_AUTOPILOT_ENABLED").as_deref() { + Ok("0") | Ok("false") | Ok("no") | Ok("off") => { + info!( + "Autopilot disabled via VESTIGE_AUTOPILOT_ENABLED — \ + cognitive modules remain passive (v2.0.8 behavior)" + ); + return; + } + _ => {} + } - // Event-subscriber task — routes every emitted event into cognitive hooks. + // Event-subscriber supervisor. 
{ let cognitive = cognitive.clone(); let storage = storage.clone(); let event_tx = event_tx.clone(); tokio::spawn(async move { - run_event_subscriber(rx, cognitive, storage, event_tx).await; + loop { + let rx = event_tx.subscribe(); + let cog = cognitive.clone(); + let sto = storage.clone(); + let etx = event_tx.clone(); + let handle = tokio::spawn(async move { + run_event_subscriber(rx, cog, sto, etx).await; + }); + match handle.await { + Ok(()) => { + info!("Autopilot event subscriber exited cleanly"); + break; + } + Err(e) if e.is_panic() => { + warn!( + error = ?e, + backoff_secs = SUPERVISOR_RESTART_BACKOFF_SECS, + "Autopilot event subscriber panicked — supervisor restarting" + ); + tokio::time::sleep(Duration::from_secs( + SUPERVISOR_RESTART_BACKOFF_SECS, + )) + .await; + } + Err(e) => { + warn!(error = ?e, "Autopilot event subscriber join error — exiting"); + break; + } + } + } }); } - // Prospective memory poller — a separate task so a long-running event - // handler can't starve intention checks. + // Prospective-memory poller supervisor — symmetric restart semantics. 
{ let cognitive = cognitive.clone(); tokio::spawn(async move { - run_prospective_poller(cognitive).await; + loop { + let cog = cognitive.clone(); + let handle = tokio::spawn(async move { + run_prospective_poller(cog).await; + }); + match handle.await { + Ok(()) => { + info!("Autopilot prospective poller exited cleanly"); + break; + } + Err(e) if e.is_panic() => { + warn!( + error = ?e, + backoff_secs = SUPERVISOR_RESTART_BACKOFF_SECS, + "Autopilot prospective poller panicked — supervisor restarting" + ); + tokio::time::sleep(Duration::from_secs( + SUPERVISOR_RESTART_BACKOFF_SECS, + )) + .await; + } + Err(e) => { + warn!(error = ?e, "Autopilot prospective poller join error — exiting"); + break; + } + } + } }); } - info!("Autopilot spawned (event-subscriber + prospective poller)"); + info!("Autopilot spawned (event-subscriber + prospective poller, supervised)"); } async fn run_event_subscriber( @@ -104,9 +210,10 @@ async fn run_event_subscriber( storage: Arc, event_tx: broadcast::Sender, ) { - // Last-time cache for Heartbeat-triggered auto-sweeps — prevents the - // same 5-second heartbeat from firing a dedup sweep on every tick. - let mut last_dedup_sweep: Option = None; + // Tracks Heartbeat-triggered auto-sweeps so the next Heartbeat skips + // spawning a second sweep while the first is still running — essential + // on large DBs where `find_duplicates` can outrun the cooldown. 
+ let mut dedup_state = DedupSweepState::new(); loop { match rx.recv().await { @@ -116,7 +223,7 @@ async fn run_event_subscriber( &cognitive, &storage, &event_tx, - &mut last_dedup_sweep, + &mut dedup_state, ) .await; } @@ -136,7 +243,7 @@ async fn handle_event( cognitive: &Arc>, storage: &Arc, event_tx: &broadcast::Sender, - last_dedup_sweep: &mut Option, + dedup_state: &mut DedupSweepState, ) { match event { VestigeEvent::MemoryCreated { @@ -273,21 +380,32 @@ async fn handle_event( if memory_count <= DUPLICATES_THRESHOLD { return; } + // If a prior sweep is still running (possible on very large DBs + // where `find_duplicates` exceeds the 6h cooldown), skip this + // tick rather than spawn a concurrent second scan. + if dedup_state.is_running() { + debug!( + memory_count, + "Autopilot: dedup sweep already in flight — skipping Heartbeat tick" + ); + return; + } let now = Instant::now(); - let cooldown_elapsed = last_dedup_sweep + let cooldown_elapsed = dedup_state + .last_fired .map(|t| now.duration_since(t).as_secs() >= DUPLICATES_SWEEP_COOLDOWN_SECS) .unwrap_or(true); if !cooldown_elapsed { return; } - *last_dedup_sweep = Some(now); + dedup_state.last_fired = Some(now); // Fire the find_duplicates tool with conservative defaults. // Running on the heartbeat task keeps this off the critical // MCP-dispatch path. Result is logged only — the user's client // can still call the tool explicitly for an interactive run. let storage = storage.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let args = serde_json::json!({ "similarity_threshold": 0.85, "limit": 50, @@ -316,6 +434,7 @@ async fn handle_event( } } }); + dedup_state.in_flight = Some(handle); } // Events that carry no autopilot work today. Explicit pass-through so