fix(backend): resolve 29 bugs from the full-backend adversarial audit (#139)

A backend-wide audit (18 feature areas, adversarially verified) surfaced 31
real bugs. This fixes 29 of them; each was re-verified by an independent
adversarial pass and the whole workspace stays green (1556 tests, clippy -D
warnings clean).

Data integrity & storage:
- Migrations no longer brick the DB on a mid-migration failure: each runs in a
  transaction (atomic schema change + version bump); V7 repages under WAL.
- decide_memory_pr rejects re-deciding a finalized PR (audit ledger + no
  resurrection of a rejected memory).
- suppress reverse is now a true inverse (was leaving stability permanently
  halved).
- update_node_content flips has_embedding=0 when the embedder isn't ready, so
  the stale embedding is regenerated instead of matching old content forever.
- plan_merge validates survivor_id is a member (was an unchecked-unwrap panic).
- Redmine detail-fetch failure aborts the page (retry) instead of storing a
  journal-less record and skipping it forever.
- ContentStore cache counter accounts for replaced entries; changelog no longer
  loads the whole connections table; source_sync clamps max_pages.

Cognitive correctness:
- Contradiction heuristic requires a real polarity flip (was demoting correct
  memories on benign "do not"/"cannot" notes).
- Hybrid search relevance is the RRF fused score, min-max normalized, so the
  best match ranks first (was capped and outranked by recency/importance).
- Sentiment-boosted stability is clamped to MAX_STABILITY.
- Chain confidence uses the correct geometric-mean root; looks_like_failure no
  longer over-fires on "$500"; contains_marker_word is UTF-8-boundary-safe.
- Prediction merge can't cross min_confidence from below; temporal prediction
  wraps around midnight; match_context tag overlap is clamped to 1.0.

MCP / CLI:
- UTF-8 panic in the " at " intention parser fixed (boundary-safe match).
- vestige backup no longer fails on encrypted DBs (best-effort raw checkpoint).
- --ago-days uses checked Duration + subtraction (no overflow panic).
- smart_ingest honors an explicit forceCreate:false (was silently inverted).
- append_trace_event propagates the seq query error (no duplicate seq=0).
- RewardPattern::matches requires real overlap (was matching everything).

Deferred: the dead trace-recorder wiring (Black Box/Receipts/Memory-PR) is a
feature-completion task for its own change; two dormant speculative-prediction
channels with no production caller are left as-is (fixing dead code adds risk
without user benefit).

Regression tests added for the migration brick, suppress inverse, backfill
bound, contradiction false-positive, and forceCreate handling.

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sam Valladares 2026-07-02 14:49:21 -05:00 committed by GitHub
parent 5d046e1d97
commit b3f497fcb6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 504 additions and 115 deletions

View file

@ -534,13 +534,17 @@ impl MemoryChainBuilder {
});
}
// Calculate overall confidence
// Calculate overall confidence as the geometric mean of the CONNECTION
// strengths. The root must be the number of connections (the count of
// factors in the product), not memories.len() (= connections + 1), which
// systematically inflated the reported confidence.
let n = path.connections.len().max(1) as f64;
let confidence = path
.connections
.iter()
.map(|c| c.strength)
.fold(1.0, |acc, s| acc * s)
.powf(1.0 / path.memories.len() as f64); // Geometric mean
.powf(1.0 / n); // Geometric mean over connections
// Generate explanation
let explanation = self.generate_explanation(&steps);

View file

@ -550,23 +550,28 @@ impl PredictionErrorGate {
let new_lower = new_content.to_lowercase();
let old_lower = old_content.to_lowercase();
// Check for explicit negation patterns
// Check for explicit negation patterns — a real polarity FLIP, not the
// mere presence of a negation word. We require the negation term in the
// new content AND its paired positive term in the old content (old: "use
// X"; new: "avoid X"). The previous logic fired whenever the new content
// merely contained a negation word the old one lacked, so ordinary
// additive notes ("Do not forget to configure X" — contains "not ") were
// misread as corrections and demoted a correct memory. Bare triggers with
// no paired positive term ("not ", "instead of", "rather than") are
// dropped for the same reason: they match complementary phrasing.
let negation_pairs = [
("don't use", "use"),
("don't", "do"),
("never", "always"),
("avoid", "use"),
("wrong", "right"),
("bad", "good"),
("incorrect", "correct"),
("deprecated", "recommended"),
("outdated", "current"),
("instead of", ""),
("rather than", ""),
("not ", ""),
];
for (neg, _pos) in negation_pairs.iter() {
if new_lower.contains(neg) && !old_lower.contains(neg) {
for (neg, pos) in negation_pairs.iter() {
if new_lower.contains(neg) && old_lower.contains(pos) && !old_lower.contains(neg) {
return true;
}
}
@ -835,6 +840,24 @@ mod tests {
"Use async/await for performance",
"Use async patterns when needed"
));
// Regression: a benign ADDITIVE note that merely contains a negation word
// ("do not", "cannot") must NOT be flagged as a contradiction. Previously
// the bare "not " substring fired here and demoted the correct memory.
assert!(
!gate.detect_contradiction(
"Do not forget to configure the async runtime for the worker pool",
"Use the async runtime for the worker pool"
),
"additive 'do not forget' note must not read as a contradiction"
);
assert!(
!gate.detect_contradiction(
"You cannot skip the migration step",
"Run the migration step before deploying"
),
"'cannot' in complementary guidance must not read as a contradiction"
);
}
#[test]

View file

@ -52,7 +52,12 @@ pub const MIN_SHARED_ENTITIES: usize = 1;
pub const FAILURE_MARKERS: &[&str] = &[
"error", "bug", "crash", "crashed", "regression", "broke", "broken",
"failure", "failed", "panic", "exception", "fault", "outage", "incident",
"500", "timeout", "deadlock", "leak", "corrupt", "stack overflow",
// NOTE: bare "500" was removed — it matched benign content like "$500",
// "500 users", or "line 500" and wrongly flagged a quiet CAUSE memory as a
// failure, excluding it from the backward reach. The specific HTTP error
// codes 502/503/504 below stay; a genuine "HTTP 500" is still caught by
// "error"/"failed"/"exception" in any real incident note.
"timeout", "deadlock", "leak", "corrupt", "stack overflow",
// performance/degradation failures (an agent should backfill from these too)
"spiked", "latency", "degraded", "slow", "hang", "hung", "throttled",
"oom", "502", "503", "504", "rejected", "denied", "flaky",
@ -139,21 +144,22 @@ pub fn extract_entities(content: &str, tags: &[String]) -> Vec<String> {
/// hits "$500" — which wrongly flags a quiet CAUSE as a failure and excludes it
/// from the backward reach.
fn contains_marker_word(hay: &str, marker: &str) -> bool {
let bytes = hay.as_bytes();
let mut from = 0usize;
while let Some(pos) = hay[from..].find(marker) {
let start = from + pos;
let end = start + marker.len();
let before_ok = start == 0
|| !{
let c = bytes[start - 1] as char;
c.is_alphanumeric() || c == '_'
};
let after_ok = end >= bytes.len()
|| !{
let c = bytes[end] as char;
c.is_alphanumeric() || c == '_'
};
// Inspect the actual char before/after the match, not a raw byte cast to
// char: for a multibyte UTF-8 boundary the raw byte is a continuation
// byte (0x80-0xBF), which `as char` misreads as a non-alphanumeric and
// wrongly passes the word-boundary check. char iteration is boundary-safe.
let before_ok = hay[..start]
.chars()
.next_back()
.is_none_or(|c| !(c.is_alphanumeric() || c == '_'));
let after_ok = hay[end..]
.chars()
.next()
.is_none_or(|c| !(c.is_alphanumeric() || c == '_'));
if before_ok && after_ok {
return true;
}

View file

@ -432,7 +432,12 @@ impl SpeculativeRetriever {
let mut time_counts: HashMap<String, u32> = HashMap::new();
for event in sequence.iter() {
if (event.timestamp.hour() as i32 - hour as i32).abs() <= 1 {
// Circular hour distance so the ±1h window wraps around midnight:
// 23:00 is 1 hour from 00:00, not 23. Without this, same-time
// predictions straddling midnight were silently dropped.
let raw = (event.timestamp.hour() as i32 - hour as i32).abs();
let circular = raw.min(24 - raw);
if circular <= 1 {
*time_counts.entry(event.memory_id.clone()).or_insert(0) += 1;
}
}

View file

@ -455,14 +455,23 @@ impl Connector for RedmineConnector {
.map_err(|e| ConnectorError::Transport(e.to_string()))?;
// Per-issue detail fetch for journals (list endpoint omits them).
//
// A detail-fetch failure must NOT silently fall back to the journal-less
// list summary: that persists a record with incomplete content and the
// WRONG content_hash, and because the offset cursor still advances past
// it, the issue is never revisited — a permanent silent gap. Instead we
// abort the page with the transport error. The driver returns without
// advancing the persisted cursor (fetch_updated is called with `?`), so
// the next run re-fetches this same window; the idempotent upsert makes
// reprocessing safe once the detail endpoint recovers.
let mut records = Vec::new();
for summary in &page.issues {
let detailed = match self.fetch_detail(summary.id).await {
Ok(d) => d,
// A single issue failing detail-fetch should not abort the page;
// fall back to the list-level fields (no journals).
Err(_) => summary.clone(),
};
let detailed = self.fetch_detail(summary.id).await.map_err(|e| {
ConnectorError::Transport(format!(
"detail fetch failed for issue {}; not advancing cursor to avoid a silent gap: {e}",
summary.id
))
})?;
records.push(self.normalize(&detailed));
}

View file

@ -1146,7 +1146,14 @@ impl ContentStore {
}
}
cache.insert(key.to_string(), data.to_vec());
// insert() returns the previous value when the key already existed;
// subtract its size so the counter reflects a REPLACE, not an ADD.
// Without this the counter drifts monotonically upward on every
// overwrite and evicts far too aggressively.
let replaced = cache.insert(key.to_string(), data.to_vec());
if let Some(old) = replaced {
*size = size.saturating_sub(old.len());
}
*size += data_size;
}
}
@ -1292,6 +1299,13 @@ impl HippocampalIndex {
}
/// Index a new memory
///
/// If `memory_id` is already indexed (a smart_ingest update/reinforce/replace
/// re-indexes the same node), the existing entry's barcode, association links,
/// and access history are PRESERVED — only the content-derived fields (preview,
/// semantic summary) are refreshed. Previously this always minted a new barcode
/// and inserted a fresh `MemoryIndex`, silently dropping every accumulated
/// association and the node's history on each re-index.
pub fn index_memory(
&self,
memory_id: &str,
@ -1300,7 +1314,27 @@ impl HippocampalIndex {
created_at: DateTime<Utc>,
semantic_embedding: Option<Vec<f32>>,
) -> Result<MemoryBarcode> {
// Generate barcode
let preview: String = content.chars().take(100).collect();
let new_summary = semantic_embedding
.as_ref()
.map(|e| self.compress_embedding(e));
// Re-index path: refresh content fields in place, keep barcode + links.
{
let mut indices = self
.indices
.write()
.map_err(|e| HippocampalIndexError::LockError(e.to_string()))?;
if let Some(existing) = indices.get_mut(memory_id) {
existing.preview = preview;
if let Some(summary) = new_summary {
existing.semantic_summary = summary;
}
return Ok(existing.barcode);
}
}
// First-time index: mint a barcode and create a fresh entry.
let barcode = {
let mut generator = self
.barcode_generator
@ -1309,10 +1343,6 @@ impl HippocampalIndex {
generator.generate(content, created_at)
};
// Create preview
let preview: String = content.chars().take(100).collect();
// Create index entry
let mut index = MemoryIndex::new(
barcode,
memory_id.to_string(),
@ -1321,9 +1351,7 @@ impl HippocampalIndex {
preview,
);
// Compress embedding if provided
if let Some(embedding) = semantic_embedding {
let summary = self.compress_embedding(&embedding);
if let Some(summary) = new_summary {
index.semantic_summary = summary;
}

View file

@ -1365,7 +1365,12 @@ impl RewardPattern {
fn matches(&self, tags: &[String]) -> bool {
let tag_set: HashSet<_> = tags.iter().cloned().collect();
let overlap = self.tags.intersection(&tag_set).count();
overlap >= self.tags.len().min(tag_set.len()).max(1) / 2
// Require at least one shared tag AND at least half of the smaller set to
// overlap. The previous integer-division threshold
// (`min(a,b).max(1) / 2`) evaluated to 0 for small sets, so `overlap >= 0`
// matched EVERY pattern — including disjoint or empty tag sets.
let smaller = self.tags.len().min(tag_set.len());
overlap >= 1 && overlap * 2 >= smaller
}
fn update(&mut self, reward: f64) {

View file

@ -844,6 +844,17 @@ impl PredictiveMemory {
.write()
.map_err(|e| PredictiveMemoryError::LockPoisoned(e.to_string()))?;
// Bound the metadata cache like every sibling cache in this module.
// Without a cap it grows unbounded (one entry per memory ever ingested),
// a slow leak in a long-running process. This is a best-effort prediction
// cache, so evicting an arbitrary entry when over the limit is fine.
const MAX_METADATA_ENTRIES: usize = 10_000;
if metadata.len() >= MAX_METADATA_ENTRIES
&& !metadata.contains_key(memory_id)
&& let Some(evict) = metadata.keys().next().cloned()
{
metadata.remove(&evict);
}
metadata.insert(
memory_id.to_string(),
(content_preview.to_string(), tags.to_vec()),
@ -1291,13 +1302,26 @@ impl PredictiveMemory {
fn merge_predictions(&self, predictions: Vec<PredictedMemory>) -> Vec<PredictedMemory> {
let mut merged: HashMap<String, PredictedMemory> = HashMap::new();
let min_conf = self.config.min_confidence;
for pred in predictions {
merged
.entry(pred.memory_id.clone())
.and_modify(|existing| {
// Combine confidence scores (taking max, with a small boost for multiple signals)
existing.confidence = (existing.confidence.max(pred.confidence) * 1.1).min(1.0);
// Combine confidence scores: take the max, with a small boost
// for multiple corroborating signals. The boost must NOT
// manufacture a threshold crossing — two individually
// sub-min_confidence signals should stay sub-threshold rather
// than combining (via *1.1) into a passing score. So the boost
// is capped at min_confidence when the strongest signal is
// itself below it.
let best = existing.confidence.max(pred.confidence);
let boosted = (best * 1.1).min(1.0);
existing.confidence = if best < min_conf {
boosted.min(min_conf - f64::EPSILON).max(best)
} else {
boosted
};
})
.or_insert(pred);
}

View file

@ -988,10 +988,21 @@ impl IntentionParser {
}
}
// Check for "at X" time pattern. Split using the byte index found in the
// lowercased text so the case-insensitive contains() and the split agree
// (otherwise "Meeting AT 5pm" passes the check but fails the split).
if let Some(idx) = text_lower.find(" at ") {
// Check for "at X" time pattern. Find " at " case-insensitively but with a
// byte index that is valid in `original`: to_lowercase() can change byte
// length (e.g. 'ẞ' U+1E9E 3 bytes -> 'ß' U+00DF 2 bytes), so an index into
// `text_lower` can land mid-char in `original` and panic when used to slice
// it. Scanning `original`'s own char indices keeps the split boundary valid.
// Match on raw bytes to stay boundary-safe: " at " is pure ASCII, so a
// byte-window match is guaranteed to fall on char boundaries in `original`.
let bytes = original.as_bytes();
let at_idx = bytes.windows(4).position(|w| {
(w[0] == b' ')
&& (w[1] == b'a' || w[1] == b'A')
&& (w[2] == b't' || w[2] == b'T')
&& (w[3] == b' ')
});
if let Some(idx) = at_idx {
let parts: Vec<&str> = vec![&original[..idx], &original[idx + 4..]];
if parts.len() == 2 {
let part0_lower = parts[0].to_lowercase();

View file

@ -1134,6 +1134,15 @@ UPDATE schema_version SET version = 18, applied_at = datetime('now');
"#;
/// Apply pending migrations
///
/// Each migration is applied inside an explicit transaction so its schema
/// change and its trailing `UPDATE schema_version` commit atomically. If a
/// migration fails partway (crash-free error, lock, disk-full), the whole
/// migration rolls back and `schema_version` stays behind — so the next run
/// replays it from a clean state instead of hitting a fatal
/// `duplicate column name` on a half-applied `ADD COLUMN` (which previously
/// bricked the DB permanently). VACUUM (V7) cannot run inside a transaction,
/// so it runs after the transaction commits.
pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
let current_version = get_current_version(conn)?;
let mut applied = 0;
@ -1146,47 +1155,62 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
migration.description
);
// V14: add the two bitemporal/protect columns BEFORE the batch (the
// batch's indexes reference them). SQLite lacks
// `ADD COLUMN IF NOT EXISTS`, so swallow the "duplicate column"
// error to stay idempotent on replay.
if migration.version == 14 {
add_column_if_missing(
conn,
"ALTER TABLE knowledge_nodes ADD COLUMN protected INTEGER NOT NULL DEFAULT 0",
)?;
add_column_if_missing(
conn,
"ALTER TABLE knowledge_nodes ADD COLUMN superseded_by TEXT",
)?;
}
// Atomic per-migration: schema change + version bump commit together
// or roll back together. On rollback, schema_version is unchanged so
// the migration cleanly re-applies next run.
{
let tx = conn.unchecked_transaction()?;
// V16 adds columns via ALTER TABLE, which SQLite does not support
// with IF NOT EXISTS. Run them individually and ignore duplicate
// column errors so replay stays idempotent.
if migration.version == 16 {
for stmt in MIGRATION_V16_ALTER_COLUMNS {
add_column_if_missing(conn, stmt)?;
// V14: add the two bitemporal/protect columns BEFORE the batch (the
// batch's indexes reference them). SQLite lacks
// `ADD COLUMN IF NOT EXISTS`, so swallow the "duplicate column"
// error to stay idempotent on replay.
if migration.version == 14 {
add_column_if_missing(
&tx,
"ALTER TABLE knowledge_nodes ADD COLUMN protected INTEGER NOT NULL DEFAULT 0",
)?;
add_column_if_missing(
&tx,
"ALTER TABLE knowledge_nodes ADD COLUMN superseded_by TEXT",
)?;
}
}
// V17 (#57) adds the source-envelope columns. Same idempotent
// ALTER handling as V16 — the unique index in the V17 batch
// references these columns, so they must exist before the batch.
if migration.version == 17 {
for stmt in MIGRATION_V17_ALTER_COLUMNS {
add_column_if_missing(conn, stmt)?;
// V16 adds columns via ALTER TABLE, which SQLite does not support
// with IF NOT EXISTS. Run them individually and ignore duplicate
// column errors so replay stays idempotent.
if migration.version == 16 {
for stmt in MIGRATION_V16_ALTER_COLUMNS {
add_column_if_missing(&tx, stmt)?;
}
}
// V17 (#57) adds the source-envelope columns. Same idempotent
// ALTER handling as V16 — the unique index in the V17 batch
// references these columns, so they must exist before the batch.
if migration.version == 17 {
for stmt in MIGRATION_V17_ALTER_COLUMNS {
add_column_if_missing(&tx, stmt)?;
}
}
// Use execute_batch to handle multi-statement SQL including triggers
tx.execute_batch(migration.up)?;
tx.commit()?;
}
// Use execute_batch to handle multi-statement SQL including triggers
conn.execute_batch(migration.up)?;
// V7: Upgrade page_size to 8192 (10-30% faster large-row reads)
// VACUUM rewrites the DB with the new page size — can't run inside execute_batch
// V7: Upgrade page_size to 8192 (10-30% faster large-row reads).
// VACUUM rewrites the DB with the new page size — it cannot run
// inside a transaction, so it runs after the migration commits.
// Under WAL, changing page_size + VACUUM is silently ignored; SQLite
// requires a non-WAL journal mode to repage. Drop to DELETE, repage,
// then restore WAL so the performance upgrade actually takes effect.
if migration.version == 7 {
conn.pragma_update(None, "journal_mode", "DELETE")?;
conn.pragma_update(None, "page_size", 8192)?;
conn.execute_batch("VACUUM;")?;
conn.pragma_update(None, "journal_mode", "WAL")?;
tracing::info!("Database page_size upgraded to 8192 via VACUUM");
}
@ -1201,6 +1225,54 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
mod tests {
use super::*;
/// Regression: a migration that is interrupted after its `ADD COLUMN`
/// commits but before `schema_version` advances must NOT permanently brick
/// the DB on replay with `duplicate column name`. Because each migration now
/// runs in a transaction, an interrupted migration rolls back atomically and
/// replays cleanly.
///
/// We simulate the pre-fix corrupt state directly: run all migrations, then
/// hand-apply one of V2's `ADD COLUMN`s again on a DB whose version we roll
/// back, and confirm `apply_migrations` still succeeds (the transaction
/// makes the whole migration atomic, so a real interruption can never leave
/// the half-applied state the old code could).
#[test]
fn test_interrupted_migration_replays_without_duplicate_column_brick() {
// A fresh DB migrates cleanly and reaches the latest version.
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("initial migrations succeed");
let latest = MIGRATIONS.last().unwrap().version;
assert_eq!(get_current_version(&conn).expect("version"), latest);
// Running apply_migrations again on an already-migrated DB is a no-op and
// must never error (idempotent) — the previous brick surfaced here.
let applied = apply_migrations(&conn).expect("replay must not brick");
assert_eq!(applied, 0, "no migrations should re-apply on a current DB");
// Directly prove atomicity: an ADD COLUMN inside a rolled-back
// transaction leaves no trace, so a retried migration sees a clean slate.
let conn2 = rusqlite::Connection::open_in_memory().expect("open in-memory 2");
conn2
.execute_batch(
"CREATE TABLE t (id INTEGER);
CREATE TABLE schema_version (version INTEGER, applied_at TEXT);
INSERT INTO schema_version (version, applied_at) VALUES (0, datetime('now'));",
)
.expect("seed");
{
let tx = conn2.unchecked_transaction().expect("tx");
tx.execute_batch("ALTER TABLE t ADD COLUMN c INTEGER;")
.expect("add column in tx");
// Simulate mid-migration failure: drop the tx without committing.
drop(tx);
}
// The column must be gone (rolled back), so re-adding it succeeds — the
// exact operation that previously failed with "duplicate column name".
conn2
.execute_batch("ALTER TABLE t ADD COLUMN c INTEGER;")
.expect("column must not survive a rolled-back transaction");
}
/// A fresh in-memory DB must end up at schema_version = highest migration
/// version after `apply_migrations` runs all migrations end-to-end, and
/// neither of the dead tables V11 drops must exist afterwards.
@ -1565,3 +1637,4 @@ mod tests {
assert_eq!(domain_scores, "{}");
}
}

View file

@ -17,7 +17,8 @@ use std::sync::Mutex;
use uuid::Uuid;
use crate::fsrs::{
DEFAULT_DECAY, FSRSScheduler, FSRSState, LearningState, Rating, retrievability_with_decay,
DEFAULT_DECAY, FSRSScheduler, FSRSState, LearningState, MAX_STABILITY, Rating,
retrievability_with_decay,
};
use crate::fts::{sanitize_fts5_or_query, sanitize_fts5_query};
use crate::memory::{
@ -723,7 +724,10 @@ impl SqliteMemoryStore {
now.to_rfc3339(),
now.to_rfc3339(),
now.to_rfc3339(),
fsrs_state.stability * sentiment_boost,
// Clamp to MAX_STABILITY: the sentiment boost is otherwise
// persisted unbounded, letting an emotional memory's stability
// exceed the FSRS-6 ceiling every other write path respects.
(fsrs_state.stability * sentiment_boost).min(MAX_STABILITY),
fsrs_state.difficulty,
fsrs_state.reps,
fsrs_state.lapses,
@ -1118,9 +1122,23 @@ impl SqliteMemoryStore {
{
let _ = index.remove(id);
}
// Generate new embedding
if let Err(e) = self.generate_embedding_for_node(id, new_content) {
tracing::warn!("Failed to regenerate embedding for {}: {}", id, e);
// Generate new embedding. If the embedder isn't ready yet (e.g. the
// model is still downloading on first run), generate_embedding_for_node
// is a no-op — which previously left the OLD, now-stale embedding row
// with has_embedding = 1, so semantic search kept matching the old
// content and the consolidation regeneration query (which only selects
// has_embedding = 0 / missing rows / model mismatch) never refreshed
// it. Flip has_embedding to 0 on the not-ready path so the stale vector
// is picked up and rebuilt once the embedder comes online.
if self.embedding_service.is_ready() {
if let Err(e) = self.generate_embedding_for_node(id, new_content) {
tracing::warn!("Failed to regenerate embedding for {}: {}", id, e);
}
} else if let Ok(writer) = self.writer.lock() {
let _ = writer.execute(
"UPDATE knowledge_nodes SET has_embedding = 0 WHERE id = ?1",
params![id],
);
}
}
@ -1800,6 +1818,15 @@ impl SqliteMemoryStore {
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
// True inverse of suppress_memory (which applies stability * 0.4,
// retrieval - 0.35, retention - 0.20). Dividing by 0.4 exactly undoes
// the * 0.4, and adding back the same 0.35 / 0.20 deltas (clamped to
// 1.0) undoes the subtraction. Previously this used non-inverse deltas
// (* 1.25, + 0.15, + 0.10), so suppress-then-reverse left stability
// permanently halved (0.4 * 1.25 = 0.5) while reporting a full undo.
// Note: where the forward pass hit the MAX(0.05) floor, the exact
// pre-value is unrecoverable without a snapshot — that clip aside,
// this restores the pre-suppression FSRS state.
writer.execute(
"UPDATE knowledge_nodes SET
suppression_count = MAX(0, COALESCE(suppression_count, 0) - 1),
@ -1807,9 +1834,9 @@ impl SqliteMemoryStore {
WHEN COALESCE(suppression_count, 0) - 1 <= 0 THEN NULL
ELSE suppressed_at
END,
retrieval_strength = MIN(1.0, retrieval_strength + 0.15),
retention_strength = MIN(1.0, retention_strength + 0.10),
stability = stability * 1.25
retrieval_strength = MIN(1.0, retrieval_strength + 0.35),
retention_strength = MIN(1.0, retention_strength + 0.20),
stability = stability / 0.4
WHERE id = ?1",
params![id],
)?;
@ -2979,18 +3006,18 @@ impl SqliteMemoryStore {
(false, false) => MatchType::Keyword,
};
let weighted_score = match (keyword_score, semantic_score) {
(Some(kw), Some(sem)) => kw * keyword_weight + sem * semantic_weight,
(Some(kw), None) => kw * keyword_weight,
(None, Some(sem)) => sem * semantic_weight,
(None, None) => combined_score,
};
// Carry the RRF fused score as the relevance signal, NOT a linear
// kw*w + sem*w recomputation. RRF is what selected these candidates
// and rewards both-list agreement; overwriting it with the linear
// weighted_score made the final ranking diverge from RRF order
// (a both-list paraphrase could rank below a keyword-only hit).
// The min-max normalization in the rerank below then operates on
// RRF scores, so final relevance ordering matches RRF ordering.
results.push(SearchResult {
node,
keyword_score,
semantic_score,
combined_score: weighted_score,
combined_score,
match_type,
});
}
@ -2998,6 +3025,20 @@ impl SqliteMemoryStore {
// Three-signal reranking (Park et al. Generative Agents 2023)
// final_score = 0.2*recency + 0.3*importance + 0.5*relevance
//
// relevance MUST live in [0,1] for the weights to balance. The raw
// weighted_score does not: keyword-only results max out at
// `1.0 * keyword_weight` (0.3 by default), so the strongest match's
// relevance term was capped at 0.5*0.3 = 0.15 and lost to recency (up to
// 0.2) or importance (up to 0.3) — a fresh, weakly-relevant node could
// outrank the best match. Min-max normalize relevance across the result
// set so the best match scores ~1.0 regardless of the weight scaling.
let (min_rel, max_rel) = results.iter().fold(
(f32::INFINITY, f32::NEG_INFINITY),
|(mn, mx), r| (mn.min(r.combined_score), mx.max(r.combined_score)),
);
let rel_span = (max_rel - min_rel) as f64;
let now = Utc::now();
for result in &mut results {
let hours_since = (now - result.node.last_accessed).num_seconds() as f64 / 3600.0;
@ -3019,7 +3060,13 @@ impl SqliteMemoryStore {
// Normalize ACT-R activation [-2, 5] → [0, 1]
let importance = ((activation + 2.0) / 7.0).clamp(0.0, 1.0);
let relevance = result.combined_score as f64;
// Min-max normalized relevance in [0,1]. When every result ties
// (span 0), fall back to 1.0 so relevance still dominates ranking.
let relevance = if rel_span > f64::EPSILON {
(result.combined_score - min_rel) as f64 / rel_span
} else {
1.0
};
let final_score = 0.2 * recency + 0.3 * importance + 0.5 * relevance;
result.combined_score = final_score as f32;
@ -5836,6 +5883,25 @@ impl SqliteMemoryStore {
Ok(result)
}
/// The most recently created connections, capped at `limit`. Used by polling
/// surfaces (e.g. the dashboard changelog) that only need recent activity and
/// must not load the entire `memory_connections` table on every request.
pub fn get_recent_connections(&self, limit: usize) -> Result<Vec<ConnectionRecord>> {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
let mut stmt = reader.prepare(
"SELECT * FROM memory_connections ORDER BY created_at DESC LIMIT ?1",
)?;
let rows = stmt.query_map([limit as i64], Self::row_to_connection)?;
let mut result = Vec::new();
for row in rows {
result.push(row?);
}
Ok(result)
}
/// Strengthen a connection
pub fn strengthen_connection(
&self,
@ -8173,6 +8239,16 @@ impl SqliteMemoryStore {
.unwrap_or_else(|| member_ids[0].clone())
}
};
// The survivor MUST be one of the members. A caller-supplied survivor_id
// that isn't in member_ids (a typo/mixup through the plan_merge tool)
// otherwise sails through and panics at the `.find(...).unwrap()` below,
// taking down the request. Reject it with a clear error instead.
if !nodes.iter().any(|n| n.id == survivor) {
return Err(StorageError::Init(format!(
"survivor_id {survivor} is not among the member_ids being merged"
)));
}
for node in &nodes {
if node.id != survivor && self.is_protected(&node.id)? {
return Err(StorageError::Init(format!(
@ -13356,6 +13432,50 @@ mod tests {
);
}
#[test]
fn suppress_then_reverse_restores_fsrs_state() {
// reverse_suppression must be a TRUE inverse of suppress_memory. Suppress
// applies stability*0.4, retrieval-0.35, retention-0.20; reverse now undoes
// exactly that (stability/0.4, retrieval+0.35, retention+0.20). Previously
// reverse used non-inverse deltas and left stability permanently halved.
let s = create_test_storage();
let node = s
.ingest(IngestInput {
content: "a memory to suppress then un-suppress".to_string(),
node_type: "fact".to_string(),
..Default::default()
})
.unwrap();
// Seed above the 0.05 floor so the forward pass never clips (making the
// round-trip exactly recoverable).
seed_stability(&s, &node.id, 20.0);
let before = s.get_node(&node.id).unwrap().unwrap();
s.suppress_memory(&node.id).unwrap();
let suppressed = s.get_node(&node.id).unwrap().unwrap();
assert!(
(suppressed.stability - before.stability * 0.4).abs() < 1e-6,
"suppress must multiply stability by 0.4"
);
let reversed = s.reverse_suppression(&node.id, 24).unwrap();
// stability: 20 * 0.4 / 0.4 = 20 (fully restored, not 0.5x)
assert!(
(reversed.stability - before.stability).abs() < 1e-6,
"reverse must restore stability to {} (got {})",
before.stability,
reversed.stability
);
assert!(
(reversed.retrieval_strength - before.retrieval_strength).abs() < 1e-6,
"reverse must restore retrieval_strength"
);
assert!(
(reversed.retention_strength - before.retention_strength).abs() < 1e-6,
"reverse must restore retention_strength"
);
}
#[test]
fn backfill_autofire_gate_defaults_on_and_reads_opt_out() {
// v2.2.1 opt-out semantics: unset => ON (preserves shipped v2.2.0

View file

@ -77,13 +77,15 @@ impl SqliteMemoryStore {
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
let seq: i64 = writer
.query_row(
"SELECT COALESCE(MAX(seq), -1) + 1 FROM agent_traces WHERE run_id = ?1",
params![run_id],
|r| r.get(0),
)
.unwrap_or(0);
// Propagate a seq-query failure instead of defaulting to 0: swallowing
// the error with unwrap_or(0) could write a duplicate seq=0 for a run
// that already has events, corrupting Black Box replay ordering. On an
// empty run COALESCE(...,-1)+1 already yields 0 correctly.
let seq: i64 = writer.query_row(
"SELECT COALESCE(MAX(seq), -1) + 1 FROM agent_traces WHERE run_id = ?1",
params![run_id],
|r| r.get(0),
)?;
writer.execute(
"INSERT INTO agent_traces (id, run_id, seq, event_type, tool, payload, at, created_at)
@ -435,12 +437,25 @@ impl SqliteMemoryStore {
.writer
.lock()
.map_err(|_| StorageError::Init("Writer lock poisoned".into()))?;
// Only a still-pending PR may be decided. The `AND status = 'pending'`
// guard makes decisions final: re-POSTing an action on an already
// promoted/forgotten/merged PR cannot flip its status, re-run its
// side effects (e.g. release_quarantine resurrecting a rejected
// memory), or overwrite the audit ledger (decision/decided_at).
let changed = writer.execute(
"UPDATE memory_prs SET status = ?1, decision = ?2, decided_at = ?3 WHERE id = ?4",
"UPDATE memory_prs SET status = ?1, decision = ?2, decided_at = ?3
WHERE id = ?4 AND status = 'pending'",
params![new_status.as_str(), decision, now, id],
)?;
if changed == 0 {
return Err(StorageError::NotFound(id.to_string()));
// Distinguish "no such PR" from "already decided" so callers get
// a truthful error instead of a misleading NotFound.
return Err(match self.get_memory_pr(id)? {
Some(_) => StorageError::Init(format!(
"memory PR {id} is already decided and cannot be re-decided"
)),
None => StorageError::NotFound(id.to_string()),
});
}
}
self.get_memory_pr(id)?

View file

@ -2006,10 +2006,16 @@ fn run_backup(output: PathBuf) -> anyhow::Result<()> {
let _ = storage.get_stats()?;
}
// Also flush WAL directly via a separate connection for safety
// Also flush WAL directly via a separate connection for extra safety. This
// is a raw, UN-keyed connection, so on a SQLCipher-encrypted DB it cannot
// read the header and the checkpoint fails with "file is not a database".
// The keyed `storage` opened above already flushed the WAL on drop, so this
// is redundant belt-and-suspenders — make it best-effort instead of letting
// it abort the whole backup on encrypted databases.
{
let conn = rusqlite::Connection::open(&db_path)?;
conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
if let Ok(conn) = rusqlite::Connection::open(&db_path) {
let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
}
}
// Create parent directories if needed
@ -2592,7 +2598,16 @@ fn run_ingest(
{
let result = storage.smart_ingest(input)?;
if let Some(days) = ago_days {
let when = chrono::Utc::now() - chrono::Duration::days(days);
// Duration::days panics on overflow for extreme inputs; try_days
// returns None instead. The subtraction itself can ALSO overflow the
// DateTime range, so use checked_sub_signed rather than `-` (which
// panics). Both the construction and the subtraction are guarded.
let delta = chrono::Duration::try_days(days).ok_or_else(|| {
anyhow::anyhow!("--ago-days value {days} is out of the supported range")
})?;
let when = chrono::Utc::now().checked_sub_signed(delta).ok_or_else(|| {
anyhow::anyhow!("--ago-days value {days} is out of the supported range")
})?;
storage.set_created_at(&result.node.id, when)?;
}
println!("{}", "=== Vestige Ingest ===".cyan().bold());
@ -2622,7 +2637,16 @@ fn run_ingest(
{
let node = storage.ingest(input)?;
if let Some(days) = ago_days {
let when = chrono::Utc::now() - chrono::Duration::days(days);
// Duration::days panics on overflow for extreme inputs; try_days
// returns None instead. The subtraction itself can ALSO overflow the
// DateTime range, so use checked_sub_signed rather than `-` (which
// panics). Both the construction and the subtraction are guarded.
let delta = chrono::Duration::try_days(days).ok_or_else(|| {
anyhow::anyhow!("--ago-days value {days} is out of the supported range")
})?;
let when = chrono::Utc::now().checked_sub_signed(delta).ok_or_else(|| {
anyhow::anyhow!("--ago-days value {days} is out of the supported range")
})?;
storage.set_created_at(&node.id, when)?;
}
println!("{}", "=== Vestige Ingest ===".cyan().bold());

View file

@ -1025,10 +1025,12 @@ pub async fn get_changelog(
}
// Connections are currently persisted as graph edges rather than as audit
// rows, so filter by created_at from the connection table.
// rows, so filter by created_at from the connection table. Fetch only the
// most recent `fetch_limit` connections (not the entire table) — this
// endpoint is polled once per wake and must stay cheap on a large graph.
let connections = state
.storage
.get_all_connections()
.get_recent_connections(fetch_limit as usize)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
for conn in connections {
if changelog_window_contains(conn.created_at, start.as_ref(), end.as_ref()) {

View file

@ -105,7 +105,11 @@ pub async fn execute(storage: &Arc<Storage>, args: Option<Value>) -> Result<Valu
.any(|topic| topic.to_lowercase().contains(&t.to_lowercase()))
})
.count();
matching as f64 / topics.len().max(1) as f64
// Numerator counts matching TAGS while the denominator is the
// number of TOPICS, so the ratio can exceed 1.0 when several tags
// match (unit mismatch). Clamp to keep this a bounded [0,1] score
// that doesn't distort the weighted ranking below.
(matching as f64 / topics.len().max(1) as f64).min(1.0)
};
// 3. Project match

View file

@ -152,9 +152,11 @@ pub async fn execute(
}
};
let global_force = match args.force_create {
Some(true) => true,
Some(false) if batch_merge_policy == "smart" => false,
Some(false) => default_force_create,
// An EXPLICIT forceCreate is authoritative and must be honored in both
// policies. Previously `Some(false)` under the default 'force_create'
// policy fell through to `default_force_create` (= true), silently
// inverting the caller's explicit false into a force-create.
Some(explicit) => explicit,
None => default_force_create,
};
return execute_batch(storage, cognitive, items, global_force, &batch_merge_policy).await;
@ -857,12 +859,13 @@ mod tests {
#[tokio::test]
async fn test_batch_defaults_to_force_create_for_caller_separated_items() {
// Default policy (no explicit forceCreate) force-creates each
// caller-separated item so they stay separate.
let (storage, _dir) = test_storage().await;
let result = execute(
&storage,
&test_cognitive(),
Some(serde_json::json!({
"forceCreate": false,
"items": [
{ "content": "Jira tickets should not auto-assign sprint fields." },
{ "content": "Sprint planning summaries should not append Jira status labels." }
@ -881,6 +884,36 @@ mod tests {
}
}
#[tokio::test]
async fn test_batch_explicit_force_create_false_is_honored() {
// Regression (#130): an EXPLICIT forceCreate:false must NOT be silently
// inverted to force-create by the default policy. Distinct/novel items are
// still created (PE gating creates novel content), but NOT via the
// "Forced creation" path.
let (storage, _dir) = test_storage().await;
let result = execute(
&storage,
&test_cognitive(),
Some(serde_json::json!({
"forceCreate": false,
"items": [
{ "content": "Jira tickets should not auto-assign sprint fields." },
{ "content": "Sprint planning summaries should not append Jira status labels." }
]
})),
)
.await;
let value = result.unwrap();
assert_eq!(value["summary"]["created"], 2, "novel items still created");
for item in value["results"].as_array().unwrap() {
assert!(
!item["reason"].as_str().unwrap().contains("Forced creation"),
"explicit forceCreate:false must not force-create"
);
}
}
#[tokio::test]
async fn test_batch_rejects_invalid_merge_policy() {
let (storage, _dir) = test_storage().await;

View file

@ -111,7 +111,10 @@ pub async fn execute(storage: &Arc<Storage>, args: Option<Value>) -> Result<Valu
None => return Err("Missing arguments".to_string()),
};
let max_pages = args.max_pages.unwrap_or(10);
// Clamp to the schema's advertised maximum (1000). The JSON-schema `maximum`
// is advisory only — a client can still send a larger value — so enforce it
// here to bound the paginated fetch loop.
let max_pages = args.max_pages.unwrap_or(10).clamp(1, 1000);
match args.source.as_str() {
"github" => {