Fix dense dream connection persistence

Fixes #50
This commit is contained in:
Sam Valladares 2026-05-01 05:37:27 -05:00
parent f3d63af12e
commit 4e9e11ac0b
4 changed files with 165 additions and 46 deletions

View file

@ -5,6 +5,12 @@ All notable changes to Vestige will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Fixed
- **Dream connection persistence cap** — dense single-domain dreams now persist every connection discovered in that run instead of losing everything beyond the old 1,000-entry live buffer. The live dreamer buffer now keeps up to 200,000 high-scoring recent connections, and the MCP `dream` tool exposes `min_similarity` for corpus-specific tuning.
## [2.1.1] - 2026-05-01 — "Portable Sync"
v2.1.1 focuses on user-controlled portability: exact storage archives, merge-safe file sync, pluggable sync backends, and explicit hook opt-ins.

View file

@ -93,6 +93,9 @@ const CONNECTION_DECAY_FACTOR: f64 = 0.95;
/// Minimum connection strength to keep
const MIN_CONNECTION_STRENGTH: f64 = 0.1;
/// Maximum discovered connections kept in the live dreamer buffer.
const MAX_STORED_DREAM_CONNECTIONS: usize = 200_000;
/// Maximum memories to replay per cycle
const MAX_REPLAY_MEMORIES: usize = 100;
@ -1129,44 +1132,81 @@ impl MemoryDreamer {
/// Run a dream cycle on provided memories
pub async fn dream(&self, memories: &[DreamMemory]) -> DreamResult {
self.dream_with_connections(memories).await.0
}
/// Run a dream cycle with a temporary config.
pub async fn dream_with_config(
&self,
memories: &[DreamMemory],
config: DreamConfig,
) -> DreamResult {
self.dream_with_config_and_connections(memories, config)
.await
.0
}
/// Run a dream cycle and return the exact connections found in that run.
pub async fn dream_with_connections(
&self,
memories: &[DreamMemory],
) -> (DreamResult, Vec<DiscoveredConnection>) {
self.run_dream(memories, &self.config).await
}
/// Run a dream cycle with a temporary config and return this run's connections.
pub async fn dream_with_config_and_connections(
&self,
memories: &[DreamMemory],
config: DreamConfig,
) -> (DreamResult, Vec<DiscoveredConnection>) {
self.run_dream(memories, &config).await
}
async fn run_dream(
&self,
memories: &[DreamMemory],
config: &DreamConfig,
) -> (DreamResult, Vec<DiscoveredConnection>) {
let start = std::time::Instant::now();
let mut stats = DreamStats::default();
// Filter memories based on config
let working_memories: Vec<_> = if self.config.focus_tags.is_empty() {
let working_memories: Vec<_> = if config.focus_tags.is_empty() {
memories
.iter()
.take(self.config.max_memories_per_dream)
.take(config.max_memories_per_dream)
.collect()
} else {
memories
.iter()
.filter(|m| m.tags.iter().any(|t| self.config.focus_tags.contains(t)))
.take(self.config.max_memories_per_dream)
.filter(|m| m.tags.iter().any(|t| config.focus_tags.contains(t)))
.take(config.max_memories_per_dream)
.collect()
};
stats.memories_analyzed = working_memories.len();
// Phase 1: Discover new connections
let new_connections = self.discover_connections(&working_memories, &mut stats);
let new_connections =
self.discover_connections(&working_memories, &mut stats, config.min_similarity);
// Phase 2: Find clusters/patterns
let clusters = self.find_clusters(&working_memories, &new_connections);
stats.clusters_found = clusters.len();
// Phase 3: Generate insights
let insights = self.generate_insights(&working_memories, &clusters, &mut stats);
let insights = self.generate_insights(&working_memories, &clusters, &mut stats, config);
// Phase 4: Strengthen important memories (would update storage)
let memories_strengthened = if self.config.enable_strengthening {
let memories_strengthened = if config.enable_strengthening {
self.identify_memories_to_strengthen(&working_memories, &new_connections)
} else {
0
};
// Phase 5: Identify compression candidates (would compress in storage)
let memories_compressed = if self.config.enable_compression {
let memories_compressed = if config.enable_compression {
self.identify_compression_candidates(&working_memories)
} else {
0
@ -1195,7 +1235,7 @@ impl MemoryDreamer {
}
}
result
(result, new_connections)
}
/// Synthesize insights from memories without full dream cycle
@ -1203,12 +1243,20 @@ impl MemoryDreamer {
let mut stats = DreamStats::default();
// Find clusters
let connections =
self.discover_connections(&memories.iter().collect::<Vec<_>>(), &mut stats);
let connections = self.discover_connections(
&memories.iter().collect::<Vec<_>>(),
&mut stats,
self.config.min_similarity,
);
let clusters = self.find_clusters(&memories.iter().collect::<Vec<_>>(), &connections);
// Generate insights
self.generate_insights(&memories.iter().collect::<Vec<_>>(), &clusters, &mut stats)
self.generate_insights(
&memories.iter().collect::<Vec<_>>(),
&clusters,
&mut stats,
&self.config,
)
}
/// Get all generated insights
@ -1257,6 +1305,7 @@ impl MemoryDreamer {
&self,
memories: &[&DreamMemory],
stats: &mut DreamStats,
min_similarity: f64,
) -> Vec<DiscoveredConnection> {
let mut connections = Vec::new();
@ -1271,7 +1320,7 @@ impl MemoryDreamer {
// Calculate similarity
let similarity = self.calculate_similarity(mem_a, mem_b);
if similarity >= self.config.min_similarity {
if similarity >= min_similarity {
let connection_type = self.determine_connection_type(mem_a, mem_b, similarity);
let reasoning =
self.generate_connection_reasoning(mem_a, mem_b, &connection_type);
@ -1464,6 +1513,7 @@ impl MemoryDreamer {
memories: &[&DreamMemory],
clusters: &[Vec<String>],
stats: &mut DreamStats,
config: &DreamConfig,
) -> Vec<SynthesizedInsight> {
let mut insights = Vec::new();
let memory_map: HashMap<_, _> = memories.iter().map(|m| (&m.id, *m)).collect();
@ -1483,12 +1533,12 @@ impl MemoryDreamer {
// Try to generate insight from this cluster
if let Some(insight) = self.generate_insight_from_cluster(&cluster_memories)
&& insight.novelty_score >= self.config.min_novelty
&& insight.novelty_score >= config.min_novelty
{
insights.push(insight);
}
if insights.len() >= self.config.max_insights {
if insights.len() >= config.max_insights {
break;
}
}
@ -1706,7 +1756,7 @@ impl MemoryDreamer {
fn store_connections(&self, connections: &[DiscoveredConnection]) {
if let Ok(mut stored) = self.connections.write() {
stored.extend(connections.iter().cloned());
// Keep the 1000 highest-scoring connections using a composite score
// Keep the highest-scoring connections using a composite score
// that balances quality (similarity) and recency (age-based decay).
//
// score = similarity * 0.6 + recency * 0.4
@ -1721,7 +1771,7 @@ impl MemoryDreamer {
// Strong old connections are retained longer than weak new ones,
// but eventually yield to fresh high-quality discoveries.
let len = stored.len();
if len > 1000 {
if len > MAX_STORED_DREAM_CONNECTIONS {
let now = Utc::now();
stored.sort_unstable_by(|a, b| {
let score = |c: &DiscoveredConnection| -> f64 {
@ -1737,7 +1787,7 @@ impl MemoryDreamer {
.partial_cmp(&score(a))
.unwrap_or(std::cmp::Ordering::Equal)
});
stored.truncate(1000);
stored.truncate(MAX_STORED_DREAM_CONNECTIONS);
}
}
}
@ -1854,6 +1904,31 @@ mod tests {
assert!(result.stats.connections_evaluated > 0);
}
#[tokio::test]
async fn test_dense_dream_keeps_more_than_legacy_connection_cap() {
let dreamer = MemoryDreamer::with_config(DreamConfig {
max_memories_per_dream: 50,
min_similarity: 0.1,
..DreamConfig::default()
});
let memories: Vec<_> = (0..50)
.map(|i| {
make_memory(
&format!("dense-{i}"),
&format!("Dense single-domain memory {i} about shared identity systems"),
vec!["dense", "identity"],
)
})
.collect();
let (result, connections) = dreamer.dream_with_connections(&memories).await;
assert_eq!(result.new_connections_found, 1_225);
assert_eq!(connections.len(), 1_225);
assert_eq!(dreamer.get_connections().len(), 1_225);
}
#[test]
fn test_tag_similarity() {
let dreamer = MemoryDreamer::new();

View file

@ -858,23 +858,14 @@ pub async fn trigger_dream(State(state): State<AppState>) -> Result<Json<Value>,
// Run dream through CognitiveEngine
let cog = cognitive.lock().await;
// Capture start time before the dream — composite-score eviction in store_connections
// reorders the buffer, making positional slicing (pre_dream_count..) unreliable.
let dream_start = Utc::now();
let dream_result = cog.dreamer.dream(&dream_memories).await;
let (dream_result, new_connections) = cog.dreamer.dream_with_connections(&dream_memories).await;
let insights = cog.dreamer.synthesize_insights(&dream_memories);
let all_connections = cog.dreamer.get_connections();
drop(cog);
// Persist new connections
// Filter by timestamp — same approach as dream.rs to avoid positional index issues.
let new_connections: Vec<&vestige_core::DiscoveredConnection> = all_connections
.iter()
.filter(|c| c.discovered_at >= dream_start)
.collect();
let mut connections_persisted = 0u64;
let now = Utc::now();
for conn in new_connections.iter() {
for conn in &new_connections {
let link_type = match conn.connection_type {
vestige_core::DiscoveredConnectionType::Semantic => "semantic",
vestige_core::DiscoveredConnectionType::SharedConcept => "shared_concepts",

View file

@ -16,6 +16,13 @@ pub fn schema() -> serde_json::Value {
"type": "integer",
"description": "Number of recent memories to dream about (default: 50)",
"default": 50
},
"min_similarity": {
"type": "number",
"description": "Minimum similarity for connection discovery (0.0-1.0, default: 0.5)",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5
}
}
})
@ -32,6 +39,11 @@ pub async fn execute(
.and_then(|v| v.as_u64())
.unwrap_or(50)
.min(500) as usize; // Cap at 500 to prevent O(N^2) hang
let min_similarity = args
.as_ref()
.and_then(|a| a.get("min_similarity"))
.and_then(|v| v.as_f64())
.map(|v| v.clamp(0.0, 1.0));
// v1.9.0: Waking SWR tagging — preferential replay of tagged memories (70/30 split)
let tagged_nodes = storage
@ -95,15 +107,18 @@ pub async fn execute(
.collect();
let cog = cognitive.lock().await;
// Capture start time before the dream so we can identify newly discovered
// connections by timestamp rather than by buffer position. This is robust
// against the composite-score eviction sort in store_connections, which
// reorders the buffer and makes positional slicing (pre_dream_count..)
// unreliable.
let dream_start = Utc::now();
let dream_result = cog.dreamer.dream(&dream_memories).await;
let (dream_result, new_connections) = if let Some(min_similarity) = min_similarity {
let config = vestige_core::DreamConfig {
min_similarity,
..vestige_core::DreamConfig::default()
};
cog.dreamer
.dream_with_config_and_connections(&dream_memories, config)
.await
} else {
cog.dreamer.dream_with_connections(&dream_memories).await
};
let insights = cog.dreamer.synthesize_insights(&dream_memories);
let all_connections = cog.dreamer.get_connections();
drop(cog);
// v2.1.0: Persist dream insights to database (Bug #4 fix)
@ -126,17 +141,10 @@ pub async fn execute(
}
}
// Identify new connections from this dream by timestamp rather than buffer
// position — positional slicing is broken after composite-score eviction
// reorders the buffer.
let new_connections: Vec<&vestige_core::DiscoveredConnection> = all_connections
.iter()
.filter(|c| c.discovered_at >= dream_start)
.collect();
let mut connections_persisted = 0u64;
{
let now = Utc::now();
for conn in new_connections.iter() {
for conn in &new_connections {
let link_type = match conn.connection_type {
vestige_core::DiscoveredConnectionType::Semantic => "semantic",
vestige_core::DiscoveredConnectionType::SharedConcept => "shared_concepts",
@ -178,7 +186,7 @@ pub async fn execute(
// Hydrate live cognitive engine with newly persisted connections
if connections_persisted > 0 {
let mut cog = cognitive.lock().await;
for conn in new_connections.iter() {
for conn in &new_connections {
let link_type_enum = match conn.connection_type {
vestige_core::DiscoveredConnectionType::Semantic => LinkType::Semantic,
vestige_core::DiscoveredConnectionType::SharedConcept => LinkType::Semantic,
@ -286,6 +294,9 @@ mod tests {
assert_eq!(s["type"], "object");
assert!(s["properties"]["memory_count"].is_object());
assert_eq!(s["properties"]["memory_count"]["default"], 50);
assert!(s["properties"]["min_similarity"].is_object());
assert_eq!(s["properties"]["min_similarity"]["minimum"], 0.0);
assert_eq!(s["properties"]["min_similarity"]["maximum"], 1.0);
}
#[tokio::test]
@ -616,6 +627,42 @@ mod tests {
);
}
#[tokio::test]
async fn test_dream_persists_dense_connection_set_above_legacy_buffer_cap() {
let (storage, _dir) = test_storage().await;
ingest_n_memories(&storage, 50).await;
let result = execute(
&storage,
&test_cognitive(),
Some(serde_json::json!({
"memory_count": 50,
"min_similarity": 0.1
})),
)
.await
.unwrap();
assert_eq!(result["status"], "dreamed");
let found = result["stats"]["new_connections_found"]
.as_u64()
.unwrap_or(0);
let persisted = result["connectionsPersisted"].as_u64().unwrap_or(0);
assert!(
found > 1_000,
"test setup should discover more than the legacy 1,000 connection cap"
);
assert_eq!(
persisted, found,
"dense dreams should persist every connection discovered in the run"
);
assert_eq!(
storage.get_all_connections().unwrap().len(),
persisted as usize
);
}
#[tokio::test]
async fn test_dream_persists_insights() {
let (storage, _dir) = test_storage().await;