Merge remote-tracking branch 'origin/main' into codex/opencode-sigill-salvage

This commit is contained in:
Sam Valladares 2026-06-18 19:59:25 -05:00
commit ea5ed28081
26 changed files with 6997 additions and 91 deletions

View file

@ -129,6 +129,8 @@ usearch = { version = "=2.23.0", default-features = false, optional = true }
# LRU cache for query embeddings
lru = "0.16"
trait-variant = "0.1"
blake3 = "1"
[dev-dependencies]
tempfile = "3"

View file

@ -0,0 +1,181 @@
//! `FastembedEmbedder` -- adapts the existing `EmbeddingService` to the
//! `LocalEmbedder` trait.
#[cfg(feature = "embeddings")]
use crate::embeddings::{EMBEDDING_DIMENSIONS, EmbeddingService};
use super::{EmbedderError, EmbedderResult, EmbedderSend};
pub struct FastembedEmbedder {
#[cfg(feature = "embeddings")]
inner: EmbeddingService,
cached_hash: std::sync::OnceLock<String>,
}
impl FastembedEmbedder {
pub fn new() -> Self {
Self {
#[cfg(feature = "embeddings")]
inner: EmbeddingService::new(),
cached_hash: std::sync::OnceLock::new(),
}
}
fn compute_hash(name: &str, dim: usize) -> String {
let mut hasher = blake3::Hasher::new();
hasher.update(name.as_bytes());
hasher.update(&(dim as u64).to_le_bytes());
// fastembed's ONNX bytes are not directly accessible at runtime; we
// use `(name, dim, vestige-core CARGO_PKG_VERSION)` as the
// signature. If fastembed ever changes its output deterministically
// between minor versions, bumping the crate version triggers a
// mismatch -- which is exactly the drift we want to detect.
hasher.update(env!("CARGO_PKG_VERSION").as_bytes());
hasher.finalize().to_hex().to_string()
}
}
impl Default for FastembedEmbedder {
fn default() -> Self {
Self::new()
}
}
impl EmbedderSend for FastembedEmbedder {
async fn embed(&self, text: &str) -> EmbedderResult<Vec<f32>> {
#[cfg(feature = "embeddings")]
{
let emb = self
.inner
.embed(text)
.map_err(|e| EmbedderError::EmbedFailed(e.to_string()))?;
Ok(emb.vector)
}
#[cfg(not(feature = "embeddings"))]
{
let _ = text;
Err(EmbedderError::Init(
"embeddings feature not enabled".to_string(),
))
}
}
fn model_name(&self) -> &str {
#[cfg(feature = "embeddings")]
{
self.inner.model_name()
}
#[cfg(not(feature = "embeddings"))]
{
"nomic-ai/nomic-embed-text-v1.5"
}
}
fn dimension(&self) -> usize {
#[cfg(feature = "embeddings")]
{
EMBEDDING_DIMENSIONS
}
#[cfg(not(feature = "embeddings"))]
{
256
}
}
fn model_hash(&self) -> String {
self.cached_hash
.get_or_init(|| Self::compute_hash(self.model_name(), self.dimension()))
.clone()
}
async fn embed_batch(&self, texts: &[&str]) -> EmbedderResult<Vec<Vec<f32>>> {
#[cfg(feature = "embeddings")]
{
let embs = self
.inner
.embed_batch(texts)
.map_err(|e| EmbedderError::EmbedFailed(e.to_string()))?;
Ok(embs.into_iter().map(|e| e.vector).collect())
}
#[cfg(not(feature = "embeddings"))]
{
let _ = texts;
Err(EmbedderError::Init(
"embeddings feature not enabled".to_string(),
))
}
}
}
// ============================================================================
// UNIT TESTS
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn embedder_reports_correct_name() {
let e = FastembedEmbedder::new();
assert!(
e.model_name().contains("nomic"),
"model name should contain 'nomic'"
);
}
#[test]
fn embedder_reports_256_dimension() {
let e = FastembedEmbedder::new();
assert_eq!(e.dimension(), 256);
}
#[test]
fn embedder_hash_is_stable() {
let e = FastembedEmbedder::new();
let h1 = e.model_hash();
let h2 = e.model_hash();
assert_eq!(h1, h2, "model_hash must be stable across calls");
}
#[test]
fn embedder_hash_includes_crate_version() {
// Compute what the hash should be given the known inputs
let name = FastembedEmbedder::new().model_name().to_string();
let dim = FastembedEmbedder::new().dimension();
let expected = FastembedEmbedder::compute_hash(&name, dim);
let got = FastembedEmbedder::new().model_hash();
assert_eq!(got, expected);
}
#[test]
fn embedder_signature_matches_accessors() {
let e = FastembedEmbedder::new();
let sig = e.signature();
assert_eq!(sig.name, e.model_name());
assert_eq!(sig.dimension, e.dimension());
assert_eq!(sig.hash, e.model_hash());
}
#[cfg(feature = "embeddings")]
#[test]
fn embedder_embed_smoke() {
let e = FastembedEmbedder::new();
let rt = tokio::runtime::Runtime::new().unwrap();
let vec = rt.block_on(e.embed("hello world")).expect("embed");
assert_eq!(vec.len(), 256);
}
#[cfg(feature = "embeddings")]
#[test]
fn embedder_embed_batch_matches_sequential() {
let e = FastembedEmbedder::new();
let rt = tokio::runtime::Runtime::new().unwrap();
let texts = ["alpha beta", "gamma delta"];
let batch = rt.block_on(e.embed_batch(texts.as_ref())).expect("batch");
let seq_a = rt.block_on(e.embed(texts[0])).expect("seq a");
let seq_b = rt.block_on(e.embed(texts[1])).expect("seq b");
assert_eq!(batch[0], seq_a);
assert_eq!(batch[1], seq_b);
}
}

View file

@ -0,0 +1,109 @@
//! Text-to-vector encoding trait. Pluggable per-install.
use std::future::Future;
use std::pin::Pin;
mod fastembed;
pub use fastembed::FastembedEmbedder;
/// Error returned by every `Embedder` method.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum EmbedderError {
#[error("embedder initialization failed: {0}")]
Init(String),
#[error("embedding generation failed: {0}")]
EmbedFailed(String),
#[error("invalid input: {0}")]
InvalidInput(String),
}
pub type EmbedderResult<T> = std::result::Result<T, EmbedderError>;
/// Boxed Send future returning an `EmbedderResult<T>`, bound to the lifetime
/// of the borrows captured by the call. Used as the return type of every
/// async method on the dyn-compatible `Embedder` trait below.
pub type BoxedEmbedderFuture<'a, T> = Pin<Box<dyn Future<Output = EmbedderResult<T>> + Send + 'a>>;
/// Pluggable embedder. The storage layer NEVER calls fastembed directly;
/// callers compute vectors via this trait and pass them into `MemoryStore`.
///
/// `LocalEmbedder` is the source-of-truth trait declared with native
/// async-fn-in-trait. `#[trait_variant::make(EmbedderSend: Send)]` derives
/// a Send-bounded variant that backends actually implement (the
/// trait_variant 0.1.x blanket goes variant -> source). The dyn-compatible
/// public surface is the `Embedder` trait declared below, which wraps every
/// async method in `Pin<Box<dyn Future + Send + '_>>`.
#[trait_variant::make(EmbedderSend: Send)]
pub trait LocalEmbedder: Sync + 'static {
async fn embed(&self, text: &str) -> EmbedderResult<Vec<f32>>;
fn model_name(&self) -> &str;
fn dimension(&self) -> usize;
/// Stable blake3 hash of (model_name || dimension || vestige-core crate version).
/// Lowercase hex, 64 chars.
///
/// Used by `MemoryStore::register_model` to detect silent model drift
/// (e.g. a fastembed minor upgrade that changes vector output).
fn model_hash(&self) -> String;
async fn embed_batch(&self, texts: &[&str]) -> EmbedderResult<Vec<Vec<f32>>>;
/// Returns the `ModelSignature` describing this embedder. Convenience
/// wrapper over the three accessors above.
fn signature(&self) -> crate::storage::ModelSignature {
crate::storage::ModelSignature {
name: self.model_name().to_string(),
dimension: self.dimension(),
hash: self.model_hash(),
}
}
}
/// Dyn-compatible embedder trait.
///
/// `EmbedderSend` above is the trait users implement; it uses native
/// async-fn-in-trait return types (RPITIT), which gives zero-allocation
/// static dispatch but is not dyn-safe. This trait wraps every async
/// method in `Pin<Box<dyn Future + Send + '_>>` so `Box<dyn Embedder>`
/// and `Arc<dyn Embedder>` work for the cognitive module surface and
/// the Phase 1 integration tests.
///
/// Implementations should not target this trait directly; the blanket
/// `impl<T: EmbedderSend> Embedder for T` adapts every Send-variant
/// implementation automatically.
pub trait Embedder: Send + Sync + 'static {
fn embed<'a>(&'a self, text: &'a str) -> BoxedEmbedderFuture<'a, Vec<f32>>;
fn embed_batch<'a>(&'a self, texts: &'a [&'a str]) -> BoxedEmbedderFuture<'a, Vec<Vec<f32>>>;
fn model_name(&self) -> &str;
fn dimension(&self) -> usize;
fn model_hash(&self) -> String;
fn signature(&self) -> crate::storage::ModelSignature;
}
impl<T> Embedder for T
where
T: EmbedderSend,
{
fn embed<'a>(&'a self, text: &'a str) -> BoxedEmbedderFuture<'a, Vec<f32>> {
Box::pin(<T as EmbedderSend>::embed(self, text))
}
fn embed_batch<'a>(&'a self, texts: &'a [&'a str]) -> BoxedEmbedderFuture<'a, Vec<Vec<f32>>> {
Box::pin(<T as EmbedderSend>::embed_batch(self, texts))
}
fn model_name(&self) -> &str {
<T as EmbedderSend>::model_name(self)
}
fn dimension(&self) -> usize {
<T as EmbedderSend>::dimension(self)
}
fn model_hash(&self) -> String {
<T as EmbedderSend>::model_hash(self)
}
fn signature(&self) -> crate::storage::ModelSignature {
<T as EmbedderSend>::signature(self)
}
}

View file

@ -83,6 +83,7 @@
/// Optional `vestige.toml` configuration (Phase 2: Configurable Output).
pub mod config;
pub mod consolidation;
pub mod embedder;
pub mod fsrs;
pub mod fts;
pub mod memory;
@ -155,13 +156,50 @@ pub use fsrs::{
};
// Configuration (vestige.toml output profiles / defaults)
pub use config::{OutputConfig, OutputDefaults, OutputProfile, VestigeConfig, CONFIG_FILE};
pub use config::{CONFIG_FILE, OutputConfig, OutputDefaults, OutputProfile, VestigeConfig};
// Storage layer
pub use storage::{
ConnectionRecord, ConsolidationHistoryRecord, DreamHistoryRecord, InsightRecord,
IntentionRecord, PORTABLE_ARCHIVE_FORMAT, PortableArchive, PortableImportMode,
PortableImportReport, Result, SmartIngestResult, StateTransitionRecord, Storage, StorageError,
ClassificationResult,
CompositionEventRecord,
CompositionMemberRecord,
CompositionNeighborRecord,
CompositionOutcomeRecord,
ConnectionRecord,
ConsolidationHistoryRecord,
Domain,
DreamHistoryRecord,
HealthStatus,
InsightRecord,
IntentionRecord,
LocalMemoryStore,
MemoryEdge,
MemoryRecord,
MemoryStore,
MemoryStoreError,
MemoryStoreResult,
ModelSignature,
NeverComposedCandidate,
PORTABLE_ARCHIVE_FORMAT,
PortableArchive,
PortableImportMode,
PortableImportReport,
Result,
SchedulingState,
SearchQuery,
SmartIngestResult,
SqliteMemoryStore,
StateTransitionRecord,
Storage,
StorageError,
StoreStats,
// Note: storage::SearchResult is intentionally not re-exported here to avoid
// collision with memory::SearchResult. Use vestige_core::storage::SearchResult directly.
};
// Embedder trait and implementations
pub use embedder::{
Embedder, EmbedderError, EmbedderResult, EmbedderSend, FastembedEmbedder, LocalEmbedder,
};
// Consolidation (sleep-inspired memory processing)
@ -220,6 +258,9 @@ pub use advanced::{
LabileState,
Language,
MaintenanceType,
// Merge / Supersede controls (Phase 3)
MatchClass,
MatchSignals,
// Memory chains
MemoryChainBuilder,
// Memory compression
@ -230,18 +271,15 @@ pub use advanced::{
MemoryPath,
MemoryReplay,
MemorySnapshot,
// Merge / Supersede controls (Phase 3)
MatchClass,
MatchSignals,
MergeCandidate,
MergeOperation,
MergePlan,
MergePolicy,
MergeStrategy,
Modification,
PlanKind,
Pattern,
PatternType,
PlanKind,
PredictedMemory,
PredictionContext,
PredictionErrorConfig,

View file

@ -0,0 +1,516 @@
//! Backend-agnostic memory store trait.
//!
//! This is the single abstraction every cognitive module sits above. It is
//! intentionally flat: one trait, ~25 methods, no sub-traits.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
// ----------------------------------------------------------------------------
// ERROR
// ----------------------------------------------------------------------------
/// Error returned by every `LocalMemoryStore` / `MemoryStore` method.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum MemoryStoreError {
#[error("not found: {0}")]
NotFound(String),
#[error("backend error: {0}")]
Backend(String),
#[error(
"embedding model mismatch: store registered {registered_name} (dim {registered_dim}, \
hash {registered_hash}), embedder is {actual_name} (dim {actual_dim}, hash {actual_hash})"
)]
ModelMismatch {
registered_name: String,
registered_dim: usize,
registered_hash: String,
actual_name: String,
actual_dim: usize,
actual_hash: String,
},
#[error("invalid input: {0}")]
InvalidInput(String),
#[error("initialization error: {0}")]
Init(String),
}
impl From<crate::storage::StorageError> for MemoryStoreError {
fn from(e: crate::storage::StorageError) -> Self {
use crate::storage::StorageError as S;
match e {
S::NotFound(s) => MemoryStoreError::NotFound(s),
S::Database(e) => MemoryStoreError::Backend(e.to_string()),
S::Io(e) => MemoryStoreError::Backend(e.to_string()),
S::InvalidTimestamp(s) => MemoryStoreError::Backend(format!("invalid timestamp: {s}")),
S::Init(s) => MemoryStoreError::Init(s),
}
}
}
pub type MemoryStoreResult<T> = std::result::Result<T, MemoryStoreError>;
// ----------------------------------------------------------------------------
// DATA TYPES
// ----------------------------------------------------------------------------
/// Backend-agnostic memory record.
///
/// Phase 1 intentionally keeps this type independent of `KnowledgeNode` to
/// avoid dragging 30+ legacy fields through the trait surface. The SQLite
/// backend converts between `MemoryRecord` and `KnowledgeNode` at the
/// boundary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryRecord {
pub id: Uuid,
/// Empty = unclassified. Populated in Phase 4.
pub domains: Vec<String>,
/// Raw similarity per domain centroid. Empty until Phase 4 runs clustering.
pub domain_scores: HashMap<String, f64>,
pub content: String,
pub node_type: String,
pub tags: Vec<String>,
pub embedding: Option<Vec<f32>>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub metadata: serde_json::Value,
}
/// FSRS-6 scheduling state, one row per memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulingState {
pub memory_id: Uuid,
pub stability: f64,
pub difficulty: f64,
pub retrievability: f64,
pub last_review: Option<DateTime<Utc>>,
pub next_review: Option<DateTime<Utc>>,
pub reps: u32,
pub lapses: u32,
}
/// Hybrid search request.
#[derive(Debug, Clone, Default)]
pub struct SearchQuery {
pub domains: Option<Vec<String>>,
pub text: Option<String>,
pub embedding: Option<Vec<f32>>,
pub tags: Option<Vec<String>>,
pub node_types: Option<Vec<String>>,
pub limit: usize,
pub min_retrievability: Option<f64>,
}
#[derive(Debug, Clone)]
pub struct SearchResult {
pub record: MemoryRecord,
pub score: f64,
pub fts_score: Option<f64>,
pub vector_score: Option<f64>,
}
/// Edge in the spreading-activation graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryEdge {
pub source_id: Uuid,
pub target_id: Uuid,
pub edge_type: String,
pub weight: f64,
pub created_at: DateTime<Utc>,
}
/// A topical domain (populated in Phase 4). Phase 1 only needs the type to
/// shape the trait surface; discover/classify are Phase 4 work.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Domain {
pub id: String,
pub label: String,
pub centroid: Vec<f32>,
pub top_terms: Vec<String>,
pub memory_count: usize,
pub created_at: DateTime<Utc>,
}
/// Result of classifying one vector against all known domains.
#[derive(Debug, Clone)]
pub struct ClassificationResult {
pub scores: HashMap<String, f64>,
pub domains: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct StoreStats {
pub total_memories: usize,
pub memories_with_embeddings: usize,
pub total_edges: usize,
pub total_domains: usize,
pub registered_model_name: Option<String>,
pub registered_model_dim: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HealthStatus {
Healthy,
Degraded { reason: String },
Unavailable { reason: String },
}
// ----------------------------------------------------------------------------
// EMBEDDING MODEL SIGNATURE
// ----------------------------------------------------------------------------
/// Snapshot of the embedding model that was used to write vectors into the
/// store. Persisted in the `embedding_model` table; compared on every write
/// before the vector is accepted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModelSignature {
pub name: String,
pub dimension: usize,
/// Lowercase hex-encoded blake3 hash, 64 chars.
pub hash: String,
}
// ----------------------------------------------------------------------------
// TRAIT
// ----------------------------------------------------------------------------
/// Internal source trait declared with native async-fn-in-trait.
///
/// `#[trait_variant::make(MemoryStoreSend: Send)]` derives a Send-bounded
/// variant whose returned futures are `Send`. In trait_variant 0.1.x the
/// macro emits the blanket `impl<T: MemoryStoreSend> LocalMemoryStore for T`,
/// so backends implement `MemoryStoreSend` (the Send variant) and get
/// `LocalMemoryStore` (the non-Send variant) for free.
///
/// Most callers should reach for the dyn-compatible `MemoryStore` trait
/// declared below, which adapts `MemoryStoreSend` into a boxed-future surface
/// and is the public storage abstraction for cognitive modules and tests
/// that want `Arc<dyn MemoryStore>`.
#[trait_variant::make(MemoryStoreSend: Send)]
pub trait LocalMemoryStore: Sync + 'static {
// --- Lifecycle ---
async fn init(&self) -> MemoryStoreResult<()>;
async fn health_check(&self) -> MemoryStoreResult<HealthStatus>;
// --- Embedding model registry ---
async fn registered_model(&self) -> MemoryStoreResult<Option<ModelSignature>>;
async fn register_model(&self, sig: &ModelSignature) -> MemoryStoreResult<()>;
// --- CRUD ---
async fn insert(&self, record: &MemoryRecord) -> MemoryStoreResult<Uuid>;
async fn get(&self, id: Uuid) -> MemoryStoreResult<Option<MemoryRecord>>;
async fn update(&self, record: &MemoryRecord) -> MemoryStoreResult<()>;
async fn delete(&self, id: Uuid) -> MemoryStoreResult<()>;
// --- Search ---
async fn search(&self, query: &SearchQuery) -> MemoryStoreResult<Vec<SearchResult>>;
async fn fts_search(&self, text: &str, limit: usize) -> MemoryStoreResult<Vec<SearchResult>>;
async fn vector_search(
&self,
embedding: &[f32],
limit: usize,
) -> MemoryStoreResult<Vec<SearchResult>>;
// --- FSRS Scheduling ---
async fn get_scheduling(&self, memory_id: Uuid) -> MemoryStoreResult<Option<SchedulingState>>;
async fn update_scheduling(&self, state: &SchedulingState) -> MemoryStoreResult<()>;
async fn get_due_memories(
&self,
before: DateTime<Utc>,
limit: usize,
) -> MemoryStoreResult<Vec<(MemoryRecord, SchedulingState)>>;
// --- Graph (spreading activation) ---
async fn add_edge(&self, edge: &MemoryEdge) -> MemoryStoreResult<()>;
async fn get_edges(
&self,
node_id: Uuid,
edge_type: Option<&str>,
) -> MemoryStoreResult<Vec<MemoryEdge>>;
async fn remove_edge(&self, source: Uuid, target: Uuid) -> MemoryStoreResult<()>;
async fn get_neighbors(
&self,
node_id: Uuid,
depth: usize,
) -> MemoryStoreResult<Vec<(MemoryRecord, f64)>>;
// --- Domains (Phase 1: stubs return empty; full impl in Phase 4) ---
async fn list_domains(&self) -> MemoryStoreResult<Vec<Domain>>;
async fn get_domain(&self, id: &str) -> MemoryStoreResult<Option<Domain>>;
async fn upsert_domain(&self, domain: &Domain) -> MemoryStoreResult<()>;
async fn delete_domain(&self, id: &str) -> MemoryStoreResult<()>;
/// Phase 1: returns `Ok(vec![])` since no centroids exist. Phase 4 wires
/// the full soft-assignment pass.
async fn classify(&self, embedding: &[f32]) -> MemoryStoreResult<Vec<(String, f64)>>;
// --- Bulk / Maintenance ---
async fn count(&self) -> MemoryStoreResult<usize>;
async fn get_stats(&self) -> MemoryStoreResult<StoreStats>;
async fn vacuum(&self) -> MemoryStoreResult<()>;
}
// ----------------------------------------------------------------------------
// DYN-COMPATIBLE STORAGE TRAIT
// ----------------------------------------------------------------------------
/// Boxed Send future returning a `MemoryStoreResult<T>`, bound to the lifetime
/// of the borrows captured by the call (typically `&self` plus any reference
/// arguments). Used as the return type of every method on the dyn-compatible
/// `MemoryStore` trait below.
pub type BoxedStoreFuture<'a, T> = Pin<Box<dyn Future<Output = MemoryStoreResult<T>> + Send + 'a>>;
/// Dyn-compatible storage trait.
///
/// `MemoryStoreSend` above is the trait users implement; it uses native
/// async-fn-in-trait return types (RPITIT), which gives zero-allocation
/// static dispatch but is not dyn-safe. This trait wraps every method in
/// `Pin<Box<dyn Future + Send + '_>>` so `Arc<dyn MemoryStore>` works for
/// the cognitive module surface and the Phase 1 integration tests.
///
/// Implementations should not target this trait directly; the blanket
/// `impl<T: MemoryStoreSend> MemoryStore for T` adapts every Send-variant
/// implementation automatically. Each call boxes the returned future
/// exactly once, identical to the cost of the previous design.
pub trait MemoryStore: Send + Sync + 'static {
fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()>;
fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus>;
fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option<ModelSignature>>;
fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()>;
fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid>;
fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option<MemoryRecord>>;
fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()>;
fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()>;
fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn fts_search<'a>(
&'a self,
text: &'a str,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn vector_search<'a>(
&'a self,
embedding: &'a [f32],
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn get_scheduling<'a>(
&'a self,
memory_id: Uuid,
) -> BoxedStoreFuture<'a, Option<SchedulingState>>;
fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()>;
fn get_due_memories<'a>(
&'a self,
before: DateTime<Utc>,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>>;
fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()>;
fn get_edges<'a>(
&'a self,
node_id: Uuid,
edge_type: Option<&'a str>,
) -> BoxedStoreFuture<'a, Vec<MemoryEdge>>;
fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()>;
fn get_neighbors<'a>(
&'a self,
node_id: Uuid,
depth: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>>;
fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec<Domain>>;
fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option<Domain>>;
fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()>;
fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()>;
fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>>;
fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize>;
fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats>;
fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()>;
}
impl<T> MemoryStore for T
where
T: MemoryStoreSend,
{
fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::init(self))
}
fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus> {
Box::pin(<T as MemoryStoreSend>::health_check(self))
}
fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option<ModelSignature>> {
Box::pin(<T as MemoryStoreSend>::registered_model(self))
}
fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::register_model(self, sig))
}
fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid> {
Box::pin(<T as MemoryStoreSend>::insert(self, record))
}
fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option<MemoryRecord>> {
Box::pin(<T as MemoryStoreSend>::get(self, id))
}
fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::update(self, record))
}
fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::delete(self, id))
}
fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::search(self, query))
}
fn fts_search<'a>(
&'a self,
text: &'a str,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::fts_search(self, text, limit))
}
fn vector_search<'a>(
&'a self,
embedding: &'a [f32],
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::vector_search(
self, embedding, limit,
))
}
fn get_scheduling<'a>(
&'a self,
memory_id: Uuid,
) -> BoxedStoreFuture<'a, Option<SchedulingState>> {
Box::pin(<T as MemoryStoreSend>::get_scheduling(self, memory_id))
}
fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::update_scheduling(self, state))
}
fn get_due_memories<'a>(
&'a self,
before: DateTime<Utc>,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>> {
Box::pin(<T as MemoryStoreSend>::get_due_memories(
self, before, limit,
))
}
fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::add_edge(self, edge))
}
fn get_edges<'a>(
&'a self,
node_id: Uuid,
edge_type: Option<&'a str>,
) -> BoxedStoreFuture<'a, Vec<MemoryEdge>> {
Box::pin(<T as MemoryStoreSend>::get_edges(self, node_id, edge_type))
}
fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::remove_edge(self, source, target))
}
fn get_neighbors<'a>(
&'a self,
node_id: Uuid,
depth: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>> {
Box::pin(<T as MemoryStoreSend>::get_neighbors(self, node_id, depth))
}
fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec<Domain>> {
Box::pin(<T as MemoryStoreSend>::list_domains(self))
}
fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option<Domain>> {
Box::pin(<T as MemoryStoreSend>::get_domain(self, id))
}
fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::upsert_domain(self, domain))
}
fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::delete_domain(self, id))
}
fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>> {
Box::pin(<T as MemoryStoreSend>::classify(self, embedding))
}
fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize> {
Box::pin(<T as MemoryStoreSend>::count(self))
}
fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats> {
Box::pin(<T as MemoryStoreSend>::get_stats(self))
}
fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::vacuum(self))
}
}
// ----------------------------------------------------------------------------
// UNIT TESTS
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::StorageError;
#[test]
fn memory_store_error_from_storage_error() {
let se = StorageError::NotFound("abc".to_string());
let mse = MemoryStoreError::from(se);
assert!(matches!(mse, MemoryStoreError::NotFound(_)));
let se2 = StorageError::Init("init failure".to_string());
let mse2 = MemoryStoreError::from(se2);
assert!(matches!(mse2, MemoryStoreError::Init(_)));
}
#[test]
fn model_signature_serde_round_trip() {
let sig = ModelSignature {
name: "nomic-ai/nomic-embed-text-v1.5".to_string(),
dimension: 256,
hash: "a".repeat(64),
};
let json = serde_json::to_string(&sig).expect("serialize");
let sig2: ModelSignature = serde_json::from_str(&json).expect("deserialize");
assert_eq!(sig, sig2);
}
#[test]
fn memory_record_serde_round_trip() {
let rec = MemoryRecord {
id: Uuid::new_v4(),
domains: vec!["dev".to_string()],
domain_scores: {
let mut m = HashMap::new();
m.insert("dev".to_string(), 0.9);
m
},
content: "hello".to_string(),
node_type: "fact".to_string(),
tags: vec!["tag1".to_string()],
embedding: None,
created_at: Utc::now(),
updated_at: Utc::now(),
metadata: serde_json::json!({}),
};
let json = serde_json::to_string(&rec).expect("serialize");
let rec2: MemoryRecord = serde_json::from_str(&json).expect("deserialize");
assert_eq!(rec.content, rec2.content);
assert_eq!(rec.domains, rec2.domains);
}
}

View file

@ -74,6 +74,16 @@ pub const MIGRATIONS: &[Migration] = &[
description: "v2.1.25 Merge/Supersede: reversible operation log, merge plans, bitemporal lineage, protected pins",
up: MIGRATION_V14_UP,
},
Migration {
version: 15,
description: "ComposedGraph: composition events, members, outcomes",
up: MIGRATION_V15_UP,
},
Migration {
version: 16,
description: "ADR 0001 Phase 1: embedding_model registry, domains/domain_scores columns, domains table",
up: MIGRATION_V16_UP,
},
];
/// A database migration
@ -813,6 +823,67 @@ CREATE INDEX IF NOT EXISTS idx_merge_operations_survivor ON merge_operations(sur
UPDATE schema_version SET version = 14, applied_at = datetime('now');
"#;
/// V15: ComposedGraph persistence for memory composition outcomes.
///
/// These tables record which memories were used together, which tool/query
/// produced the composition, and what happened afterward. `memory_id` values
/// are intentionally historical references instead of foreign keys to
/// `knowledge_nodes`: purging or superseding a memory must not erase the fact
/// that a bounty lane or reasoning path was previously composed.
const MIGRATION_V15_UP: &str = r#"
CREATE TABLE IF NOT EXISTS composition_events (
id TEXT PRIMARY KEY,
created_at TEXT NOT NULL,
tool TEXT NOT NULL,
mode TEXT NOT NULL DEFAULT 'deep_reference',
query TEXT,
query_hash TEXT,
confidence REAL,
status TEXT,
output_preview TEXT,
metadata TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_composition_events_created_at ON composition_events(created_at);
CREATE INDEX IF NOT EXISTS idx_composition_events_tool ON composition_events(tool);
CREATE INDEX IF NOT EXISTS idx_composition_events_mode ON composition_events(mode);
CREATE INDEX IF NOT EXISTS idx_composition_events_query_hash ON composition_events(query_hash);
CREATE TABLE IF NOT EXISTS composition_members (
event_id TEXT NOT NULL,
memory_id TEXT NOT NULL,
role TEXT NOT NULL, -- primary | supporting | contradicting | superseded | related
rank INTEGER NOT NULL DEFAULT 0,
trust REAL,
score REAL,
preview TEXT,
metadata TEXT NOT NULL DEFAULT '{}',
PRIMARY KEY (event_id, memory_id, role),
FOREIGN KEY (event_id) REFERENCES composition_events(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_composition_members_memory ON composition_members(memory_id);
CREATE INDEX IF NOT EXISTS idx_composition_members_role ON composition_members(role);
CREATE TABLE IF NOT EXISTS composition_outcomes (
id TEXT PRIMARY KEY,
event_id TEXT NOT NULL,
outcome_type TEXT NOT NULL,
labeled_at TEXT NOT NULL,
label_source TEXT NOT NULL DEFAULT 'tool',
confidence_delta REAL,
notes TEXT,
metadata TEXT NOT NULL DEFAULT '{}',
FOREIGN KEY (event_id) REFERENCES composition_events(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_composition_outcomes_event ON composition_outcomes(event_id);
CREATE INDEX IF NOT EXISTS idx_composition_outcomes_type ON composition_outcomes(outcome_type);
CREATE INDEX IF NOT EXISTS idx_composition_outcomes_labeled_at ON composition_outcomes(labeled_at);
UPDATE schema_version SET version = 15, applied_at = datetime('now');
"#;
/// Get current schema version from database
pub fn get_current_version(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
conn.query_row(
@ -829,13 +900,63 @@ pub fn get_current_version(conn: &rusqlite::Connection) -> rusqlite::Result<u32>
fn add_column_if_missing(conn: &rusqlite::Connection, sql: &str) -> rusqlite::Result<()> {
match conn.execute(sql, []) {
Ok(_) => Ok(()),
Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("duplicate column name") => {
Err(rusqlite::Error::SqliteFailure(_, Some(msg)))
if msg.contains("duplicate column name") =>
{
Ok(())
}
Err(e) => Err(e),
}
}
/// V16: ADR 0001 Phase 1 - embedding_model registry + domain columns.
///
/// The ALTER TABLE statements are split out into `MIGRATION_V16_ALTER_COLUMNS`
/// because SQLite has no `ALTER TABLE ... ADD COLUMN IF NOT EXISTS`. The
/// migration runner handles them individually so replaying V16 is idempotent.
const MIGRATION_V16_UP: &str = r#"
-- Migration V16: embedding model registry + per-memory domain columns.
-- 1. Embedding model registry. Single logical row; the (id = 1) constraint is
-- enforced in code via `register_model` (SQLite CHECK on a single-row
-- table is uglier than a constraint we already enforce in Rust).
CREATE TABLE IF NOT EXISTS embedding_model (
id INTEGER PRIMARY KEY CHECK (id = 1),
name TEXT NOT NULL,
dimension INTEGER NOT NULL,
hash TEXT NOT NULL,
created_at TEXT NOT NULL
);
-- 2. Per-memory domain columns are applied separately (see apply_migrations).
-- 3. Index on the domains JSON column to enable LIKE-style filter in Phase 4.
CREATE INDEX IF NOT EXISTS idx_nodes_domains ON knowledge_nodes(domains);
CREATE INDEX IF NOT EXISTS idx_nodes_domain_scores ON knowledge_nodes(domain_scores);
-- 4. Domains catalogue (empty until Phase 4 populates).
CREATE TABLE IF NOT EXISTS domains (
id TEXT PRIMARY KEY,
label TEXT NOT NULL,
centroid BLOB,
top_terms TEXT NOT NULL DEFAULT '[]',
memory_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_domains_created_at ON domains(created_at);
UPDATE schema_version SET version = 16, applied_at = datetime('now');
"#;
/// The two ALTER TABLE statements for V16. Kept separate so the migration
/// runner can try each individually and ignore "duplicate column" errors,
/// making V16 idempotent on replay (SQLite has no ADD COLUMN IF NOT EXISTS).
pub const MIGRATION_V16_ALTER_COLUMNS: &[&str] = &[
"ALTER TABLE knowledge_nodes ADD COLUMN domains TEXT NOT NULL DEFAULT '[]'",
"ALTER TABLE knowledge_nodes ADD COLUMN domain_scores TEXT NOT NULL DEFAULT '{}'",
];
/// Apply pending migrations
pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
let current_version = get_current_version(conn)?;
@ -864,6 +985,15 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result<u32> {
)?;
}
// V16 adds columns via ALTER TABLE, which SQLite does not support
// with IF NOT EXISTS. Run them individually and ignore duplicate
// column errors so replay stays idempotent.
if migration.version == 16 {
for stmt in MIGRATION_V16_ALTER_COLUMNS {
add_column_if_missing(conn, stmt)?;
}
}
// Use execute_batch to handle multi-statement SQL including triggers
conn.execute_batch(migration.up)?;
@ -890,17 +1020,17 @@ mod tests {
/// version after `apply_migrations` runs all migrations end-to-end, and
/// neither of the dead tables V11 drops must exist afterwards.
#[test]
fn test_apply_migrations_advances_to_v14_and_drops_dead_tables() {
fn test_apply_migrations_advances_to_v16_and_drops_dead_tables() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
// Pre-requisite: schema_version must be bootstrapped by V1.
apply_migrations(&conn).expect("apply_migrations succeeds");
// 1. schema_version advanced to V14
// 1. schema_version advanced to V16
let version = get_current_version(&conn).expect("read schema_version");
assert_eq!(
version, 14,
"schema_version must be 14 after all migrations"
version, 16,
"schema_version must be 16 after all migrations"
);
// 2. knowledge_edges is gone (V11 drops it)
@ -967,7 +1097,23 @@ mod tests {
assert_eq!(rows, 1, "{table} table must be created by V14");
}
// 7. knowledge_nodes gains `protected` + `superseded_by` (V14)
// 7. ComposedGraph tables exist (V15)
for table in [
"composition_events",
"composition_members",
"composition_outcomes",
] {
let rows: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
[table],
|row| row.get(0),
)
.expect("query sqlite_master");
assert_eq!(rows, 1, "{table} table must be created by V15");
}
// 8. knowledge_nodes gains `protected` + `superseded_by` (V14)
let node_cols: Vec<String> = {
let mut stmt = conn
.prepare("PRAGMA table_info(knowledge_nodes)")
@ -1002,10 +1148,132 @@ mod tests {
conn.execute("UPDATE schema_version SET version = 10", [])
.expect("rewind schema_version");
// Replay must not error.
apply_migrations(&conn).expect("V11 replay must be idempotent");
// Replay V11 onward. V11 uses DROP TABLE IF EXISTS so it is idempotent.
// V12/V13 tombstone tables use CREATE TABLE IF NOT EXISTS. V14/V16 ALTER
// TABLE idempotency is handled by the migration runner.
apply_migrations(&conn).expect("V11..V16 replay must be idempotent");
// After replaying from V10, the schema advances to the latest version.
let version = get_current_version(&conn).expect("read schema_version");
assert_eq!(version, 14, "schema_version back at 14 after replay");
assert_eq!(version, 16, "schema_version back at 16 after replay");
}
#[test]
fn v16_adds_embedding_model_table() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("apply_migrations");
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='embedding_model'",
[],
|row| row.get(0),
)
.expect("query sqlite_master");
assert_eq!(count, 1, "embedding_model table must exist after V16");
}
#[test]
fn v16_adds_domains_columns() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("apply_migrations");
let info: Vec<String> = {
let mut stmt = conn
.prepare("PRAGMA table_info(knowledge_nodes)")
.expect("prepare");
stmt.query_map([], |row| row.get::<_, String>(1))
.expect("query_map")
.map(|r| r.expect("row"))
.collect()
};
assert!(
info.contains(&"domains".to_string()),
"domains column missing"
);
assert!(
info.contains(&"domain_scores".to_string()),
"domain_scores column missing"
);
}
#[test]
fn v16_default_values_empty_json() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("apply_migrations");
// Insert a minimal row to test defaults
conn.execute(
"INSERT INTO knowledge_nodes (id, content, node_type, created_at, updated_at, last_accessed, \
stability, difficulty, reps, lapses, learning_state, storage_strength, retrieval_strength, \
retention_strength, next_review, scheduled_days, has_embedding) \
VALUES ('test-id','content','fact',datetime('now'),datetime('now'),datetime('now'),\
1.0,0.3,0,0,'new',1.0,1.0,1.0,datetime('now'),1,0)",
[],
).expect("insert row");
let (domains, domain_scores): (String, String) = conn
.query_row(
"SELECT domains, domain_scores FROM knowledge_nodes WHERE id='test-id'",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.expect("query row");
assert_eq!(domains, "[]");
assert_eq!(domain_scores, "{}");
}
#[test]
fn v16_is_replayable() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
apply_migrations(&conn).expect("first apply");
// Rewind to V15 so V16 runs again.
conn.execute("UPDATE schema_version SET version = 15", [])
.expect("rewind");
// V16 uses CREATE TABLE IF NOT EXISTS and idempotent ALTER handling.
apply_migrations(&conn).expect("V16 replay must be idempotent");
let version = get_current_version(&conn).expect("read version");
assert_eq!(version, 16, "schema_version must be 16 after replay");
}
#[test]
fn v16_preserves_existing_rows_from_v15() {
let conn = rusqlite::Connection::open_in_memory().expect("open in-memory");
// Apply up to V15 only, including the V14 ALTER TABLE columns that
// `apply_migrations` normally runs before the V14 SQL batch.
for migration in MIGRATIONS {
if migration.version <= 15 {
if migration.version == 14 {
add_column_if_missing(
&conn,
"ALTER TABLE knowledge_nodes ADD COLUMN protected INTEGER NOT NULL DEFAULT 0",
)
.expect("apply V14 protected column");
add_column_if_missing(
&conn,
"ALTER TABLE knowledge_nodes ADD COLUMN superseded_by TEXT",
)
.expect("apply V14 superseded_by column");
}
conn.execute_batch(migration.up).expect("apply migration");
}
}
// Insert a row under the V15 schema, before PR #61's V16 columns exist.
conn.execute(
"INSERT INTO knowledge_nodes (id, content, node_type, created_at, updated_at, last_accessed, \
stability, difficulty, reps, lapses, learning_state, storage_strength, retrieval_strength, \
retention_strength, next_review, scheduled_days, has_embedding) \
VALUES ('existing-id','old content','fact',datetime('now'),datetime('now'),datetime('now'),\
1.0,0.3,0,0,'new',1.0,1.0,1.0,datetime('now'),1,0)",
[],
).expect("insert pre-v16 row");
apply_migrations(&conn).expect("apply V16 migration");
// Check the old row has defaults
let (domains, domain_scores): (String, String) = conn
.query_row(
"SELECT domains, domain_scores FROM knowledge_nodes WHERE id='existing-id'",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.expect("query pre-v16 row");
assert_eq!(domains, "[]");
assert_eq!(domain_scores, "{}");
}
}

View file

@ -1,22 +1,31 @@
//! Storage Module
//!
//! SQLite-based storage layer with:
//! - FTS5 full-text search with query sanitization
//! - Embedded vector storage
//! - FSRS-6 state management
//! - Temporal memory support
//! Backend-agnostic memory store abstraction plus SQLite reference impl.
mod memory_store;
mod migrations;
mod portable;
mod sqlite;
pub use memory_store::{
ClassificationResult, Domain, HealthStatus, LocalMemoryStore, MemoryEdge, MemoryRecord,
MemoryStore, MemoryStoreError, MemoryStoreResult, MemoryStoreSend, ModelSignature,
SchedulingState, SearchQuery, SearchResult, StoreStats,
};
pub use migrations::MIGRATIONS;
pub use portable::{
PORTABLE_ARCHIVE_FORMAT, PortableArchive, PortableImportMode, PortableImportReport,
PortableTable, PortableValue,
};
pub use sqlite::{
ConnectionRecord, ConsolidationHistoryRecord, DreamHistoryRecord, FilePortableSyncBackend,
InsightRecord, IntentionRecord, PortableSyncBackend, PortableSyncReport, Result,
SmartIngestResult, StateTransitionRecord, Storage, StorageError,
CompositionEventRecord, CompositionMemberRecord, CompositionNeighborRecord,
CompositionOutcomeRecord, ConnectionRecord, ConsolidationHistoryRecord, DreamHistoryRecord,
FilePortableSyncBackend, InsightRecord, IntentionRecord, NeverComposedCandidate,
PortableSyncBackend, PortableSyncReport, Result, SmartIngestResult, SqliteMemoryStore,
StateTransitionRecord, StorageError,
};
/// Backwards-compatibility alias. Retained until Phase 4 completes so every
/// existing `Arc<Storage>` call site keeps compiling. Scheduled for removal
/// once no downstream source file references it.
pub type Storage = SqliteMemoryStore;

File diff suppressed because it is too large Load diff

View file

@ -61,7 +61,7 @@ The server exposes the current unified MCP tools from
- `search`, `smart_ingest`, `memory`, `codebase`, `intention`
- `deep_reference`, `cross_reference`, `contradictions`
- `dream`, `explore_connections`, `predict`
- `memory_health`, `memory_graph`, `system_status`
- `memory_health`, `memory_graph`, `composed_graph`, `system_status`
- `importance_score`, `find_duplicates`
- `consolidate`, `memory_timeline`, `memory_changelog`
- `backup`, `export`, `restore`, `gc`, `suppress`

View file

@ -443,6 +443,12 @@ impl McpServer {
input_schema: tools::graph::schema(),
..Default::default()
},
ToolDescription {
name: "composed_graph".to_string(),
description: Some("ComposedGraph memory topology. Reads durable composition events, members, and outcome labels; returns recent/already-composed lanes, neighbors, never-composed pairs, bounty-mode lanes, and lets users label outcomes such as helpful, submitted, accepted, rejected, duplicate_risk, needs_poc, or dead_end.".to_string()),
input_schema: tools::composed_graph::schema(),
..Default::default()
},
// ================================================================
// DEEP REFERENCE (v2.0.4+) — replaces cross_reference
// ================================================================
@ -959,7 +965,8 @@ impl McpServer {
// TEMPORAL TOOLS (v1.2+)
// ================================================================
"memory_timeline" => {
tools::timeline::execute(&self.storage, &self.output_config, request.arguments).await
tools::timeline::execute(&self.storage, &self.output_config, request.arguments)
.await
}
"memory_changelog" => tools::changelog::execute(&self.storage, request.arguments).await,
@ -1032,6 +1039,9 @@ impl McpServer {
// ================================================================
"memory_health" => tools::health::execute(&self.storage, request.arguments).await,
"memory_graph" => tools::graph::execute(&self.storage, request.arguments).await,
"composed_graph" => {
tools::composed_graph::execute(&self.storage, request.arguments).await
}
"deep_reference" | "cross_reference" => {
tools::cross_reference::execute(&self.storage, &self.cognitive, request.arguments)
.await
@ -1796,10 +1806,10 @@ mod tests {
let result = response.result.unwrap();
let tools = result["tools"].as_array().unwrap();
// v2.1.25: 32 tools (25 from v2.1.21 + 7 Phase 3 merge/supersede tools:
// 33 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools:
// merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo,
// protect, merge_policy)
assert_eq!(tools.len(), 32, "Expected exactly 32 tools in v2.1.25");
// protect, merge_policy, composed_graph)
assert_eq!(tools.len(), 33, "Expected exactly 33 tools");
let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect();
@ -1874,6 +1884,7 @@ mod tests {
// Autonomic tools (v1.9)
assert!(tool_names.contains(&"memory_health"));
assert!(tool_names.contains(&"memory_graph"));
assert!(tool_names.contains(&"composed_graph"));
// Deep reference + cross_reference alias (v2.0.4)
assert!(tool_names.contains(&"deep_reference"));

View file

@ -0,0 +1,906 @@
//! composed_graph tool — durable composition history and bounty-mode lane queue.
use chrono::Utc;
use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
use uuid::Uuid;
use vestige_core::{CompositionOutcomeRecord, Storage};
const OUTCOME_TYPES: &[&str] = &[
"helpful",
"dead_end",
"submitted",
"accepted",
"rejected",
"duplicate_risk",
"needs_poc",
"bad_severity",
"user_promoted",
"user_demoted",
"closed_by_scope",
"closed_by_duplicate",
"closed_by_false_assumption",
"closed_by_user",
"expired_lane",
];
pub fn schema() -> Value {
serde_json::json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["recent", "get", "memory", "neighbors", "never_composed", "bounty_mode", "label"],
"description": "ComposedGraph action to run."
},
"event_id": {
"type": "string",
"description": "Composition event id for get/label actions."
},
"memory_id": {
"type": "string",
"description": "Memory id for memory/neighbors actions."
},
"limit": {
"type": "integer",
"description": "Maximum rows to return (default 10, max 100).",
"default": 10,
"minimum": 1,
"maximum": 100
},
"tags": {
"type": "array",
"items": { "type": "string" },
"description": "Optional tag filter for never_composed and bounty_mode."
},
"outcome_type": {
"type": "string",
"enum": ["helpful", "dead_end", "submitted", "accepted", "rejected", "duplicate_risk", "needs_poc", "bad_severity", "user_promoted", "user_demoted", "closed_by_scope", "closed_by_duplicate", "closed_by_false_assumption", "closed_by_user", "expired_lane"],
"description": "Outcome label for label action."
},
"notes": {
"type": "string",
"description": "Optional outcome notes."
},
"label_source": {
"type": "string",
"description": "Where the outcome label came from (default: user)."
},
"confidence_delta": {
"type": "number",
"description": "Optional confidence adjustment for this outcome."
}
},
"required": ["action"]
})
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct ComposedGraphArgs {
action: String,
event_id: Option<String>,
memory_id: Option<String>,
limit: Option<i32>,
tags: Option<Vec<String>>,
outcome_type: Option<String>,
notes: Option<String>,
label_source: Option<String>,
confidence_delta: Option<f64>,
}
pub async fn execute(storage: &Arc<Storage>, args: Option<Value>) -> Result<Value, String> {
let args: ComposedGraphArgs = match args {
Some(value) => {
serde_json::from_value(value).map_err(|e| format!("Invalid arguments: {}", e))?
}
None => return Err("Missing arguments".to_string()),
};
let limit = args.limit.unwrap_or(10).clamp(1, 100);
match args.action.as_str() {
"recent" => recent(storage, limit),
"get" => {
let event_id = args
.event_id
.as_deref()
.ok_or_else(|| "event_id is required for get".to_string())?;
get(storage, event_id)
}
"memory" => {
let memory_id = args
.memory_id
.as_deref()
.ok_or_else(|| "memory_id is required for memory".to_string())?;
memory(storage, memory_id, limit)
}
"neighbors" => {
let memory_id = args
.memory_id
.as_deref()
.ok_or_else(|| "memory_id is required for neighbors".to_string())?;
neighbors(storage, memory_id, limit)
}
"never_composed" => never_composed(storage, limit, args.tags.as_deref()),
"bounty_mode" => bounty_mode(storage, limit, args.tags.as_deref()),
"label" => label(storage, &args),
other => Err(format!("Unknown composed_graph action: {}", other)),
}
}
fn recent(storage: &Storage, limit: i32) -> Result<Value, String> {
let events = storage
.get_recent_composition_events(limit)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "recent",
"events": events,
}))
}
fn get(storage: &Storage, event_id: &str) -> Result<Value, String> {
let event = storage
.get_composition_event(event_id)
.map_err(|e| e.to_string())?
.ok_or_else(|| format!("composition event not found: {}", event_id))?;
let members = storage
.get_composition_members(event_id)
.map_err(|e| e.to_string())?;
let outcomes = storage
.get_composition_outcomes(event_id)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "get",
"event": event,
"members": members,
"outcomes": outcomes,
}))
}
fn memory(storage: &Storage, memory_id: &str, limit: i32) -> Result<Value, String> {
let events = storage
.get_compositions_for_memory(memory_id, limit)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "memory",
"memoryId": memory_id,
"events": events,
}))
}
fn neighbors(storage: &Storage, memory_id: &str, limit: i32) -> Result<Value, String> {
let neighbors = storage
.get_composition_neighbors(memory_id, limit)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "neighbors",
"memoryId": memory_id,
"neighbors": neighbors,
}))
}
fn never_composed(storage: &Storage, limit: i32, tags: Option<&[String]>) -> Result<Value, String> {
let candidates = storage
.get_never_composed_candidates(limit, tags)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "never_composed",
"candidates": candidates,
}))
}
fn bounty_mode(storage: &Storage, limit: i32, tags: Option<&[String]>) -> Result<Value, String> {
const PAGE_SIZE: i32 = 100;
const MAX_SCAN_EVENTS: i32 = 1_000;
let mut offset = 0;
let mut scanned = 0;
let mut already_composed = Vec::new();
let mut closed_doors = Vec::new();
let mut duplicate_risk_lanes = Vec::new();
let mut needs_poc_lanes = Vec::new();
loop {
let events = storage
.get_recent_composition_events_page(PAGE_SIZE, offset)
.map_err(|e| e.to_string())?;
if events.is_empty() {
break;
}
scanned += events.len() as i32;
for event in events {
let outcomes = storage
.get_composition_outcomes(&event.id)
.map_err(|e| e.to_string())?;
let members = storage
.get_composition_members(&event.id)
.map_err(|e| e.to_string())?;
if !composition_matches_tags(storage, &event, &members, tags)? {
continue;
}
let item = serde_json::json!({
"event": event,
"members": members,
"outcomes": outcomes,
});
let outcome_types = item["outcomes"]
.as_array()
.map(|values| {
values
.iter()
.filter_map(|value| value.get("outcomeType").and_then(|v| v.as_str()))
.collect::<Vec<_>>()
})
.unwrap_or_default();
if outcome_types.iter().any(|kind| {
matches!(
*kind,
"dead_end"
| "rejected"
| "bad_severity"
| "closed_by_scope"
| "closed_by_duplicate"
| "closed_by_false_assumption"
| "closed_by_user"
| "expired_lane"
)
}) {
push_limited(&mut closed_doors, item.clone(), limit);
}
if outcome_types
.iter()
.any(|kind| matches!(*kind, "duplicate_risk" | "closed_by_duplicate"))
{
push_limited(&mut duplicate_risk_lanes, item.clone(), limit);
}
if outcome_types.contains(&"needs_poc") {
push_limited(&mut needs_poc_lanes, item.clone(), limit);
}
if already_composed.len() < limit as usize {
already_composed.push(item);
}
if bounty_mode_lanes_full(
limit,
&already_composed,
&closed_doors,
&duplicate_risk_lanes,
&needs_poc_lanes,
) {
break;
}
}
if bounty_mode_lanes_full(
limit,
&already_composed,
&closed_doors,
&duplicate_risk_lanes,
&needs_poc_lanes,
) || scanned >= MAX_SCAN_EVENTS
{
break;
}
offset += PAGE_SIZE;
}
let never = storage
.get_never_composed_candidates(limit, tags)
.map_err(|e| e.to_string())?;
let top_weird_combinations = never.iter().take(3).cloned().collect::<Vec<_>>();
Ok(serde_json::json!({
"action": "bounty_mode",
"alreadyComposedLanes": already_composed,
"neverComposedLanes": never,
"closedDoors": closed_doors,
"duplicateRiskLanes": duplicate_risk_lanes,
"needsPocLanes": needs_poc_lanes,
"topWeirdCombinations": top_weird_combinations,
"guardrails": [
"never-composed lane is not a finding",
"composition score is not severity",
"submit/reportable still needs source refs, scope fit, and PoC evidence"
]
}))
}
fn push_limited(items: &mut Vec<Value>, item: Value, limit: i32) {
if items.len() < limit as usize {
items.push(item);
}
}
fn bounty_mode_lanes_full(
limit: i32,
already_composed: &[Value],
closed_doors: &[Value],
duplicate_risk_lanes: &[Value],
needs_poc_lanes: &[Value],
) -> bool {
let limit = limit as usize;
already_composed.len() >= limit
&& closed_doors.len() >= limit
&& duplicate_risk_lanes.len() >= limit
&& needs_poc_lanes.len() >= limit
}
fn composition_matches_tags(
storage: &Storage,
event: &vestige_core::CompositionEventRecord,
members: &[vestige_core::CompositionMemberRecord],
tags: Option<&[String]>,
) -> Result<bool, String> {
let Some(tags) = tags else {
return Ok(true);
};
if tags.is_empty() {
return Ok(true);
}
if json_value_has_tag(&event.metadata, tags) {
return Ok(true);
}
for member in members {
if json_value_has_tag(&member.metadata, tags) {
return Ok(true);
}
if let Some(node) = storage
.get_node(&member.memory_id)
.map_err(|e| e.to_string())?
&& node.tags.iter().any(|tag| tag_matches_filter(tag, tags))
{
return Ok(true);
}
}
Ok(false)
}
fn json_value_has_tag(value: &Value, tags: &[String]) -> bool {
value
.get("tags")
.and_then(|tags_value| tags_value.as_array())
.is_some_and(|values| {
values.iter().any(|value| {
value
.as_str()
.is_some_and(|tag| tag_matches_filter(tag, tags))
})
})
}
fn tag_matches_filter(tag: &str, filters: &[String]) -> bool {
filters
.iter()
.any(|wanted| tag == wanted || tag.starts_with(&format!("{wanted}:")))
}
fn label(storage: &Storage, args: &ComposedGraphArgs) -> Result<Value, String> {
let event_id = args
.event_id
.as_deref()
.ok_or_else(|| "event_id is required for label".to_string())?;
let outcome_type = args
.outcome_type
.as_deref()
.ok_or_else(|| "outcome_type is required for label".to_string())?;
if !OUTCOME_TYPES.contains(&outcome_type) {
return Err(format!("unsupported outcome_type: {}", outcome_type));
}
if storage
.get_composition_event(event_id)
.map_err(|e| e.to_string())?
.is_none()
{
return Err(format!("composition event not found: {}", event_id));
}
let outcome = CompositionOutcomeRecord {
id: Uuid::new_v4().to_string(),
event_id: event_id.to_string(),
outcome_type: outcome_type.to_string(),
labeled_at: Utc::now(),
label_source: args
.label_source
.clone()
.unwrap_or_else(|| "user".to_string()),
confidence_delta: args.confidence_delta,
notes: args.notes.clone(),
metadata: serde_json::json!({}),
};
storage
.record_composition_outcome(&outcome)
.map_err(|e| e.to_string())?;
Ok(serde_json::json!({
"action": "label",
"eventId": event_id,
"outcome": outcome,
}))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use vestige_core::{
CompositionEventRecord, CompositionMemberRecord, CompositionOutcomeRecord, IngestInput,
};
fn test_storage() -> (Arc<Storage>, TempDir) {
let dir = TempDir::new().unwrap();
let storage = Storage::new(Some(dir.path().join("test.db"))).unwrap();
(Arc::new(storage), dir)
}
fn ingest(storage: &Storage, content: &str, tags: &[&str]) -> String {
storage
.ingest(IngestInput {
content: content.to_string(),
node_type: "fact".to_string(),
tags: tags.iter().map(|tag| tag.to_string()).collect(),
..Default::default()
})
.unwrap()
.id
}
#[tokio::test]
async fn test_composed_graph_get_label_and_bounty_mode() {
let (storage, _dir) = test_storage();
let first = ingest(
&storage,
"Oracle drift bounty lane",
&["protocolgate", "boundary-oracle", "settlement"],
);
let second = ingest(
&storage,
"Withdrawal queue bounty lane",
&["protocolgate", "boundary-queue", "settlement"],
);
let third = ingest(
&storage,
"Keeper role bounty lane",
&["protocolgate", "boundary-role", "settlement"],
);
let event = CompositionEventRecord {
id: "composed-graph-test".to_string(),
created_at: Utc::now(),
tool: "deep_reference".to_string(),
mode: "bounty".to_string(),
query: Some("oracle withdrawal".to_string()),
query_hash: Some("test".to_string()),
confidence: Some(0.8),
status: Some("resolved".to_string()),
output_preview: Some("compose oracle and withdrawal queue".to_string()),
metadata: serde_json::json!({}),
};
storage
.save_composition(
&event,
&[
CompositionMemberRecord {
event_id: event.id.clone(),
memory_id: first.clone(),
role: "primary".to_string(),
rank: 0,
trust: Some(0.8),
score: Some(0.9),
preview: None,
metadata: serde_json::json!({}),
},
CompositionMemberRecord {
event_id: event.id.clone(),
memory_id: second.clone(),
role: "supporting".to_string(),
rank: 1,
trust: Some(0.7),
score: Some(0.8),
preview: None,
metadata: serde_json::json!({}),
},
],
&[],
)
.unwrap();
let unrelated = ingest(&storage, "Personal planning lane", &["personal"]);
storage
.save_composition(
&CompositionEventRecord {
id: "unrelated-composed-graph-test".to_string(),
created_at: Utc::now() + chrono::Duration::seconds(10),
tool: "deep_reference".to_string(),
mode: "planning".to_string(),
query: Some("personal planning".to_string()),
query_hash: Some("unrelated".to_string()),
confidence: Some(0.4),
status: Some("resolved".to_string()),
output_preview: Some("unrelated composition".to_string()),
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id: "unrelated-composed-graph-test".to_string(),
memory_id: unrelated,
role: "primary".to_string(),
rank: 0,
trust: Some(0.4),
score: Some(0.2),
preview: None,
metadata: serde_json::json!({}),
}],
&[CompositionOutcomeRecord {
id: "unrelated-composed-graph-outcome".to_string(),
event_id: "unrelated-composed-graph-test".to_string(),
outcome_type: "needs_poc".to_string(),
labeled_at: Utc::now(),
label_source: "test".to_string(),
confidence_delta: None,
notes: None,
metadata: serde_json::json!({}),
}],
)
.unwrap();
let get_result = execute(
&storage,
Some(serde_json::json!({
"action": "get",
"event_id": event.id
})),
)
.await
.unwrap();
assert_eq!(get_result["members"].as_array().unwrap().len(), 2);
let label_result = execute(
&storage,
Some(serde_json::json!({
"action": "label",
"event_id": "composed-graph-test",
"outcome_type": "submitted",
"notes": "submitted in test"
})),
)
.await
.unwrap();
assert_eq!(
label_result["outcome"]["outcomeType"].as_str(),
Some("submitted")
);
let closed_label_result = execute(
&storage,
Some(serde_json::json!({
"action": "label",
"event_id": "composed-graph-test",
"outcome_type": "closed_by_scope",
"notes": "closed in test"
})),
)
.await
.unwrap();
assert_eq!(
closed_label_result["outcome"]["outcomeType"].as_str(),
Some("closed_by_scope")
);
let duplicate_label_result = execute(
&storage,
Some(serde_json::json!({
"action": "label",
"event_id": "composed-graph-test",
"outcome_type": "closed_by_duplicate",
"notes": "duplicate family in test"
})),
)
.await
.unwrap();
assert_eq!(
duplicate_label_result["outcome"]["outcomeType"].as_str(),
Some("closed_by_duplicate")
);
let bounty = execute(
&storage,
Some(serde_json::json!({
"action": "bounty_mode",
"tags": ["protocolgate"],
"limit": 1
})),
)
.await
.unwrap();
let already = bounty["alreadyComposedLanes"].as_array().unwrap();
assert_eq!(already.len(), 1);
assert!(
already[0]["event"]["id"].as_str() == Some("composed-graph-test"),
"tag-scoped bounty_mode should skip newer unrelated events before truncating"
);
assert_eq!(bounty["closedDoors"].as_array().unwrap().len(), 1);
assert_eq!(bounty["duplicateRiskLanes"].as_array().unwrap().len(), 1);
assert!(bounty["needsPocLanes"].as_array().unwrap().is_empty());
assert!(
bounty["neverComposedLanes"]
.as_array()
.unwrap()
.iter()
.any(|candidate| {
let first_id = candidate["firstId"].as_str().unwrap_or_default();
let second_id = candidate["secondId"].as_str().unwrap_or_default();
[first_id, second_id].contains(&third.as_str())
})
);
}
#[tokio::test]
async fn test_bounty_mode_paginates_tag_filter_and_matches_namespaced_tags() {
let (storage, _dir) = test_storage();
let tagged = ingest(
&storage,
"Older tagged composition lane",
&["project:vestige", "composition"],
);
let unrelated = ingest(&storage, "Newer unrelated lane", &["unrelated"]);
let base_time = Utc::now();
storage
.save_composition(
&CompositionEventRecord {
id: "older-tagged-composition".to_string(),
created_at: base_time,
tool: "deep_reference".to_string(),
mode: "research".to_string(),
query: Some("older tagged lane".to_string()),
query_hash: Some("fnv1a64:older".to_string()),
confidence: Some(0.8),
status: Some("resolved".to_string()),
output_preview: None,
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id: "older-tagged-composition".to_string(),
memory_id: tagged,
role: "primary".to_string(),
rank: 0,
trust: Some(0.8),
score: Some(0.9),
preview: None,
metadata: serde_json::json!({}),
}],
&[],
)
.unwrap();
for idx in 0..101 {
let event_id = format!("newer-unrelated-composition-{idx}");
storage
.save_composition(
&CompositionEventRecord {
id: event_id.clone(),
created_at: base_time + chrono::Duration::seconds(i64::from(idx + 1)),
tool: "deep_reference".to_string(),
mode: "planning".to_string(),
query: Some(format!("newer unrelated lane {idx}")),
query_hash: Some(format!("fnv1a64:newer-{idx}")),
confidence: Some(0.3),
status: Some("resolved".to_string()),
output_preview: None,
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id,
memory_id: unrelated.clone(),
role: "primary".to_string(),
rank: 0,
trust: Some(0.3),
score: Some(0.2),
preview: None,
metadata: serde_json::json!({}),
}],
&[],
)
.unwrap();
}
let bounty = execute(
&storage,
Some(serde_json::json!({
"action": "bounty_mode",
"tags": ["project"],
"limit": 1
})),
)
.await
.unwrap();
let already = bounty["alreadyComposedLanes"].as_array().unwrap();
assert_eq!(already.len(), 1);
assert_eq!(
already[0]["event"]["id"].as_str(),
Some("older-tagged-composition"),
"tag-filtered bounty_mode should page past newer unrelated events and match namespaced tags"
);
}
#[tokio::test]
async fn test_bounty_mode_uses_member_tag_snapshot_after_purge() {
let (storage, _dir) = test_storage();
let tagged = ingest(
&storage,
"Tagged member that will be purged",
&["project:vestige", "composition"],
);
storage
.save_composition(
&CompositionEventRecord {
id: "purged-tagged-member-composition".to_string(),
created_at: Utc::now(),
tool: "deep_reference".to_string(),
mode: "research".to_string(),
query: Some("purged tagged lane".to_string()),
query_hash: Some("fnv1a64:purged".to_string()),
confidence: Some(0.6),
status: Some("closed".to_string()),
output_preview: None,
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id: "purged-tagged-member-composition".to_string(),
memory_id: tagged.clone(),
role: "primary".to_string(),
rank: 0,
trust: Some(0.7),
score: Some(0.8),
preview: Some("Tagged member that will be purged".to_string()),
metadata: serde_json::json!({}),
}],
&[CompositionOutcomeRecord {
id: "purged-tagged-member-outcome".to_string(),
event_id: "purged-tagged-member-composition".to_string(),
outcome_type: "closed_by_scope".to_string(),
labeled_at: Utc::now(),
label_source: "test".to_string(),
confidence_delta: Some(-0.2),
notes: None,
metadata: serde_json::json!({}),
}],
)
.unwrap();
storage
.purge_node(&tagged, Some("test purge"))
.expect("purge should succeed");
let get_result = execute(
&storage,
Some(serde_json::json!({
"action": "get",
"event_id": "purged-tagged-member-composition"
})),
)
.await
.unwrap();
assert!(
get_result["members"][0].get("preview").is_none()
|| get_result["members"][0]["preview"].is_null(),
"purge should scrub member preview from composed_graph get"
);
let bounty = execute(
&storage,
Some(serde_json::json!({
"action": "bounty_mode",
"tags": ["project"],
"limit": 1
})),
)
.await
.unwrap();
let already = bounty["alreadyComposedLanes"].as_array().unwrap();
assert_eq!(already.len(), 1);
assert_eq!(
already[0]["event"]["id"].as_str(),
Some("purged-tagged-member-composition"),
"tag-filtered bounty_mode should use composition member tag snapshots after source memory purge"
);
assert_eq!(bounty["closedDoors"].as_array().unwrap().len(), 1);
}
#[tokio::test]
async fn test_bounty_mode_guardrail_buckets_are_not_truncated_by_already_limit() {
let (storage, _dir) = test_storage();
let neutral = ingest(&storage, "Neutral release lane", &["project:vestige"]);
let closed = ingest(&storage, "Closed release lane", &["project:vestige"]);
let base_time = Utc::now();
storage
.save_composition(
&CompositionEventRecord {
id: "older-closed-lane".to_string(),
created_at: base_time,
tool: "deep_reference".to_string(),
mode: "release".to_string(),
query: Some("older closed lane".to_string()),
query_hash: Some("fnv1a64:older-closed".to_string()),
confidence: Some(0.3),
status: Some("closed".to_string()),
output_preview: None,
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id: "older-closed-lane".to_string(),
memory_id: closed,
role: "primary".to_string(),
rank: 0,
trust: Some(0.5),
score: Some(0.4),
preview: None,
metadata: serde_json::json!({}),
}],
&[CompositionOutcomeRecord {
id: "older-closed-outcome".to_string(),
event_id: "older-closed-lane".to_string(),
outcome_type: "closed_by_false_assumption".to_string(),
labeled_at: base_time,
label_source: "test".to_string(),
confidence_delta: Some(-0.3),
notes: None,
metadata: serde_json::json!({}),
}],
)
.unwrap();
storage
.save_composition(
&CompositionEventRecord {
id: "newer-neutral-lane".to_string(),
created_at: base_time + chrono::Duration::seconds(1),
tool: "deep_reference".to_string(),
mode: "release".to_string(),
query: Some("newer neutral lane".to_string()),
query_hash: Some("fnv1a64:newer-neutral".to_string()),
confidence: Some(0.7),
status: Some("resolved".to_string()),
output_preview: None,
metadata: serde_json::json!({}),
},
&[CompositionMemberRecord {
event_id: "newer-neutral-lane".to_string(),
memory_id: neutral,
role: "primary".to_string(),
rank: 0,
trust: Some(0.8),
score: Some(0.8),
preview: None,
metadata: serde_json::json!({}),
}],
&[],
)
.unwrap();
let bounty = execute(
&storage,
Some(serde_json::json!({
"action": "bounty_mode",
"tags": ["project"],
"limit": 1
})),
)
.await
.unwrap();
assert_eq!(
bounty["alreadyComposedLanes"][0]["event"]["id"].as_str(),
Some("newer-neutral-lane")
);
assert_eq!(
bounty["closedDoors"][0]["event"]["id"].as_str(),
Some("older-closed-lane"),
"guardrail buckets should keep scanning after alreadyComposedLanes reaches limit"
);
}
}

View file

@ -20,9 +20,10 @@ use serde::Deserialize;
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::cognitive::CognitiveEngine;
use vestige_core::Storage;
use vestige_core::{CompositionEventRecord, CompositionMemberRecord, Storage};
/// Input schema for deep_reference / cross_reference tool
pub fn schema() -> Value {
@ -509,6 +510,7 @@ pub async fn execute(
"confidence": 0.0,
"guidance": "No memories found. Use smart_ingest to add memories.",
"memoriesAnalyzed": 0,
"compositionWriteStatus": "skipped_empty",
}));
}
@ -820,6 +822,7 @@ pub async fn execute(
"id": s.id,
"preview": s.content.chars().take(200).collect::<String>(),
"trust": (s.trust * 100.0).round() / 100.0,
"relevanceScore": ((composite(s) * 100.0).round() / 100.0),
"date": s.updated_at.to_rfc3339(),
"role": if i == 0 { "primary" } else { "supporting" },
})
@ -925,9 +928,163 @@ pub async fn execute(
response["related_insights"] = serde_json::json!(related_insights);
}
match persist_deep_reference_composition(storage, &args.query, &intent, &response) {
Ok(Some(event_id)) => {
response["composition_event_id"] = serde_json::json!(event_id);
response["compositionWriteStatus"] = serde_json::json!("persisted");
}
Ok(None) => {
response["compositionWriteStatus"] = serde_json::json!("skipped_empty");
}
Err(err) => {
tracing::warn!(
"Failed to persist deep_reference composition event: {}",
err
);
response["compositionWriteStatus"] = serde_json::json!("failed");
}
}
Ok(response)
}
fn persist_deep_reference_composition(
storage: &Arc<Storage>,
query: &str,
intent: &QueryIntent,
response: &Value,
) -> Result<Option<String>, String> {
let event_id = Uuid::new_v4().to_string();
let event = CompositionEventRecord {
id: event_id.clone(),
created_at: Utc::now(),
tool: "deep_reference".to_string(),
mode: "deep_reference".to_string(),
query: Some(query.to_string()),
query_hash: Some(query_hash(query)),
confidence: response.get("confidence").and_then(|v| v.as_f64()),
status: response
.get("status")
.and_then(|v| v.as_str())
.map(ToOwned::to_owned),
output_preview: response
.get("guidance")
.and_then(|v| v.as_str())
.map(|value| preview_text(value, 280)),
metadata: serde_json::json!({
"intent": format!("{:?}", intent),
"memoriesAnalyzed": response.get("memoriesAnalyzed").and_then(|v| v.as_u64()).unwrap_or(0),
"activationExpanded": response.get("activationExpanded").and_then(|v| v.as_u64()).unwrap_or(0),
"reasoningPreview": response.get("reasoning").and_then(|v| v.as_str()).map(|value| preview_text(value, 600)),
}),
};
let mut members = Vec::new();
if let Some(evidence) = response.get("evidence").and_then(|v| v.as_array()) {
for (idx, item) in evidence.iter().enumerate() {
let Some(memory_id) = item.get("id").and_then(|v| v.as_str()) else {
continue;
};
let role = item
.get("role")
.and_then(|v| v.as_str())
.unwrap_or(if idx == 0 { "primary" } else { "supporting" });
members.push(CompositionMemberRecord {
event_id: event_id.clone(),
memory_id: memory_id.to_string(),
role: role.to_string(),
rank: idx as i32,
trust: item.get("trust").and_then(|v| v.as_f64()),
score: item
.get("relevanceScore")
.or_else(|| item.get("relevance_score"))
.and_then(|v| v.as_f64()),
preview: None,
metadata: serde_json::json!({
"roleSource": "deep_reference_evidence",
"evidenceRank": idx,
"date": item.get("date").and_then(|v| v.as_str()),
}),
});
}
}
if let Some(contradictions) = response.get("contradictions").and_then(|v| v.as_array()) {
for (idx, contradiction) in contradictions.iter().enumerate() {
for side in ["stronger", "weaker"] {
let Some(item) = contradiction.get(side) else {
continue;
};
let Some(memory_id) = item.get("id").and_then(|v| v.as_str()) else {
continue;
};
members.push(CompositionMemberRecord {
event_id: event_id.clone(),
memory_id: memory_id.to_string(),
role: "contradicting".to_string(),
rank: idx as i32,
trust: item.get("trust").and_then(|v| v.as_f64()),
score: contradiction.get("topic_overlap").and_then(|v| v.as_f64()),
preview: None,
metadata: serde_json::json!({
"roleSource": "deep_reference_contradiction",
"side": side,
"date": item.get("date").and_then(|v| v.as_str()),
}),
});
}
}
}
if let Some(superseded) = response.get("superseded").and_then(|v| v.as_array()) {
for (idx, item) in superseded.iter().enumerate() {
let Some(memory_id) = item.get("id").and_then(|v| v.as_str()) else {
continue;
};
members.push(CompositionMemberRecord {
event_id: event_id.clone(),
memory_id: memory_id.to_string(),
role: "superseded".to_string(),
rank: idx as i32,
trust: item.get("trust").and_then(|v| v.as_f64()),
score: None,
preview: None,
metadata: serde_json::json!({
"roleSource": "deep_reference_superseded",
"superseded_by": item.get("superseded_by").and_then(|v| v.as_str()),
"date": item.get("date").and_then(|v| v.as_str()),
}),
});
}
}
if members.is_empty() {
return Ok(None);
}
storage
.save_composition(&event, &members, &[])
.map_err(|e| e.to_string())?;
Ok(Some(event_id))
}
fn query_hash(query: &str) -> String {
let mut hash = 0xcbf29ce484222325u64;
for byte in query.as_bytes() {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(0x100000001b3);
}
format!("fnv1a64:{hash:016x}")
}
fn preview_text(value: &str, max: usize) -> String {
let collapsed = value.replace('\n', " ");
if collapsed.len() <= max {
return collapsed;
}
format!("{}...", &collapsed[..collapsed.floor_char_boundary(max)])
}
// ============================================================================
// TESTS
// ============================================================================
@ -1010,6 +1167,99 @@ mod tests {
);
}
#[tokio::test]
async fn test_deep_reference_persists_composition_event() {
let (storage, _dir) = test_storage().await;
let primary_id = ingest_one(
&storage,
"ProtocolGate control-plane composition tracks global invariant local gate bypasses.",
&["protocolgate", "boundary-scope"],
)
.await;
let supporting_id = ingest_one(
&storage,
"ProtocolGate global invariant local gate research used Aave account-global health factor and route-local validation.",
&["protocolgate", "boundary-scope"],
)
.await;
let result = execute(
&storage,
&test_cognitive(),
Some(serde_json::json!({
"query": "ProtocolGate global invariant local gate",
"depth": 10
})),
)
.await
.expect("execute should succeed");
let event_id = result["composition_event_id"]
.as_str()
.expect("deep_reference should return persisted event id");
assert_eq!(result["compositionWriteStatus"].as_str(), Some("persisted"));
let event = storage
.get_composition_event(event_id)
.unwrap()
.expect("composition event should be stored");
assert_eq!(event.tool, "deep_reference");
assert_eq!(
event.query.as_deref(),
Some("ProtocolGate global invariant local gate")
);
let members = storage.get_composition_members(event_id).unwrap();
assert!(members.iter().any(|member| member.memory_id == primary_id));
assert!(
members
.iter()
.any(|member| member.memory_id == supporting_id)
);
assert!(members.iter().any(|member| member.role == "primary"));
assert!(
members.iter().any(|member| {
member.memory_id == primary_id
&& member.score.is_some()
&& member.metadata["roleSource"] == "deep_reference_evidence"
}),
"persisted members should retain relevance score and role source"
);
}
#[tokio::test]
async fn test_deep_reference_skips_empty_composition_event() {
let (storage, _dir) = test_storage().await;
let result = execute(
&storage,
&test_cognitive(),
Some(serde_json::json!({
"query": "no memories exist for this query",
"depth": 10
})),
)
.await
.expect("execute should succeed");
assert_eq!(
result["compositionWriteStatus"].as_str(),
Some("skipped_empty")
);
assert!(
result.get("composition_event_id").is_none(),
"empty evidence should not create a composition event"
);
assert!(
storage
.get_recent_composition_events(10)
.unwrap()
.is_empty(),
"ledger should stay empty when no memories participated"
);
}
// ========================================================================
// Confidence sanity: must vary with query relevance.
// ========================================================================

View file

@ -41,6 +41,7 @@ pub mod graph;
pub mod health;
// v2.1: Cross-reference (connect the dots)
pub mod composed_graph;
pub mod contradictions;
pub mod cross_reference;