fix: hydrate cognitive modules from persisted connections (#16)

fix: hydrate cognitive modules from persisted connections
This commit is contained in:
Sam Valladares 2026-03-01 19:56:29 -06:00 committed by GitHub
commit 45c01edbee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 455 additions and 6 deletions

View file

@ -9,13 +9,15 @@ use vestige_core::{
ActivationNetwork, SynapticTaggingSystem, HippocampalIndex, ContextMatcher,
AccessibilityCalculator, CompetitionManager, StateUpdateService,
ImportanceSignals, NoveltySignal, ArousalSignal, RewardSignal, AttentionSignal,
EmotionalMemory,
EmotionalMemory, LinkType,
// Advanced modules
ImportanceTracker, ReconsolidationManager, IntentDetector, ActivityTracker,
MemoryDreamer, MemoryChainBuilder, MemoryCompressor, CrossProjectLearner,
AdaptiveEmbedder, SpeculativeRetriever, ConsolidationScheduler,
// Search modules
Reranker, RerankerConfig,
// Storage
Storage,
};
use vestige_core::search::TemporalSearcher;
use vestige_core::neuroscience::predictive_retrieval::PredictiveMemory;
@ -70,6 +72,41 @@ impl Default for CognitiveEngine {
}
impl CognitiveEngine {
/// Load persisted connections from storage into in-memory cognitive modules.
///
/// Currently hydrates `ActivationNetwork` which serves `explore_connections`
/// "associations" queries. Other modules (MemoryChainBuilder, HippocampalIndex)
/// require full MemoryNode content and are deferred to a follow-up.
pub fn hydrate(&mut self, storage: &Storage) {
match storage.get_all_connections() {
Ok(connections) => {
for conn in &connections {
let link_type = match conn.link_type.as_str() {
"semantic" => LinkType::Semantic,
"temporal" => LinkType::Temporal,
"causal" => LinkType::Causal,
"spatial" => LinkType::Spatial,
"shared_concepts" | "complementary" => LinkType::Semantic,
_ => LinkType::Semantic,
};
self.activation_network.add_edge(
conn.source_id.clone(),
conn.target_id.clone(),
link_type,
conn.strength,
);
}
tracing::info!(
count = connections.len(),
"Hydrated cognitive modules from persisted connections"
);
}
Err(e) => {
tracing::warn!("Failed to hydrate cognitive modules: {}", e);
}
}
}
/// Initialize all cognitive modules with default configurations.
pub fn new() -> Self {
Self {
@ -110,3 +147,110 @@ impl CognitiveEngine {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use vestige_core::{ConnectionRecord, IngestInput};
use chrono::Utc;
use tempfile::TempDir;
fn create_test_storage() -> (Storage, TempDir) {
let dir = TempDir::new().unwrap();
let storage = Storage::new(Some(dir.path().join("test.db"))).unwrap();
(storage, dir)
}
fn ingest_memory(storage: &Storage, content: &str) -> String {
let result = storage.ingest(IngestInput {
content: content.to_string(),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
}).unwrap();
result.id
}
#[test]
fn test_hydrate_empty_storage() {
let (storage, _dir) = create_test_storage();
let mut engine = CognitiveEngine::new();
engine.hydrate(&storage);
// Should succeed with 0 connections
let assocs = engine.activation_network.get_associations("nonexistent");
assert!(assocs.is_empty());
}
#[test]
fn test_hydrate_loads_connections() {
let (storage, _dir) = create_test_storage();
// Create two memories so FK constraints pass
let id1 = ingest_memory(&storage, "Memory about Rust programming");
let id2 = ingest_memory(&storage, "Memory about Cargo build system");
// Save a connection between them
let now = Utc::now();
storage.save_connection(&ConnectionRecord {
source_id: id1.clone(),
target_id: id2.clone(),
strength: 0.85,
link_type: "semantic".to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
}).unwrap();
// Hydrate engine
let mut engine = CognitiveEngine::new();
engine.hydrate(&storage);
// Verify activation network has the connection
let assocs = engine.activation_network.get_associations(&id1);
assert!(!assocs.is_empty(), "Hydrated engine should have associations for {}", id1);
assert!(
assocs.iter().any(|a| a.memory_id == id2),
"Should find connection to {}",
id2
);
}
#[test]
fn test_hydrate_multiple_link_types() {
let (storage, _dir) = create_test_storage();
let id1 = ingest_memory(&storage, "Event A happened");
let id2 = ingest_memory(&storage, "Event B followed");
let id3 = ingest_memory(&storage, "Event C was caused by A");
let now = Utc::now();
storage.save_connection(&ConnectionRecord {
source_id: id1.clone(),
target_id: id2.clone(),
strength: 0.7,
link_type: "temporal".to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
}).unwrap();
storage.save_connection(&ConnectionRecord {
source_id: id1.clone(),
target_id: id3.clone(),
strength: 0.9,
link_type: "causal".to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
}).unwrap();
let mut engine = CognitiveEngine::new();
engine.hydrate(&storage);
let assocs = engine.activation_network.get_associations(&id1);
assert!(assocs.len() >= 2, "Should have at least 2 associations, got {}", assocs.len());
}
}

View file

@ -224,7 +224,12 @@ async fn main() {
// Create cognitive engine (stateful neuroscience modules)
let cognitive = Arc::new(Mutex::new(cognitive::CognitiveEngine::new()));
info!("CognitiveEngine initialized (28 modules)");
// Hydrate cognitive modules from persisted connections
{
let mut cog = cognitive.lock().await;
cog.hydrate(&storage);
}
info!("CognitiveEngine initialized and hydrated");
// Create shared event broadcast channel for dashboard <-> MCP tool events
let (event_tx, _) = tokio::sync::broadcast::channel::<vestige_mcp::dashboard::events::VestigeEvent>(1024);

View file

@ -6,7 +6,7 @@ use tokio::sync::Mutex;
use chrono::Utc;
use crate::cognitive::CognitiveEngine;
use vestige_core::{DreamHistoryRecord, Storage};
use vestige_core::{DreamHistoryRecord, LinkType, Storage};
pub fn schema() -> serde_json::Value {
serde_json::json!({
@ -116,8 +116,17 @@ pub async fn execute(
last_activated: now,
activation_count: 1,
};
if storage.save_connection(&record).is_ok() {
connections_persisted += 1;
match storage.save_connection(&record) {
Ok(_) => connections_persisted += 1,
Err(e) => {
tracing::warn!(
source = %conn.from_id,
target = %conn.to_id,
link_type = %link_type,
"Failed to persist dream connection: {}",
e
);
}
}
}
if connections_persisted > 0 {
@ -129,6 +138,26 @@ pub async fn execute(
}
}
// Hydrate live cognitive engine with newly persisted connections
if connections_persisted > 0 {
let mut cog = cognitive.lock().await;
for conn in new_connections {
let link_type_enum = match conn.connection_type {
vestige_core::DiscoveredConnectionType::Semantic => LinkType::Semantic,
vestige_core::DiscoveredConnectionType::SharedConcept => LinkType::Semantic,
vestige_core::DiscoveredConnectionType::Temporal => LinkType::Temporal,
vestige_core::DiscoveredConnectionType::Complementary => LinkType::Semantic,
vestige_core::DiscoveredConnectionType::CausalChain => LinkType::Causal,
};
cog.activation_network.add_edge(
conn.from_id.clone(),
conn.to_id.clone(),
link_type_enum,
conn.similarity,
);
}
}
// Persist dream history (non-fatal on failure — dream still happened)
{
let record = DreamHistoryRecord {
@ -309,4 +338,198 @@ mod tests {
assert!(last.is_some(), "Dream should have been persisted to database");
}
}
#[tokio::test]
async fn test_dream_connections_round_trip() {
// Verify dream → persist → query round-trip
let (storage, _dir) = test_storage().await;
// Create enough diverse memories to trigger connection discovery
for i in 0..15 {
storage.ingest(vestige_core::IngestInput {
content: format!(
"Memory {} about topic {}: detailed content for connection discovery",
i,
if i % 3 == 0 { "rust" } else if i % 3 == 1 { "cargo" } else { "testing" }
),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: vec!["dream-roundtrip".to_string()],
valid_from: None,
valid_until: None,
}).unwrap();
}
let cognitive = test_cognitive();
let result = execute(&storage, &cognitive, None).await.unwrap();
assert_eq!(result["status"], "dreamed");
let persisted = result["connectionsPersisted"].as_u64().unwrap_or(0);
if persisted > 0 {
// Verify connections are queryable from storage
let all_conns = storage.get_all_connections().unwrap();
assert!(!all_conns.is_empty(), "Persisted connections should be queryable");
// Verify connection IDs reference valid memories
let all_nodes = storage.get_all_nodes(100, 0).unwrap();
let valid_ids: std::collections::HashSet<String> =
all_nodes.iter().map(|n| n.id.clone()).collect();
for conn in &all_conns {
assert!(
valid_ids.contains(&conn.source_id),
"Connection source_id {} should reference a valid memory",
conn.source_id
);
assert!(
valid_ids.contains(&conn.target_id),
"Connection target_id {} should reference a valid memory",
conn.target_id
);
}
// Verify live cognitive engine was hydrated
let cog = cognitive.lock().await;
let first_conn = &all_conns[0];
let assocs = cog.activation_network.get_associations(&first_conn.source_id);
assert!(
!assocs.is_empty(),
"Live cognitive engine should have been hydrated with dream connections"
);
}
}
/// Directly test save_connection with real memory IDs — isolates the persistence layer.
#[tokio::test]
async fn test_save_connection_with_dream_ids() {
let (storage, _dir) = test_storage().await;
// Ingest memories and collect their IDs
let mut ids = Vec::new();
for i in 0..5 {
let result = storage.ingest(vestige_core::IngestInput {
content: format!("Save connection test memory {}", i),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: vec!["save-conn-test".to_string()],
valid_from: None,
valid_until: None,
}).unwrap();
ids.push(result.id);
}
// Simulate what dream does: save connections between real memory IDs
let now = chrono::Utc::now();
let mut saved = 0u32;
let mut errors = Vec::new();
for i in 0..ids.len() {
for j in (i+1)..ids.len() {
let record = vestige_core::ConnectionRecord {
source_id: ids[i].clone(),
target_id: ids[j].clone(),
strength: 0.75,
link_type: "semantic".to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
};
match storage.save_connection(&record) {
Ok(_) => saved += 1,
Err(e) => errors.push(format!(
"{} -> {}: {}",
ids[i], ids[j], e
)),
}
}
}
assert!(
errors.is_empty(),
"save_connection failed for {} of {} connections:\n{}",
errors.len(),
saved + errors.len() as u32,
errors.join("\n")
);
assert!(saved > 0, "Should have saved at least one connection");
// Verify they're queryable
let all = storage.get_all_connections().unwrap();
assert_eq!(all.len(), saved as usize);
// Verify per-memory query
let conns = storage.get_connections_for_memory(&ids[0]).unwrap();
assert!(
!conns.is_empty(),
"get_connections_for_memory should return connections for {}",
ids[0]
);
}
/// Test that dream actually discovers connections and they persist.
/// Unlike test_dream_connections_round_trip, this ASSERTS on the dream
/// discovering connections (not just conditionally checking).
#[tokio::test]
async fn test_dream_discovers_and_persists_connections() {
let (storage, _dir) = test_storage().await;
// Ingest memories with known high-similarity content (shared tags + similar text)
let topics = [
("Rust borrow checker prevents data races at compile time", vec!["rust", "safety"]),
("Rust ownership model ensures memory safety without GC", vec!["rust", "safety"]),
("Cargo is the Rust package manager and build system", vec!["rust", "cargo"]),
("Cargo.toml defines dependencies for Rust projects", vec!["rust", "cargo"]),
("Unit tests in Rust use #[test] attribute", vec!["rust", "testing"]),
("Integration tests in Rust live in the tests/ directory", vec!["rust", "testing"]),
("Clippy is a Rust linter that catches common mistakes", vec!["rust", "tooling"]),
("Rustfmt formats Rust code according to style guidelines", vec!["rust", "tooling"]),
];
for (content, tags) in &topics {
storage.ingest(vestige_core::IngestInput {
content: content.to_string(),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: tags.iter().map(|t| t.to_string()).collect(),
valid_from: None,
valid_until: None,
}).unwrap();
}
let cognitive = test_cognitive();
let result = execute(&storage, &cognitive, None).await.unwrap();
assert_eq!(result["status"], "dreamed");
let found = result["stats"]["new_connections_found"].as_u64().unwrap_or(0);
let persisted = result["connectionsPersisted"].as_u64().unwrap_or(0);
// Dream should discover connections between these related memories
// (they share tags and have similar content)
assert!(
found > 0,
"Dream should discover connections between related memories (found: {})",
found
);
// Key assertion: if connections were found, they should persist
assert_eq!(
persisted, found,
"All {} discovered connections should persist, but only {} did. \
Check tracing output for save_connection errors.",
found, persisted
);
// Verify round-trip through storage
let stored = storage.get_all_connections().unwrap();
assert_eq!(
stored.len(),
persisted as usize,
"Storage should contain exactly {} connections",
persisted
);
}
}

View file

@ -35,7 +35,7 @@ pub fn schema() -> serde_json::Value {
}
pub async fn execute(
_storage: &Arc<Storage>,
storage: &Arc<Storage>,
cognitive: &Arc<Mutex<CognitiveEngine>>,
args: Option<serde_json::Value>,
) -> Result<serde_json::Value, String> {
@ -104,6 +104,26 @@ pub async fn execute(
all_associations.truncate(limit);
// Fallback: if in-memory modules are empty, query storage directly
if all_associations.is_empty() {
drop(cog); // release cognitive lock before storage call
if let Ok(connections) = storage.get_connections_for_memory(from) {
for conn in connections.iter().take(limit) {
let other_id = if conn.source_id == from {
&conn.target_id
} else {
&conn.source_id
};
all_associations.push(serde_json::json!({
"memory_id": other_id,
"strength": conn.strength,
"link_type": conn.link_type,
"source": "persistent_graph",
}));
}
}
}
Ok(serde_json::json!({
"action": "associations",
"from": from,
@ -274,4 +294,61 @@ mod tests {
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_associations_storage_fallback() {
let (storage, _dir) = test_storage().await;
// Create two memories and a direct connection in storage
let id1 = storage.ingest(vestige_core::IngestInput {
content: "Memory about Rust".to_string(),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
}).unwrap().id;
let id2 = storage.ingest(vestige_core::IngestInput {
content: "Memory about Cargo".to_string(),
node_type: "fact".to_string(),
source: None,
sentiment_score: 0.0,
sentiment_magnitude: 0.0,
tags: vec!["test".to_string()],
valid_from: None,
valid_until: None,
}).unwrap().id;
// Save connection directly to storage (bypassing cognitive engine)
let now = chrono::Utc::now();
storage.save_connection(&vestige_core::ConnectionRecord {
source_id: id1.clone(),
target_id: id2.clone(),
strength: 0.9,
link_type: "semantic".to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
}).unwrap();
// Execute with empty cognitive engine — should fall back to storage
let cognitive = test_cognitive();
let args = serde_json::json!({
"action": "associations",
"from": id1,
});
let result = execute(&storage, &cognitive, Some(args)).await;
assert!(result.is_ok());
let value = result.unwrap();
let associations = value["associations"].as_array().unwrap();
assert!(
!associations.is_empty(),
"Should find associations via storage fallback"
);
assert_eq!(associations[0]["source"], "persistent_graph");
assert_eq!(associations[0]["memory_id"], id2);
}
}