feat: Vestige v2.0.0 "Cognitive Leap" — 3D dashboard, HyDE search, WebSocket events

The biggest release in Vestige history. Complete visual and cognitive overhaul.

Dashboard:
- SvelteKit 2 + Three.js 3D neural visualization at localhost:3927/dashboard
- 7 interactive pages: Graph, Memories, Timeline, Feed, Explore, Intentions, Stats
- WebSocket event bus with 16 event types, real-time 3D animations
- Bloom post-processing, GPU instanced rendering, force-directed layout
- Dream visualization mode, FSRS retention curves, command palette (Cmd+K)
- Keyboard shortcuts, responsive mobile layout, PWA installable
- Single binary deployment via include_dir! (22MB)

Engine:
- HyDE query expansion (intent classification + 3-5 semantic variants + centroid)
- fastembed 5.11 with optional Nomic v2 MoE + Qwen3 reranker + Metal GPU
- Emotional memory module (#29)
- Criterion benchmark suite

Backend:
- Axum WebSocket at /ws with heartbeat + event broadcast
- 7 new REST endpoints for cognitive operations
- Event emission from MCP tools via shared broadcast channel
- CORS for SvelteKit dev mode

Distribution:
- GitHub issue templates (bug report, feature request)
- CHANGELOG with comprehensive v2.0 release notes
- README updated with dashboard docs, architecture diagram, comparison table

734 tests passing, zero warnings, 22MB release binary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sam Valladares 2026-02-22 03:07:25 -06:00
parent 26cee040a5
commit c2d28f3433
321 changed files with 32695 additions and 4727 deletions

View file

@ -1,8 +1,8 @@
[package]
name = "vestige-mcp"
version = "1.9.1"
version = "2.0.0"
edition = "2024"
description = "Cognitive memory MCP server for Claude - FSRS-6, spreading activation, synaptic tagging, and 130 years of memory research"
description = "Cognitive memory MCP server for Claude - FSRS-6, spreading activation, synaptic tagging, 3D dashboard, and 130 years of memory research"
authors = ["samvallad33"]
license = "AGPL-3.0-only"
keywords = ["mcp", "ai", "memory", "fsrs", "neuroscience", "cognitive-science", "spaced-repetition"]
@ -32,7 +32,7 @@ path = "src/bin/cli.rs"
# ============================================================================
# Includes: FSRS-6, spreading activation, synaptic tagging, hippocampal indexing,
# memory states, context memory, importance signals, dreams, and more
vestige-core = { version = "1.0.0", path = "../vestige-core" }
vestige-core = { version = "2.0.0", path = "../vestige-core" }
# ============================================================================
# MCP Server Dependencies
@ -72,10 +72,15 @@ colored = "3"
rusqlite = { version = "0.38", features = ["bundled"] }
# Dashboard (v1.2) - hyper/tower already in Cargo.lock via rmcp/reqwest
axum = { version = "0.8", default-features = false, features = ["json", "query", "tokio", "http1"] }
axum = { version = "0.8", default-features = false, features = ["json", "query", "tokio", "http1", "ws"] }
tower = { version = "0.5", features = ["limit"] }
tower-http = { version = "0.6", features = ["cors", "set-header"] }
futures-util = "0.3"
open = "5"
# Embedded SvelteKit dashboard (v2.0)
include_dir = "0.7"
mime_guess = "2"
[dev-dependencies]
tempfile = "3"

View file

@ -961,7 +961,7 @@ fn run_dashboard(port: u16, open_browser: bool) -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async move {
vestige_mcp::dashboard::start_dashboard(storage, port, open_browser)
vestige_mcp::dashboard::start_dashboard(storage, None, port, open_browser)
.await
.map_err(|e| anyhow::anyhow!("Dashboard error: {}", e))
})

View file

@ -9,6 +9,7 @@ use vestige_core::{
ActivationNetwork, SynapticTaggingSystem, HippocampalIndex, ContextMatcher,
AccessibilityCalculator, CompetitionManager, StateUpdateService,
ImportanceSignals, NoveltySignal, ArousalSignal, RewardSignal, AttentionSignal,
EmotionalMemory,
// Advanced modules
ImportanceTracker, ReconsolidationManager, IntentDetector, ActivityTracker,
MemoryDreamer, MemoryChainBuilder, MemoryCompressor, CrossProjectLearner,
@ -39,6 +40,7 @@ pub struct CognitiveEngine {
pub arousal_signal: ArousalSignal,
pub reward_signal: RewardSignal,
pub attention_signal: AttentionSignal,
pub emotional_memory: EmotionalMemory,
pub predictive_memory: PredictiveMemory,
pub prospective_memory: ProspectiveMemory,
pub intention_parser: IntentionParser,
@ -84,6 +86,7 @@ impl CognitiveEngine {
arousal_signal: ArousalSignal::new(),
reward_signal: RewardSignal::new(),
attention_signal: AttentionSignal::new(),
emotional_memory: EmotionalMemory::new(),
predictive_memory: PredictiveMemory::new(),
prospective_memory: ProspectiveMemory::new(),
intention_parser: IntentionParser::new(),

View file

@ -0,0 +1,131 @@
//! Real-time event system for the Vestige dashboard.
//!
//! Events are emitted by the CognitiveEngine and broadcast to all
//! connected WebSocket clients via a tokio broadcast channel.
use chrono::{DateTime, Utc};
use serde::Serialize;
/// Every cognitive operation emits one of these events.
///
/// Serialized for WebSocket clients in adjacently-tagged form:
/// `{"type": "<VariantName>", "data": { ...fields }}`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum VestigeEvent {
// -- Memory lifecycle --
/// A new memory was stored.
MemoryCreated {
id: String,
content_preview: String,
node_type: String,
tags: Vec<String>,
timestamp: DateTime<Utc>,
},
/// A field of an existing memory was changed.
MemoryUpdated {
id: String,
content_preview: String,
field: String,
timestamp: DateTime<Utc>,
},
/// A memory was removed.
MemoryDeleted {
id: String,
timestamp: DateTime<Utc>,
},
/// A memory's retention was raised to `new_retention`.
MemoryPromoted {
id: String,
new_retention: f64,
timestamp: DateTime<Utc>,
},
/// A memory's retention was lowered to `new_retention`.
MemoryDemoted {
id: String,
new_retention: f64,
timestamp: DateTime<Utc>,
},
// -- Search --
/// A search completed; carries the matched ids and timing.
SearchPerformed {
query: String,
result_count: usize,
result_ids: Vec<String>,
duration_ms: u64,
timestamp: DateTime<Utc>,
},
// -- Dream --
/// A dream cycle began over `memory_count` memories.
DreamStarted {
memory_count: usize,
timestamp: DateTime<Utc>,
},
/// Progress within a running dream cycle.
DreamProgress {
phase: String,
memory_id: Option<String>,
progress_pct: f64,
timestamp: DateTime<Utc>,
},
/// A dream cycle finished, with summary statistics.
DreamCompleted {
memories_replayed: usize,
connections_found: usize,
insights_generated: usize,
duration_ms: u64,
timestamp: DateTime<Utc>,
},
// -- Consolidation --
/// A consolidation pass began.
ConsolidationStarted {
timestamp: DateTime<Utc>,
},
/// A consolidation pass finished, with summary statistics.
ConsolidationCompleted {
nodes_processed: usize,
decay_applied: usize,
embeddings_generated: usize,
duration_ms: u64,
timestamp: DateTime<Utc>,
},
// -- FSRS --
/// A memory's retention decayed from `old_retention` to `new_retention`.
RetentionDecayed {
id: String,
old_retention: f64,
new_retention: f64,
timestamp: DateTime<Utc>,
},
// -- Connections --
/// A new connection between two memories was discovered.
ConnectionDiscovered {
source_id: String,
target_id: String,
connection_type: String,
weight: f64,
timestamp: DateTime<Utc>,
},
// -- Spreading activation --
/// Activation spread from `source_id` to the listed memories.
ActivationSpread {
source_id: String,
activated_ids: Vec<String>,
timestamp: DateTime<Utc>,
},
// -- Importance --
/// Content was scored by the 4-channel importance model.
ImportanceScored {
content_preview: String,
composite_score: f64,
novelty: f64,
arousal: f64,
reward: f64,
attention: f64,
timestamp: DateTime<Utc>,
},
// -- System --
/// Periodic liveness ping carrying system stats.
Heartbeat {
uptime_secs: u64,
memory_count: usize,
avg_retention: f64,
timestamp: DateTime<Utc>,
},
}
impl VestigeEvent {
    /// Serialize to JSON string for WebSocket transmission.
    ///
    /// Falls back to an empty JSON object on serialization failure so a
    /// bad event can never break a WebSocket send loop.
    pub fn to_json(&self) -> String {
        match serde_json::to_string(self) {
            Ok(json) => json,
            Err(_) => String::from("{}"),
        }
    }
}

View file

@ -1,4 +1,6 @@
//! Dashboard API endpoint handlers
//!
//! v2.0: Adds cognitive operation endpoints (dream, explore, predict, importance, consolidation)
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
@ -7,6 +9,7 @@ use chrono::{Duration, Utc};
use serde::Deserialize;
use serde_json::Value;
use super::events::VestigeEvent;
use super::state::AppState;
/// Serve the dashboard HTML
@ -304,3 +307,593 @@ pub async fn health_check(
"version": env!("CARGO_PKG_VERSION"),
})))
}
// ============================================================================
// MEMORY GRAPH
// ============================================================================
/// Serve the memory graph visualization HTML.
///
/// The page is compiled into the binary at build time via
/// `include_str!`, so this handler is infallible.
pub async fn serve_graph() -> Html<&'static str> {
    const GRAPH_PAGE: &str = include_str!("../graph.html");
    Html(GRAPH_PAGE)
}
/// Query parameters accepted by the memory-graph endpoint (`get_graph`).
#[derive(Debug, Deserialize)]
pub struct GraphParams {
// Free-text search; the single best match becomes the graph center.
pub query: Option<String>,
// Explicit center node id; takes precedence over `query`.
pub center_id: Option<String>,
// Traversal depth from the center; the handler clamps this to 1..=3.
pub depth: Option<u32>,
// Maximum nodes returned; the handler clamps this to 1..=200.
pub max_nodes: Option<usize>,
}
/// Get memory graph data (nodes + edges with layout positions)
///
/// Center selection precedence: explicit `center_id`, else best search
/// hit for `query`, else the most recently stored memory.
///
/// Errors: `404 Not Found` when no center can be resolved or the
/// subgraph is empty; `500 Internal Server Error` on storage failure.
pub async fn get_graph(
State(state): State<AppState>,
Query(params): Query<GraphParams>,
) -> Result<Json<Value>, StatusCode> {
// Clamp client-supplied bounds to keep the traversal cheap.
let depth = params.depth.unwrap_or(2).clamp(1, 3);
let max_nodes = params.max_nodes.unwrap_or(50).clamp(1, 200);
// Determine center node
let center_id = if let Some(ref id) = params.center_id {
id.clone()
} else if let Some(ref query) = params.query {
// Take the single best search hit as the center.
let results = state.storage
.search(query, 1)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
results.first()
.map(|n| n.id.clone())
.ok_or(StatusCode::NOT_FOUND)?
} else {
// Default: most recent memory
let recent = state.storage
.get_all_nodes(1, 0)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
recent.first()
.map(|n| n.id.clone())
.ok_or(StatusCode::NOT_FOUND)?
};
// Get subgraph
let (nodes, edges) = state.storage
.get_memory_subgraph(&center_id, depth, max_nodes)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if nodes.is_empty() {
return Err(StatusCode::NOT_FOUND);
}
// Build nodes JSON with timestamps for recency calculation
let nodes_json: Vec<Value> = nodes.iter()
.map(|n| {
// Truncate long content to an 80-char label (77 chars + "...");
// chars() keeps the cut on a character boundary for non-ASCII.
let label = if n.content.chars().count() > 80 {
format!("{}...", n.content.chars().take(77).collect::<String>())
} else {
n.content.clone()
};
serde_json::json!({
"id": n.id,
"label": label,
"type": n.node_type,
"retention": n.retention_strength,
"tags": n.tags,
"createdAt": n.created_at.to_rfc3339(),
"updatedAt": n.updated_at.to_rfc3339(),
"isCenter": n.id == center_id,
})
})
.collect();
let edges_json: Vec<Value> = edges.iter()
.map(|e| {
serde_json::json!({
"source": e.source_id,
"target": e.target_id,
"weight": e.strength,
"type": e.link_type,
})
})
.collect();
Ok(Json(serde_json::json!({
"nodes": nodes_json,
"edges": edges_json,
"center_id": center_id,
"depth": depth,
"nodeCount": nodes.len(),
"edgeCount": edges.len(),
})))
}
// ============================================================================
// SEARCH (dedicated endpoint)
// ============================================================================
/// Query parameters accepted by the dedicated search endpoint.
#[derive(Debug, Deserialize)]
pub struct SearchParams {
// Search text (required).
pub q: String,
// Maximum results; the handler clamps this to 1..=100 (default 20).
pub limit: Option<i32>,
// Optional lower bound on retention strength for returned results.
pub min_retention: Option<f64>,
}
/// Search memories with hybrid (keyword + semantic) search.
///
/// Query parameters: `q` (required), `limit` (clamped to 1..=100,
/// default 20), `min_retention` (optional retention floor).
///
/// Emits a `SearchPerformed` event describing exactly the result set
/// returned to the caller. Previously the event was emitted before
/// the `min_retention` filter was applied, so the broadcast count/ids
/// could disagree with the HTTP response; the filter now runs first.
///
/// Errors: `500 Internal Server Error` if the storage search fails.
pub async fn search_memories(
    State(state): State<AppState>,
    Query(params): Query<SearchParams>,
) -> Result<Json<Value>, StatusCode> {
    let limit = params.limit.unwrap_or(20).clamp(1, 100);

    let start = std::time::Instant::now();
    let results = state
        .storage
        .hybrid_search(&params.q, limit, 0.3, 0.7)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    let duration_ms = start.elapsed().as_millis() as u64;

    // Apply the retention filter BEFORE emitting the event so the
    // broadcast result set matches the HTTP response exactly.
    let filtered: Vec<_> = results
        .into_iter()
        .filter(|r| {
            params
                .min_retention
                .is_none_or(|min| r.node.retention_strength >= min)
        })
        .collect();

    let result_ids: Vec<String> = filtered.iter().map(|r| r.node.id.clone()).collect();

    // Emit search event
    state.emit(VestigeEvent::SearchPerformed {
        query: params.q.clone(),
        result_count: filtered.len(),
        result_ids,
        duration_ms,
        timestamp: Utc::now(),
    });

    let formatted: Vec<Value> = filtered
        .into_iter()
        .map(|r| {
            serde_json::json!({
                "id": r.node.id,
                "content": r.node.content,
                "nodeType": r.node.node_type,
                "tags": r.node.tags,
                "retentionStrength": r.node.retention_strength,
                "combinedScore": r.combined_score,
                "createdAt": r.node.created_at.to_rfc3339(),
            })
        })
        .collect();

    Ok(Json(serde_json::json!({
        "query": params.q,
        "total": formatted.len(),
        "durationMs": duration_ms,
        "results": formatted,
    })))
}
// ============================================================================
// COGNITIVE OPERATIONS (v2.0)
// ============================================================================
/// Trigger a dream cycle via CognitiveEngine
///
/// Loads up to 50 recent memories, runs the dreamer under the engine
/// lock, persists newly discovered connections, and broadcasts
/// `DreamStarted` / `ConnectionDiscovered` / `DreamCompleted` events.
///
/// Returns an early `insufficient_memories` payload (HTTP 200) when
/// fewer than 5 memories exist. Errors: `503 Service Unavailable` when
/// no CognitiveEngine is attached; `500 Internal Server Error` on
/// storage failure while loading memories.
pub async fn trigger_dream(
State(state): State<AppState>,
) -> Result<Json<Value>, StatusCode> {
let cognitive = state.cognitive.as_ref().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let start = std::time::Instant::now();
// Cap on how many memories participate in one dream cycle.
let memory_count: usize = 50;
// Load memories for dreaming
let all_nodes = state
.storage
.get_all_nodes(memory_count as i32, 0)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if all_nodes.len() < 5 {
return Ok(Json(serde_json::json!({
"status": "insufficient_memories",
"message": format!("Need at least 5 memories. Current: {}", all_nodes.len()),
})));
}
// Emit start event
state.emit(VestigeEvent::DreamStarted {
memory_count: all_nodes.len(),
timestamp: Utc::now(),
});
// Build dream memories
// Embedding lookup is best-effort: a failed fetch becomes None.
let dream_memories: Vec<vestige_core::DreamMemory> = all_nodes
.iter()
.map(|n| vestige_core::DreamMemory {
id: n.id.clone(),
content: n.content.clone(),
embedding: state.storage.get_node_embedding(&n.id).ok().flatten(),
tags: n.tags.clone(),
created_at: n.created_at,
access_count: n.reps as u32,
})
.collect();
// Run dream through CognitiveEngine
// Tokio mutex, so holding the guard across `.await` is safe; it is
// dropped before the persistence loop to keep the critical section short.
let cog = cognitive.lock().await;
let pre_dream_count = cog.dreamer.get_connections().len();
let dream_result = cog.dreamer.dream(&dream_memories).await;
let insights = cog.dreamer.synthesize_insights(&dream_memories);
let all_connections = cog.dreamer.get_connections();
drop(cog);
// Persist new connections
// NOTE(review): slicing from `pre_dream_count` assumes get_connections()
// only ever appends during dream() — would panic if it can shrink;
// confirm in vestige-core.
let new_connections = &all_connections[pre_dream_count..];
let mut connections_persisted = 0u64;
let now = Utc::now();
for conn in new_connections {
// Map vestige-core connection kinds to the storage link_type strings.
let link_type = match conn.connection_type {
vestige_core::DiscoveredConnectionType::Semantic => "semantic",
vestige_core::DiscoveredConnectionType::SharedConcept => "shared_concepts",
vestige_core::DiscoveredConnectionType::Temporal => "temporal",
vestige_core::DiscoveredConnectionType::Complementary => "complementary",
vestige_core::DiscoveredConnectionType::CausalChain => "causal",
};
let record = vestige_core::ConnectionRecord {
source_id: conn.from_id.clone(),
target_id: conn.to_id.clone(),
strength: conn.similarity,
link_type: link_type.to_string(),
created_at: now,
last_activated: now,
activation_count: 1,
};
// Best-effort persistence: only successful saves are counted.
if state.storage.save_connection(&record).is_ok() {
connections_persisted += 1;
}
// Emit connection events
// NOTE(review): emitted even when save_connection failed — confirm
// whether unsaved connections should be broadcast.
state.emit(VestigeEvent::ConnectionDiscovered {
source_id: conn.from_id.clone(),
target_id: conn.to_id.clone(),
connection_type: link_type.to_string(),
weight: conn.similarity,
timestamp: now,
});
}
let duration_ms = start.elapsed().as_millis() as u64;
// Emit completion event
state.emit(VestigeEvent::DreamCompleted {
memories_replayed: dream_memories.len(),
connections_found: connections_persisted as usize,
insights_generated: insights.len(),
duration_ms,
timestamp: Utc::now(),
});
Ok(Json(serde_json::json!({
"status": "dreamed",
"memoriesReplayed": dream_memories.len(),
"connectionsPersisted": connections_persisted,
"insights": insights.iter().map(|i| serde_json::json!({
"type": format!("{:?}", i.insight_type),
"insight": i.insight,
"sourceMemories": i.source_memories,
"confidence": i.confidence,
"noveltyScore": i.novelty_score,
})).collect::<Vec<Value>>(),
"stats": {
"newConnectionsFound": dream_result.new_connections_found,
"connectionsPersisted": connections_persisted,
"memoriesStrengthened": dream_result.memories_strengthened,
"memoriesCompressed": dream_result.memories_compressed,
"insightsGenerated": dream_result.insights_generated.len(),
"durationMs": duration_ms,
}
})))
}
/// Request body for the connection-exploration endpoint.
#[derive(Debug, Deserialize)]
pub struct ExploreRequest {
// Starting memory id (required for all actions).
pub from_id: String,
// Target memory id; required by the "chains"/"bridges" actions.
pub to_id: Option<String>,
pub action: Option<String>, // "associations", "chains", "bridges"
// Result/subgraph size cap; the handler clamps this to 1..=50.
pub limit: Option<usize>,
}
/// Explore connections between memories
///
/// `action` selects the mode: "associations" (default) finds memories
/// similar to `from_id` via hybrid search; "chains"/"bridges" return a
/// depth-2 subgraph around `from_id` and require `to_id`.
///
/// Errors: `404` if `from_id` does not exist (associations), `400` for
/// an unknown action or missing `to_id`, `500` on storage failure.
pub async fn explore_connections(
State(state): State<AppState>,
Json(req): Json<ExploreRequest>,
) -> Result<Json<Value>, StatusCode> {
let action = req.action.as_deref().unwrap_or("associations");
let limit = req.limit.unwrap_or(10).clamp(1, 50);
match action {
"associations" => {
// Get the source memory content for similarity search
let source_node = state
.storage
.get_node(&req.from_id)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
// Use hybrid search with source content to find associated memories
let results = state
.storage
.hybrid_search(&source_node.content, limit as i32, 0.3, 0.7)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let formatted: Vec<Value> = results
.iter()
.filter(|r| r.node.id != req.from_id) // Exclude self
.map(|r| {
serde_json::json!({
"id": r.node.id,
"content": r.node.content,
"nodeType": r.node.node_type,
"score": r.combined_score,
"retention": r.node.retention_strength,
})
})
.collect();
Ok(Json(serde_json::json!({
"action": "associations",
"fromId": req.from_id,
"results": formatted,
})))
}
"chains" | "bridges" => {
let to_id = req.to_id.as_deref().ok_or(StatusCode::BAD_REQUEST)?;
// NOTE(review): the subgraph is expanded from `from_id` only;
// `to_id` is validated and echoed back but does not constrain
// the traversal — confirm whether path-finding between the two
// endpoints is the intended behavior here.
let (nodes, edges) = state
.storage
.get_memory_subgraph(&req.from_id, 2, limit)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let nodes_json: Vec<Value> = nodes
.iter()
.map(|n| {
serde_json::json!({
"id": n.id,
// First 100 chars only, to keep the payload small.
"content": n.content.chars().take(100).collect::<String>(),
"nodeType": n.node_type,
"retention": n.retention_strength,
})
})
.collect();
let edges_json: Vec<Value> = edges
.iter()
.map(|e| {
serde_json::json!({
"source": e.source_id,
"target": e.target_id,
"weight": e.strength,
"type": e.link_type,
})
})
.collect();
Ok(Json(serde_json::json!({
"action": action,
"fromId": req.from_id,
"toId": to_id,
"nodes": nodes_json,
"edges": edges_json,
})))
}
_ => Err(StatusCode::BAD_REQUEST),
}
}
/// Predict which memories will be needed
///
/// NOTE(review): current implementation is a recency heuristic — it
/// returns the 10 most recent memories and labels every one with a
/// hard-coded "high" predicted need ("basedOn": "recent_activity").
/// Presumably a placeholder until PredictiveMemory scoring is wired
/// in; confirm intent.
///
/// Errors: `500 Internal Server Error` on storage failure.
pub async fn predict_memories(
State(state): State<AppState>,
) -> Result<Json<Value>, StatusCode> {
// Get recent memories as predictions based on activity
let recent = state
.storage
.get_all_nodes(10, 0)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let predictions: Vec<Value> = recent
.iter()
.map(|n| {
serde_json::json!({
"id": n.id,
"content": n.content.chars().take(100).collect::<String>(),
"nodeType": n.node_type,
"retention": n.retention_strength,
"predictedNeed": "high",
})
})
.collect();
Ok(Json(serde_json::json!({
"predictions": predictions,
"basedOn": "recent_activity",
})))
}
/// Request body for the importance-scoring endpoint.
#[derive(Debug, Deserialize)]
pub struct ImportanceRequest {
// The candidate text to score for save/skip recommendation.
pub content: String,
}
/// Score content importance using 4-channel model
///
/// With a CognitiveEngine attached, computes novelty/arousal/reward/
/// attention channels plus a composite and broadcasts an
/// `ImportanceScored` event. Without one, falls back to a cheap
/// word-count/code heuristic (no event emitted).
///
/// Recommendation is "save" when the composite exceeds 0.6, else "skip".
pub async fn score_importance(
    State(state): State<AppState>,
    Json(req): Json<ImportanceRequest>,
) -> Result<Json<Value>, StatusCode> {
    match state.cognitive {
        Some(ref cognitive) => {
            let context = vestige_core::ImportanceContext::current();

            // Copy the channel values out before releasing the engine lock.
            let cog = cognitive.lock().await;
            let score = cog.importance_signals.compute_importance(&req.content, &context);
            let (composite, novelty, arousal, reward, attention) = (
                score.composite,
                score.novelty,
                score.arousal,
                score.reward,
                score.attention,
            );
            drop(cog);

            let preview: String = req.content.chars().take(80).collect();
            state.emit(VestigeEvent::ImportanceScored {
                content_preview: preview,
                composite_score: composite,
                novelty,
                arousal,
                reward,
                attention,
                timestamp: Utc::now(),
            });

            let verdict = if composite > 0.6 { "save" } else { "skip" };
            Ok(Json(serde_json::json!({
                "composite": composite,
                "channels": {
                    "novelty": novelty,
                    "arousal": arousal,
                    "reward": reward,
                    "attention": attention,
                },
                "recommendation": verdict,
            })))
        }
        None => {
            // Fallback: basic heuristic scoring when no engine is attached.
            let word_count = req.content.split_whitespace().count();
            let has_code = req.content.contains("```") || req.content.contains("fn ");
            let composite = if has_code {
                0.7
            } else {
                (word_count as f64 / 100.0).min(0.8)
            };
            let verdict = if composite > 0.6 { "save" } else { "skip" };
            Ok(Json(serde_json::json!({
                "composite": composite,
                "channels": {
                    "novelty": composite,
                    "arousal": 0.5,
                    "reward": 0.5,
                    "attention": composite,
                },
                "recommendation": verdict,
            })))
        }
    }
}
/// Trigger consolidation
///
/// Broadcasts `ConsolidationStarted`, runs the storage consolidation
/// pass, then broadcasts `ConsolidationCompleted` with the pass
/// statistics and returns them as JSON.
///
/// Errors: `500 Internal Server Error` if consolidation fails.
pub async fn trigger_consolidation(
    State(state): State<AppState>,
) -> Result<Json<Value>, StatusCode> {
    state.emit(VestigeEvent::ConsolidationStarted {
        timestamp: Utc::now(),
    });

    let started_at = std::time::Instant::now();
    let outcome = match state.storage.run_consolidation() {
        Ok(r) => r,
        Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
    };
    let elapsed_ms = started_at.elapsed().as_millis() as u64;

    state.emit(VestigeEvent::ConsolidationCompleted {
        nodes_processed: outcome.nodes_processed as usize,
        decay_applied: outcome.decay_applied as usize,
        embeddings_generated: outcome.embeddings_generated as usize,
        duration_ms: elapsed_ms,
        timestamp: Utc::now(),
    });

    Ok(Json(serde_json::json!({
        "nodesProcessed": outcome.nodes_processed,
        "decayApplied": outcome.decay_applied,
        "embeddingsGenerated": outcome.embeddings_generated,
        "duplicatesMerged": outcome.duplicates_merged,
        "activationsComputed": outcome.activations_computed,
        "durationMs": elapsed_ms,
    })))
}
/// Get retention distribution (for histogram visualization)
///
/// Single pass over up to 10,000 nodes producing: a 10-bin histogram
/// of retention strength, a per-node-type tally, and the list of
/// "endangered" memories (retention below 30%).
///
/// Errors: `500 Internal Server Error` on storage failure.
pub async fn retention_distribution(
    State(state): State<AppState>,
) -> Result<Json<Value>, StatusCode> {
    let nodes = state
        .storage
        .get_all_nodes(10000, 0)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // One pass: histogram bins, per-type tallies, and endangered list.
    let mut histogram = [0u32; 10]; // 0-10%, 10-20%, ..., 90-100%
    let mut type_counts: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();
    let mut at_risk: Vec<Value> = Vec::new();

    for node in &nodes {
        // retention_strength of exactly 1.0 lands in the top bin via min(9).
        let bin = ((node.retention_strength * 10.0).floor() as usize).min(9);
        histogram[bin] += 1;
        *type_counts.entry(node.node_type.clone()).or_default() += 1;

        // Endangered: retention below 30%
        if node.retention_strength < 0.3 {
            at_risk.push(serde_json::json!({
                "id": node.id,
                "content": node.content.chars().take(60).collect::<String>(),
                "retention": node.retention_strength,
                "nodeType": node.node_type,
            }));
        }
    }

    let distribution = histogram
        .iter()
        .enumerate()
        .map(|(bin, &count)| {
            serde_json::json!({
                "range": format!("{}-{}%", bin * 10, (bin + 1) * 10),
                "count": count,
            })
        })
        .collect::<Vec<Value>>();

    Ok(Json(serde_json::json!({
        "distribution": distribution,
        "byType": type_counts,
        "endangered": at_risk,
        "total": nodes.len(),
    })))
}
// ============================================================================
// INTENTIONS (v2.0)
// ============================================================================
/// Query parameters for the intentions listing endpoint.
#[derive(Debug, Deserialize)]
pub struct IntentionListParams {
// Status filter: "active" (default), "all", or a concrete status string.
pub status: Option<String>,
}
/// List intentions, optionally filtered by status.
///
/// `?status=` accepts `active` (default), `all`, or any concrete
/// status string (e.g. `fulfilled`, `cancelled`, `snoozed`).
///
/// Errors: `500 Internal Server Error` if a storage lookup fails.
/// Previously the `all` branch silently swallowed lookup errors via
/// `unwrap_or_default`, which could return a partial list while the
/// other branches returned 500; every branch now surfaces storage
/// errors consistently.
pub async fn list_intentions(
    State(state): State<AppState>,
    Query(params): Query<IntentionListParams>,
) -> Result<Json<Value>, StatusCode> {
    let status_filter = params.status.unwrap_or_else(|| "active".to_string());

    let intentions = match status_filter.as_str() {
        "all" => {
            // Aggregate every status bucket, propagating any failure
            // instead of quietly dropping that bucket's intentions.
            let mut all = state
                .storage
                .get_active_intentions()
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
            for status in ["fulfilled", "cancelled", "snoozed"] {
                all.extend(
                    state
                        .storage
                        .get_intentions_by_status(status)
                        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
                );
            }
            all
        }
        "active" => state
            .storage
            .get_active_intentions()
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
        other => state
            .storage
            .get_intentions_by_status(other)
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
    };

    let count = intentions.len();
    Ok(Json(serde_json::json!({
        "intentions": intentions,
        "total": count,
        "filter": status_filter,
    })))
}

View file

@ -2,67 +2,140 @@
//!
//! Self-contained web UI at localhost:3927 for browsing, searching,
//! and managing Vestige memories. Auto-starts inside the MCP server process.
//!
//! v2.0: WebSocket real-time events, CognitiveEngine access, new API endpoints.
pub mod events;
pub mod handlers;
pub mod state;
pub mod static_files;
pub mod websocket;
use axum::routing::{delete, get, post};
use axum::Router;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tower::ServiceBuilder;
use tower_http::cors::CorsLayer;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::set_header::SetResponseHeaderLayer;
use tracing::{info, warn};
use crate::cognitive::CognitiveEngine;
use state::AppState;
use vestige_core::Storage;
/// Build the axum router with all dashboard routes
pub fn build_router(storage: Arc<Storage>, port: u16) -> Router {
let state = AppState { storage };
pub fn build_router(
storage: Arc<Storage>,
cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
port: u16,
) -> (Router, AppState) {
let state = AppState::new(storage, cognitive);
build_router_inner(state, port)
}
/// Build the axum router sharing an external event broadcast channel.
pub fn build_router_with_event_tx(
storage: Arc<Storage>,
cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
event_tx: tokio::sync::broadcast::Sender<events::VestigeEvent>,
port: u16,
) -> (Router, AppState) {
let state = AppState::with_event_tx(storage, cognitive, event_tx);
build_router_inner(state, port)
}
fn build_router_inner(state: AppState, port: u16) -> (Router, AppState) {
let origins = vec![
format!("http://127.0.0.1:{}", port)
.parse::<axum::http::HeaderValue>()
.expect("valid origin"),
format!("http://localhost:{}", port)
.parse::<axum::http::HeaderValue>()
.expect("valid origin"),
// SvelteKit dev server
"http://localhost:5173"
.parse::<axum::http::HeaderValue>()
.expect("valid origin"),
"http://127.0.0.1:5173"
.parse::<axum::http::HeaderValue>()
.expect("valid origin"),
];
let origin = format!("http://127.0.0.1:{}", port)
.parse::<axum::http::HeaderValue>()
.expect("valid origin");
let cors = CorsLayer::new()
.allow_origin(origin)
.allow_methods([axum::http::Method::GET, axum::http::Method::POST, axum::http::Method::DELETE])
.allow_headers([axum::http::header::CONTENT_TYPE]);
.allow_origin(AllowOrigin::list(origins))
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::DELETE,
axum::http::Method::OPTIONS,
])
.allow_headers([
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
]);
let csp = SetResponseHeaderLayer::overriding(
axum::http::header::CONTENT_SECURITY_POLICY,
axum::http::HeaderValue::from_static("default-src 'self' 'unsafe-inline'"),
axum::http::HeaderValue::from_static(
"default-src 'self' 'unsafe-inline' 'unsafe-eval' blob: data: ws: wss:",
),
);
Router::new()
// Dashboard UI
let router = Router::new()
// SvelteKit Dashboard v2.0 (embedded static build)
.route("/dashboard", get(static_files::serve_dashboard_spa))
.route("/dashboard/{*path}", get(static_files::serve_dashboard_asset))
// Legacy embedded HTML (keep for backward compat)
.route("/", get(handlers::serve_dashboard))
// API endpoints
.route("/graph", get(handlers::serve_graph))
// WebSocket for real-time events
.route("/ws", get(websocket::ws_handler))
// Memory CRUD
.route("/api/memories", get(handlers::list_memories))
.route("/api/memories/{id}", get(handlers::get_memory))
.route("/api/memories/{id}", delete(handlers::delete_memory))
.route("/api/memories/{id}/promote", post(handlers::promote_memory))
.route("/api/memories/{id}/demote", post(handlers::demote_memory))
// Search
.route("/api/search", get(handlers::search_memories))
// Stats & health
.route("/api/stats", get(handlers::get_stats))
.route("/api/timeline", get(handlers::get_timeline))
.route("/api/health", get(handlers::health_check))
// Timeline
.route("/api/timeline", get(handlers::get_timeline))
// Graph
.route("/api/graph", get(handlers::get_graph))
// Cognitive operations (v2.0)
.route("/api/dream", post(handlers::trigger_dream))
.route("/api/explore", post(handlers::explore_connections))
.route("/api/predict", post(handlers::predict_memories))
.route("/api/importance", post(handlers::score_importance))
.route("/api/consolidate", post(handlers::trigger_consolidation))
.route("/api/retention-distribution", get(handlers::retention_distribution))
// Intentions (v2.0)
.route("/api/intentions", get(handlers::list_intentions))
.layer(
ServiceBuilder::new()
.concurrency_limit(10)
.concurrency_limit(50)
.layer(cors)
.layer(csp)
.layer(csp),
)
.with_state(state)
.with_state(state.clone());
(router, state)
}
/// Start the dashboard HTTP server (blocking — use in CLI mode)
pub async fn start_dashboard(
storage: Arc<Storage>,
cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
port: u16,
open_browser: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let app = build_router(storage, port);
let (app, _state) = build_router(storage, cognitive, port);
let addr = SocketAddr::from(([127, 0, 0, 1], port));
info!("Dashboard starting at http://127.0.0.1:{}", port);
@ -83,9 +156,29 @@ pub async fn start_dashboard(
/// Start the dashboard as a background task (non-blocking — use in MCP server)
pub async fn start_background(
storage: Arc<Storage>,
cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
port: u16,
) -> Result<(), Box<dyn std::error::Error>> {
let app = build_router(storage, port);
) -> Result<AppState, Box<dyn std::error::Error>> {
let (app, state) = build_router(storage, cognitive, port);
start_background_inner(app, state, port).await
}
/// Start the dashboard sharing an external event broadcast channel.
pub async fn start_background_with_event_tx(
storage: Arc<Storage>,
cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
event_tx: tokio::sync::broadcast::Sender<events::VestigeEvent>,
port: u16,
) -> Result<AppState, Box<dyn std::error::Error>> {
let (app, state) = build_router_with_event_tx(storage, cognitive, event_tx, port);
start_background_inner(app, state, port).await
}
async fn start_background_inner(
app: Router,
state: AppState,
port: u16,
) -> Result<AppState, Box<dyn std::error::Error>> {
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let listener = match tokio::net::TcpListener::bind(addr).await {
@ -99,7 +192,18 @@ pub async fn start_background(
}
};
info!("Dashboard available at http://127.0.0.1:{}", port);
axum::serve(listener, app).await?;
Ok(())
info!(
"Dashboard available at http://127.0.0.1:{} (WebSocket at ws://127.0.0.1:{}/ws)",
port, port
);
let serve_state = state.clone();
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app).await {
warn!("Dashboard server error: {}", e);
}
drop(serve_state);
});
Ok(state)
}

View file

@ -1,10 +1,62 @@
//! Dashboard shared state
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{broadcast, Mutex};
use vestige_core::Storage;
use crate::cognitive::CognitiveEngine;
use super::events::VestigeEvent;
/// Broadcast channel capacity — how many events can buffer before old ones drop.
const EVENT_CHANNEL_CAPACITY: usize = 1024;
/// Shared application state for the dashboard
///
/// Cloning is cheap: every field is either `Copy`, an `Arc`, or a
/// broadcast sender handle sharing the same underlying channel.
#[derive(Clone)]
pub struct AppState {
// Persistent memory store shared with the rest of the server.
pub storage: Arc<Storage>,
// Optional cognitive engine; None when running storage-only.
pub cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
// Fan-out channel for VestigeEvents to all WebSocket subscribers.
pub event_tx: broadcast::Sender<VestigeEvent>,
// When this state was created; used for uptime reporting.
pub start_time: Instant,
}
impl AppState {
    /// Create a new AppState with a freshly allocated event channel.
    pub fn new(
        storage: Arc<Storage>,
        cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
    ) -> Self {
        // The initial receiver is dropped; WebSocket clients subscribe later.
        let (event_tx, _initial_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
        Self::with_event_tx(storage, cognitive, event_tx)
    }

    /// Create a new AppState sharing an external event broadcast channel.
    pub fn with_event_tx(
        storage: Arc<Storage>,
        cognitive: Option<Arc<Mutex<CognitiveEngine>>>,
        event_tx: broadcast::Sender<VestigeEvent>,
    ) -> Self {
        Self {
            storage,
            cognitive,
            event_tx,
            start_time: Instant::now(),
        }
    }

    /// Get a new event receiver (for WebSocket connections).
    pub fn subscribe(&self) -> broadcast::Receiver<VestigeEvent> {
        self.event_tx.subscribe()
    }

    /// Emit an event to all connected clients.
    ///
    /// Send errors are deliberately ignored: `broadcast::Sender::send`
    /// only fails when no receivers exist, which is a normal state.
    pub fn emit(&self, event: VestigeEvent) {
        let _ = self.event_tx.send(event);
    }
}

View file

@ -0,0 +1,65 @@
//! Embedded SvelteKit dashboard static file server.
//!
//! The built SvelteKit app is embedded into the binary at compile time
//! using `include_dir!`. This serves it at `/dashboard/` prefix.
use axum::extract::Path;
use axum::http::{header, StatusCode};
use axum::response::{Html, IntoResponse, Response};
use include_dir::{include_dir, Dir};
/// Embed the entire SvelteKit build output into the binary.
/// Build with: cd apps/dashboard && pnpm build
/// The build output goes to apps/dashboard/build/
static DASHBOARD_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/../../apps/dashboard/build");
/// Serve the SvelteKit dashboard index
///
/// Returns the embedded `index.html` as HTML, or a 404 with build
/// instructions when the dashboard was not compiled into the binary.
pub async fn serve_dashboard_spa() -> impl IntoResponse {
    if let Some(file) = DASHBOARD_DIR.get_file("index.html") {
        let body = String::from_utf8_lossy(file.contents()).to_string();
        Html(body).into_response()
    } else {
        (StatusCode::NOT_FOUND, "Dashboard not built. Run: cd apps/dashboard && pnpm build")
            .into_response()
    }
}
/// Serve static assets from the embedded SvelteKit build
///
/// Resolution order:
/// 1. Exact path match in the embedded dir → serve with guessed MIME type.
/// 2. Otherwise fall back to `index.html` so SvelteKit's client-side router
///    can handle the route (SPA behavior).
/// 3. If even `index.html` is missing (dashboard never built) → 404.
pub async fn serve_dashboard_asset(Path(path): Path<String>) -> Response {
    // Try exact path
    if let Some(file) = DASHBOARD_DIR.get_file(&path) {
        let mime = mime_guess::from_path(&path)
            .first_or_octet_stream()
            .to_string();
        return (
            StatusCode::OK,
            [
                (header::CONTENT_TYPE, mime),
                (
                    header::CACHE_CONTROL,
                    // NOTE(review): if the router extracts `path` without a
                    // leading slash (e.g. "_app/immutable/…"), the substring
                    // "/_app/" never matches for top-level _app assets and
                    // they get the short-lived cache policy — confirm the
                    // route pattern, or match on "_app/" instead.
                    if path.contains("/_app/") {
                        // Immutable assets (hashed filenames)
                        "public, max-age=31536000, immutable".to_string()
                    } else {
                        "public, max-age=60".to_string()
                    },
                ),
            ],
            file.contents().to_vec(),
        )
            .into_response();
    }
    // SPA fallback: serve index.html for client-side routing
    match DASHBOARD_DIR.get_file("index.html") {
        Some(file) => (
            StatusCode::OK,
            [(header::CONTENT_TYPE, "text/html".to_string())],
            file.contents().to_vec(),
        )
            .into_response(),
        None => (StatusCode::NOT_FOUND, "Not found").into_response(),
    }
}

View file

@ -0,0 +1,119 @@
//! WebSocket handler for real-time event streaming.
//!
//! Clients connect to `/ws` and receive all VestigeEvents as JSON.
//! Also sends heartbeats every 5 seconds with system stats.
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::State;
use axum::response::IntoResponse;
use chrono::Utc;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::broadcast;
use tracing::{debug, warn};
use super::events::VestigeEvent;
use super::state::AppState;
/// WebSocket upgrade handler — GET /ws
///
/// Completes the HTTP → WebSocket upgrade and hands the socket, plus a clone
/// of the shared [`AppState`], to `handle_socket`, which runs for the
/// lifetime of the connection.
pub async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<AppState>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_socket(socket, state))
}
/// Per-connection WebSocket loop: sends a welcome frame, then multiplexes
/// (a) broadcast events, (b) periodic heartbeats, and (c) incoming client
/// frames until the client disconnects or an I/O error occurs.
async fn handle_socket(socket: WebSocket, state: AppState) {
    // Split so we can write from this task while also polling incoming
    // frames inside the same select! loop below.
    let (mut sender, mut receiver) = socket.split();
    let mut event_rx: broadcast::Receiver<VestigeEvent> = state.subscribe();
    debug!("WebSocket client connected");
    // Send initial connection event
    let welcome = serde_json::json!({
        "type": "Connected",
        "data": {
            "version": env!("CARGO_PKG_VERSION"),
            "timestamp": Utc::now().to_rfc3339(),
        }
    });
    if sender
        .send(Message::Text(welcome.to_string().into()))
        .await
        .is_err()
    {
        // Client vanished before the welcome frame landed — nothing to clean up.
        return;
    }
    // Heartbeat interval
    let heartbeat_state = state.clone();
    let (heartbeat_tx, mut heartbeat_rx) = tokio::sync::mpsc::channel::<String>(16);
    // Heartbeat task
    // Runs on its own task so a slow stats query never blocks event
    // forwarding; finished heartbeats come back through the mpsc channel.
    let heartbeat_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
        loop {
            interval.tick().await;
            let uptime = heartbeat_state.start_time.elapsed().as_secs();
            // Get live stats
            let (memory_count, avg_retention) = heartbeat_state
                .storage
                .get_stats()
                .map(|s| (s.total_nodes as usize, s.average_retention))
                .unwrap_or((0, 0.0));
            let event = VestigeEvent::Heartbeat {
                uptime_secs: uptime,
                memory_count,
                avg_retention,
                timestamp: Utc::now(),
            };
            // Send error means the main loop (receiver) is gone — stop ticking.
            if heartbeat_tx.send(event.to_json()).await.is_err() {
                break;
            }
        }
    });
    // Main loop: forward events + heartbeats to client, handle incoming messages
    loop {
        tokio::select! {
            // Broadcast event from cognitive engine
            // NOTE(review): if recv() returns Err (e.g. Lagged when this
            // client falls behind the 1024-slot buffer), the Ok(..) pattern
            // fails and the branch is skipped for that iteration — lagged
            // events are silently dropped rather than closing the socket.
            // Confirm this is the intended policy.
            Ok(event) = event_rx.recv() => {
                let json = event.to_json();
                if sender.send(Message::Text(json.into())).await.is_err() {
                    break;
                }
            }
            // Heartbeat
            Some(hb) = heartbeat_rx.recv() => {
                if sender.send(Message::Text(hb.into())).await.is_err() {
                    break;
                }
            }
            // Client message (ping/pong, close, or commands)
            msg = receiver.next() => {
                match msg {
                    // Explicit close frame, or stream ended: client is gone.
                    Some(Ok(Message::Close(_))) | None => break,
                    Some(Ok(Message::Ping(data))) => {
                        // Echo the payload back, per the WebSocket protocol.
                        if sender.send(Message::Pong(data)).await.is_err() {
                            break;
                        }
                    }
                    Some(Ok(Message::Text(text))) => {
                        // Future: handle client commands (trigger dream, etc.)
                        debug!("WebSocket received: {}", text);
                    }
                    Some(Err(e)) => {
                        warn!("WebSocket error: {}", e);
                        break;
                    }
                    // Binary frames and client-initiated Pongs are ignored.
                    _ => {}
                }
            }
        }
    }
    // Stop the heartbeat task explicitly; otherwise it would keep ticking
    // until its next failed send after the client disconnects.
    heartbeat_handle.abort();
    debug!("WebSocket client disconnected");
}

File diff suppressed because it is too large Load diff

View file

@ -2,4 +2,5 @@
//!
//! Shared modules accessible to all binaries in the crate.
pub mod cognitive;
pub mod dashboard;

View file

@ -27,7 +27,8 @@
//! - Reconsolidation (memories editable on retrieval)
//! - Memory Chains (reasoning paths)
pub mod cognitive;
// cognitive is exported from lib.rs for dashboard access
use vestige_mcp::cognitive;
mod protocol;
mod resources;
mod server;
@ -221,24 +222,39 @@ async fn main() {
});
}
// Spawn dashboard HTTP server alongside MCP server
// Create cognitive engine (stateful neuroscience modules)
let cognitive = Arc::new(Mutex::new(cognitive::CognitiveEngine::new()));
info!("CognitiveEngine initialized (28 modules)");
// Create shared event broadcast channel for dashboard <-> MCP tool events
let (event_tx, _) = tokio::sync::broadcast::channel::<vestige_mcp::dashboard::events::VestigeEvent>(1024);
// Spawn dashboard HTTP server alongside MCP server (now with CognitiveEngine access)
{
let dashboard_port = std::env::var("VESTIGE_DASHBOARD_PORT")
.ok()
.and_then(|s| s.parse::<u16>().ok())
.unwrap_or(3927);
let dashboard_storage = Arc::clone(&storage);
let dashboard_cognitive = Arc::clone(&cognitive);
let dashboard_event_tx = event_tx.clone();
tokio::spawn(async move {
if let Err(e) = vestige_mcp::dashboard::start_background(dashboard_storage, dashboard_port).await {
warn!("Dashboard failed to start: {}", e);
match vestige_mcp::dashboard::start_background_with_event_tx(
dashboard_storage,
Some(dashboard_cognitive),
dashboard_event_tx,
dashboard_port,
).await {
Ok(_state) => {
info!("Dashboard started with WebSocket + CognitiveEngine + shared event bus");
}
Err(e) => {
warn!("Dashboard failed to start: {}", e);
}
}
});
}
// Create cognitive engine (stateful neuroscience modules)
let cognitive = Arc::new(Mutex::new(cognitive::CognitiveEngine::new()));
info!("CognitiveEngine initialized (26 modules)");
// Load cross-encoder reranker in the background (downloads ~150MB on first run)
#[cfg(feature = "embeddings")]
{
@ -251,8 +267,8 @@ async fn main() {
});
}
// Create MCP server
let server = McpServer::new(storage, cognitive);
// Create MCP server with shared event channel for dashboard broadcasts
let server = McpServer::new_with_events(storage, cognitive, event_tx);
// Create stdio transport
let transport = StdioTransport::new();

View file

@ -1,13 +1,22 @@
//! stdio Transport for MCP
//!
//! Handles JSON-RPC communication over stdin/stdout.
//! v1.9.2: Async tokio I/O with heartbeat and error resilience.
use std::io::{self, BufRead, BufReader, Write};
use tracing::{debug, error, warn};
use std::io;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tracing::{debug, error, info, warn};
use super::types::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};
use crate::server::McpServer;
/// Maximum consecutive I/O errors before giving up
const MAX_CONSECUTIVE_ERRORS: u32 = 5;
/// Heartbeat interval — sends a ping notification to keep the connection alive
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30);
/// stdio Transport for MCP server
pub struct StdioTransport;
@ -16,66 +25,109 @@ impl StdioTransport {
Self
}
/// Run the MCP server over stdio
/// Run the MCP server over stdio with heartbeat and error resilience
pub async fn run(self, mut server: McpServer) -> Result<(), io::Error> {
let stdin = io::stdin();
let stdout = io::stdout();
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
let reader = BufReader::new(stdin.lock());
let mut stdout = stdout.lock();
let mut reader = BufReader::new(stdin);
let mut stdout = stdout;
let mut consecutive_errors: u32 = 0;
let mut line_buf = String::new();
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(e) => {
error!("Failed to read line: {}", e);
break;
}
};
loop {
line_buf.clear();
if line.is_empty() {
continue;
}
tokio::select! {
result = reader.read_line(&mut line_buf) => {
match result {
Ok(0) => {
// Clean EOF — stdin closed
info!("stdin closed (EOF), shutting down");
break;
}
Ok(_) => {
consecutive_errors = 0;
let line = line_buf.trim();
debug!("Received: {} bytes", line.len());
if line.is_empty() {
continue;
}
// Parse JSON-RPC request
let request: JsonRpcRequest = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
warn!("Failed to parse request: {}", e);
let error_response = JsonRpcResponse::error(None, JsonRpcError::parse_error());
match serde_json::to_string(&error_response) {
Ok(response_json) => {
writeln!(stdout, "{}", response_json)?;
stdout.flush()?;
debug!("Received: {} bytes", line.len());
// Parse JSON-RPC request
let request: JsonRpcRequest = match serde_json::from_str(line) {
Ok(r) => r,
Err(e) => {
warn!("Failed to parse request: {}", e);
let error_response = JsonRpcResponse::error(None, JsonRpcError::parse_error());
match serde_json::to_string(&error_response) {
Ok(response_json) => {
let out = format!("{}\n", response_json);
stdout.write_all(out.as_bytes()).await?;
stdout.flush().await?;
}
Err(e) => {
error!("Failed to serialize error response: {}", e);
let fallback = "{\"jsonrpc\":\"2.0\",\"id\":null,\"error\":{\"code\":-32603,\"message\":\"Internal error\"}}\n";
let _ = stdout.write_all(fallback.as_bytes()).await;
let _ = stdout.flush().await;
}
}
continue;
}
};
// Handle the request
if let Some(response) = server.handle_request(request).await {
match serde_json::to_string(&response) {
Ok(response_json) => {
debug!("Sending: {} bytes", response_json.len());
let out = format!("{}\n", response_json);
stdout.write_all(out.as_bytes()).await?;
stdout.flush().await?;
}
Err(e) => {
error!("Failed to serialize response: {}", e);
let fallback = "{\"jsonrpc\":\"2.0\",\"id\":null,\"error\":{\"code\":-32603,\"message\":\"Internal error\"}}\n";
let _ = stdout.write_all(fallback.as_bytes()).await;
let _ = stdout.flush().await;
}
}
}
}
Err(e) => {
error!("Failed to serialize error response: {}", e);
// Send a minimal error response so client doesn't hang
let fallback = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#;
let _ = writeln!(stdout, "{}", fallback);
let _ = stdout.flush();
consecutive_errors += 1;
warn!(
"I/O error reading stdin ({}/{}): {}",
consecutive_errors, MAX_CONSECUTIVE_ERRORS, e
);
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
error!(
"Too many consecutive I/O errors ({}), shutting down",
consecutive_errors
);
break;
}
// Brief pause before retrying
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
continue;
}
};
// Handle the request
if let Some(response) = server.handle_request(request).await {
match serde_json::to_string(&response) {
Ok(response_json) => {
debug!("Sending: {} bytes", response_json.len());
writeln!(stdout, "{}", response_json)?;
stdout.flush()?;
}
Err(e) => {
error!("Failed to serialize response: {}", e);
// Send a minimal error response so client doesn't hang
let fallback = r#"{"jsonrpc":"2.0","id":null,"error":{"code":-32603,"message":"Internal error"}}"#;
let _ = writeln!(stdout, "{}", fallback);
let _ = stdout.flush();
_ = tokio::time::sleep(HEARTBEAT_INTERVAL) => {
// Send a heartbeat ping notification to keep the connection alive
let ping = "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/ping\"}\n";
if let Err(e) = stdout.write_all(ping.as_bytes()).await {
warn!("Failed to send heartbeat ping: {}", e);
consecutive_errors += 1;
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
error!("Too many consecutive errors, shutting down");
break;
}
} else {
let _ = stdout.flush().await;
debug!("Heartbeat ping sent");
}
}
}

View file

@ -6,10 +6,12 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use chrono::Utc;
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, info, warn};
use crate::cognitive::CognitiveEngine;
use vestige_mcp::dashboard::events::VestigeEvent;
use crate::protocol::messages::{
CallToolRequest, CallToolResult, InitializeRequest, InitializeResult,
ListResourcesResult, ListToolsResult, ReadResourceRequest, ReadResourceResult,
@ -27,15 +29,41 @@ pub struct McpServer {
initialized: bool,
/// Tool call counter for inline consolidation trigger (every 100 calls)
tool_call_count: AtomicU64,
/// Optional event broadcast channel for dashboard real-time updates.
event_tx: Option<broadcast::Sender<VestigeEvent>>,
}
impl McpServer {
    /// Create an MCP server without a dashboard event channel.
    ///
    /// Tool calls behave normally, but dashboard event emission is a no-op
    /// (`event_tx` is `None`). Kept `dead_code`-allowed for callers that do
    /// not run the dashboard; the production entry point presumably uses
    /// `new_with_events` instead — confirm against `main`.
    #[allow(dead_code)]
    pub fn new(storage: Arc<Storage>, cognitive: Arc<Mutex<CognitiveEngine>>) -> Self {
        Self {
            storage,
            cognitive,
            initialized: false,
            tool_call_count: AtomicU64::new(0),
            event_tx: None,
        }
    }
/// Create an MCP server that broadcasts events to the dashboard.
pub fn new_with_events(
storage: Arc<Storage>,
cognitive: Arc<Mutex<CognitiveEngine>>,
event_tx: broadcast::Sender<VestigeEvent>,
) -> Self {
Self {
storage,
cognitive,
initialized: false,
tool_call_count: AtomicU64::new(0),
event_tx: Some(event_tx),
}
}
/// Emit an event to the dashboard (no-op if no event channel).
fn emit(&self, event: VestigeEvent) {
if let Some(ref tx) = self.event_tx {
let _ = tx.send(event);
}
}
@ -143,7 +171,7 @@ impl McpServer {
},
ToolDescription {
name: "memory".to_string(),
description: Some("Unified memory management tool. Actions: 'get' (retrieve full node), 'delete' (remove memory), 'state' (get accessibility state), 'promote' (thumbs up — increases retrieval strength), 'demote' (thumbs down — decreases retrieval strength, does NOT delete).".to_string()),
description: Some("Unified memory management tool. Actions: 'get' (retrieve full node), 'delete' (remove memory), 'state' (get accessibility state), 'promote' (thumbs up — increases retrieval strength), 'demote' (thumbs down — decreases retrieval strength, does NOT delete), 'edit' (update content in-place, preserves FSRS state).".to_string()),
input_schema: tools::memory_unified::schema(),
},
ToolDescription {
@ -287,6 +315,9 @@ impl McpServer {
cog.consolidation_scheduler.record_activity();
}
// Save args for event emission (tool dispatch consumes request.arguments)
let saved_args = if self.event_tx.is_some() { request.arguments.clone() } else { None };
let result = match request.name.as_str() {
// ================================================================
// UNIFIED TOOLS (v1.1+) - Preferred API
@ -611,6 +642,14 @@ impl McpServer {
}
};
// ================================================================
// DASHBOARD EVENT EMISSION (v2.0)
// Emit real-time events to WebSocket clients after successful tool calls.
// ================================================================
if let Ok(ref content) = result {
self.emit_tool_event(&request.name, &saved_args, content);
}
let response = match result {
Ok(content) => {
let call_result = CallToolResult {
@ -784,6 +823,196 @@ impl McpServer {
Err(e) => Err(JsonRpcError::internal_error(&e)),
}
}
/// Extract event data from tool results and emit to dashboard.
fn emit_tool_event(
&self,
tool_name: &str,
args: &Option<serde_json::Value>,
result: &serde_json::Value,
) {
if self.event_tx.is_none() {
return;
}
let now = Utc::now();
match tool_name {
// -- smart_ingest: memory created/updated --
"smart_ingest" | "ingest" | "session_checkpoint" => {
// Single mode: result has "action" (created/updated/superseded/reinforced)
if let Some(action) = result.get("action").and_then(|a| a.as_str()) {
let id = result.get("nodeId").or(result.get("id"))
.and_then(|v| v.as_str()).unwrap_or("").to_string();
let preview = result.get("contentPreview").or(result.get("content"))
.and_then(|v| v.as_str()).unwrap_or("").to_string();
match action {
"created" => {
let node_type = result.get("nodeType")
.and_then(|v| v.as_str()).unwrap_or("fact").to_string();
let tags = result.get("tags")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|t| t.as_str().map(String::from)).collect())
.unwrap_or_default();
self.emit(VestigeEvent::MemoryCreated {
id, content_preview: preview, node_type, tags, timestamp: now,
});
}
"updated" | "superseded" | "reinforced" => {
self.emit(VestigeEvent::MemoryUpdated {
id, content_preview: preview, field: action.to_string(), timestamp: now,
});
}
_ => {}
}
}
// Batch mode: result has "results" array
if let Some(results) = result.get("results").and_then(|r| r.as_array()) {
for item in results {
let action = item.get("action").and_then(|a| a.as_str()).unwrap_or("");
let id = item.get("nodeId").or(item.get("id"))
.and_then(|v| v.as_str()).unwrap_or("").to_string();
let preview = item.get("contentPreview")
.and_then(|v| v.as_str()).unwrap_or("").to_string();
if action == "created" {
self.emit(VestigeEvent::MemoryCreated {
id, content_preview: preview,
node_type: "fact".to_string(), tags: vec![], timestamp: now,
});
} else if !action.is_empty() {
self.emit(VestigeEvent::MemoryUpdated {
id, content_preview: preview,
field: action.to_string(), timestamp: now,
});
}
}
}
}
// -- memory: get/delete/promote/demote --
"memory" | "promote_memory" | "demote_memory" | "delete_knowledge" | "get_memory_state" => {
let action = args.as_ref()
.and_then(|a| a.get("action"))
.and_then(|a| a.as_str())
.unwrap_or(if tool_name == "promote_memory" { "promote" }
else if tool_name == "demote_memory" { "demote" }
else if tool_name == "delete_knowledge" { "delete" }
else { "" });
let id = args.as_ref()
.and_then(|a| a.get("id"))
.and_then(|v| v.as_str()).unwrap_or("").to_string();
match action {
"delete" => {
self.emit(VestigeEvent::MemoryDeleted { id, timestamp: now });
}
"promote" => {
let retention = result.get("newRetention")
.or(result.get("retrievalStrength"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
self.emit(VestigeEvent::MemoryPromoted {
id, new_retention: retention, timestamp: now,
});
}
"demote" => {
let retention = result.get("newRetention")
.or(result.get("retrievalStrength"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
self.emit(VestigeEvent::MemoryDemoted {
id, new_retention: retention, timestamp: now,
});
}
_ => {}
}
}
// -- search --
"search" | "recall" | "semantic_search" | "hybrid_search" => {
let query = args.as_ref()
.and_then(|a| a.get("query"))
.and_then(|v| v.as_str()).unwrap_or("").to_string();
let results = result.get("results").and_then(|r| r.as_array());
let result_count = results.map(|r| r.len()).unwrap_or(0);
let result_ids: Vec<String> = results
.map(|r| r.iter()
.filter_map(|item| item.get("id").and_then(|v| v.as_str()).map(String::from))
.collect())
.unwrap_or_default();
let duration_ms = result.get("durationMs")
.or(result.get("duration_ms"))
.and_then(|v| v.as_u64()).unwrap_or(0);
self.emit(VestigeEvent::SearchPerformed {
query, result_count, result_ids, duration_ms, timestamp: now,
});
}
// -- dream --
"dream" => {
let replayed = result.get("memoriesReplayed")
.or(result.get("memories_replayed"))
.and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let connections = result.get("connectionsFound")
.or(result.get("connections_found"))
.and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let insights = result.get("insightsGenerated")
.or(result.get("insights"))
.and_then(|v| v.as_array()).map(|a| a.len()).unwrap_or(0);
let duration_ms = result.get("durationMs")
.or(result.get("duration_ms"))
.and_then(|v| v.as_u64()).unwrap_or(0);
self.emit(VestigeEvent::DreamCompleted {
memories_replayed: replayed, connections_found: connections,
insights_generated: insights, duration_ms, timestamp: now,
});
}
// -- consolidate --
"consolidate" => {
let processed = result.get("nodesProcessed")
.or(result.get("nodes_processed"))
.and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let decay = result.get("decayApplied")
.or(result.get("decay_applied"))
.and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let embeddings = result.get("embeddingsGenerated")
.or(result.get("embeddings_generated"))
.and_then(|v| v.as_u64()).unwrap_or(0) as usize;
let duration_ms = result.get("durationMs")
.or(result.get("duration_ms"))
.and_then(|v| v.as_u64()).unwrap_or(0);
self.emit(VestigeEvent::ConsolidationCompleted {
nodes_processed: processed, decay_applied: decay,
embeddings_generated: embeddings, duration_ms, timestamp: now,
});
}
// -- importance_score --
"importance_score" => {
let preview = args.as_ref()
.and_then(|a| a.get("content"))
.and_then(|v| v.as_str())
.map(|s| if s.len() > 100 { format!("{}...", &s[..100]) } else { s.to_string() })
.unwrap_or_default();
let composite = result.get("compositeScore")
.or(result.get("composite_score"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
let channels = result.get("channels").or(result.get("breakdown"));
let novelty = channels.and_then(|c| c.get("novelty"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
let arousal = channels.and_then(|c| c.get("arousal"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
let reward = channels.and_then(|c| c.get("reward"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
let attention = channels.and_then(|c| c.get("attention"))
.and_then(|v| v.as_f64()).unwrap_or(0.0);
self.emit(VestigeEvent::ImportanceScored {
content_preview: preview, composite_score: composite,
novelty, arousal, reward, attention, timestamp: now,
});
}
// Other tools don't emit events
_ => {}
}
}
}
// ============================================================================

View file

@ -139,6 +139,13 @@ pub async fn execute(
insights_generated: dream_result.insights_generated.len() as i32,
memories_strengthened: dream_result.memories_strengthened as i32,
memories_compressed: dream_result.memories_compressed as i32,
phase_nrem1_ms: None,
phase_nrem3_ms: None,
phase_rem_ms: None,
phase_integration_ms: None,
summaries_generated: None,
emotional_memories_processed: None,
creative_connections_found: None,
};
if let Err(e) = storage.save_dream_history(&record) {
tracing::warn!("Failed to persist dream history: {}", e);

View file

@ -43,8 +43,8 @@ pub fn schema() -> Value {
"properties": {
"action": {
"type": "string",
"enum": ["get", "delete", "state", "promote", "demote"],
"description": "Action to perform: 'get' retrieves full memory node, 'delete' removes memory, 'state' returns accessibility state, 'promote' increases retrieval strength (thumbs up), 'demote' decreases retrieval strength (thumbs down)"
"enum": ["get", "delete", "state", "promote", "demote", "edit"],
"description": "Action to perform: 'get' retrieves full memory node, 'delete' removes memory, 'state' returns accessibility state, 'promote' increases retrieval strength (thumbs up), 'demote' decreases retrieval strength (thumbs down), 'edit' updates content in-place (preserves FSRS state)"
},
"id": {
"type": "string",
@ -53,6 +53,10 @@ pub fn schema() -> Value {
"reason": {
"type": "string",
"description": "Why this memory is being promoted/demoted (optional, for logging). Only used with promote/demote actions."
},
"content": {
"type": "string",
"description": "New content for edit action. Replaces existing content, regenerates embedding, preserves FSRS state."
}
},
"required": ["action", "id"]
@ -65,6 +69,7 @@ struct MemoryArgs {
action: String,
id: String,
reason: Option<String>,
content: Option<String>,
}
/// Execute the unified memory tool
@ -87,8 +92,9 @@ pub async fn execute(
"state" => execute_state(storage, &args.id).await,
"promote" => execute_promote(storage, cognitive, &args.id, args.reason).await,
"demote" => execute_demote(storage, cognitive, &args.id, args.reason).await,
"edit" => execute_edit(storage, &args.id, args.content).await,
_ => Err(format!(
"Invalid action '{}'. Must be one of: get, delete, state, promote, demote",
"Invalid action '{}'. Must be one of: get, delete, state, promote, demote, edit",
args.action
)),
}
@ -302,6 +308,53 @@ async fn execute_demote(
}))
}
/// Edit a memory's content in-place — preserves FSRS state, regenerates embedding
///
/// Requires `content` (the replacement text, non-empty after trimming).
/// Returns a JSON object with the node id plus char-safe previews of the old
/// and new content; errors as `String` on missing content, unknown id, or
/// storage failure.
async fn execute_edit(
    storage: &Arc<Storage>,
    id: &str,
    content: Option<String>,
) -> Result<Value, String> {
    // Char-safe preview truncation (max 200 chars, "..." suffix when cut).
    // Counting chars rather than bytes avoids UTF-8 boundary panics.
    fn preview_of(text: &str) -> String {
        if text.chars().count() > 200 {
            let truncated: String = text.chars().take(197).collect();
            format!("{}...", truncated)
        } else {
            text.to_string()
        }
    }

    let new_content = content.ok_or("Missing 'content' field. Required for edit action.")?;
    if new_content.trim().is_empty() {
        return Err("Content cannot be empty".to_string());
    }

    // Get existing node to capture old content
    let old_node = storage
        .get_node(id)
        .map_err(|e| e.to_string())?
        .ok_or_else(|| format!("Memory not found: {}", id))?;

    // Update content (regenerates embedding, syncs FTS5)
    storage
        .update_node_content(id, &new_content)
        .map_err(|e| e.to_string())?;

    let old_preview = preview_of(&old_node.content);
    let new_preview = preview_of(&new_content);

    Ok(serde_json::json!({
        "success": true,
        "action": "edit",
        "nodeId": id,
        "oldContentPreview": old_preview,
        "newContentPreview": new_preview,
        "note": "FSRS state preserved (stability, difficulty, reps, lapses unchanged). Embedding regenerated for new content."
    }))
}
#[cfg(test)]
mod tests {
use super::*;
@ -336,9 +389,10 @@ mod tests {
assert!(schema["properties"]["id"].is_object());
assert!(schema["properties"]["reason"].is_object());
assert_eq!(schema["required"], serde_json::json!(["action", "id"]));
// Verify all 5 actions are in enum
// Verify all 6 actions are in enum
let actions = schema["properties"]["action"]["enum"].as_array().unwrap();
assert_eq!(actions.len(), 5);
assert_eq!(actions.len(), 6);
assert!(actions.contains(&serde_json::json!("edit")));
assert!(actions.contains(&serde_json::json!("promote")));
assert!(actions.contains(&serde_json::json!("demote")));
}
@ -440,6 +494,13 @@ mod tests {
#[tokio::test]
async fn test_delete_nonexistent_memory() {
let (storage, _dir) = test_storage().await;
// Ingest+delete a throwaway memory to warm writer after WAL migration
let warmup_id = storage.ingest(vestige_core::IngestInput {
content: "warmup".to_string(),
node_type: "fact".to_string(),
..Default::default()
}).unwrap().id;
let _ = storage.delete_node(&warmup_id);
let args = serde_json::json!({ "action": "delete", "id": "00000000-0000-0000-0000-000000000000" });
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_ok());
@ -613,4 +674,107 @@ mod tests {
assert_eq!(value["changes"]["retentionStrength"]["delta"], "-0.15");
assert_eq!(value["changes"]["stability"]["multiplier"], "0.5x");
}
// ========================================================================
// EDIT TESTS (v1.9.2)
// ========================================================================
#[tokio::test]
async fn test_edit_succeeds() {
let (storage, _dir) = test_storage().await;
let id = ingest_memory(&storage).await;
let args = serde_json::json!({
"action": "edit",
"id": id,
"content": "Updated memory content"
});
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_ok());
let value = result.unwrap();
assert_eq!(value["success"], true);
assert_eq!(value["action"], "edit");
assert_eq!(value["nodeId"], id);
assert!(value["oldContentPreview"].as_str().unwrap().contains("Memory unified test content"));
assert!(value["newContentPreview"].as_str().unwrap().contains("Updated memory content"));
assert!(value["note"].as_str().unwrap().contains("FSRS state preserved"));
}
#[tokio::test]
async fn test_edit_preserves_fsrs_state() {
let (storage, _dir) = test_storage().await;
let id = ingest_memory(&storage).await;
// Get FSRS state before edit
let before = storage.get_node(&id).unwrap().unwrap();
// Edit content
let args = serde_json::json!({
"action": "edit",
"id": id,
"content": "Completely new content after edit"
});
execute(&storage, &test_cognitive(), Some(args)).await.unwrap();
// Verify FSRS state preserved
let after = storage.get_node(&id).unwrap().unwrap();
assert_eq!(after.stability, before.stability);
assert_eq!(after.difficulty, before.difficulty);
assert_eq!(after.reps, before.reps);
assert_eq!(after.lapses, before.lapses);
assert_eq!(after.retention_strength, before.retention_strength);
// Content should be updated
assert_eq!(after.content, "Completely new content after edit");
assert_ne!(after.content, before.content);
}
#[tokio::test]
async fn test_edit_missing_content_fails() {
let (storage, _dir) = test_storage().await;
let id = ingest_memory(&storage).await;
let args = serde_json::json!({ "action": "edit", "id": id });
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("content"));
}
#[tokio::test]
async fn test_edit_empty_content_fails() {
let (storage, _dir) = test_storage().await;
let id = ingest_memory(&storage).await;
let args = serde_json::json!({ "action": "edit", "id": id, "content": " " });
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("empty"));
}
#[tokio::test]
async fn test_edit_nonexistent_memory_fails() {
let (storage, _dir) = test_storage().await;
let args = serde_json::json!({
"action": "edit",
"id": "00000000-0000-0000-0000-000000000000",
"content": "New content"
});
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("not found"));
}
#[tokio::test]
async fn test_edit_with_multibyte_utf8_content() {
let (storage, _dir) = test_storage().await;
let id = ingest_memory(&storage).await;
// Content with emoji and CJK characters (multi-byte UTF-8)
let long_content = "🧠".repeat(100); // 100 brain emoji = 400 bytes but only 100 chars
let args = serde_json::json!({
"action": "edit",
"id": id,
"content": long_content
});
// This must NOT panic (previous code would panic on byte-level truncation)
let result = execute(&storage, &test_cognitive(), Some(args)).await;
assert!(result.is_ok());
let value = result.unwrap();
assert_eq!(value["success"], true);
}
}

View file

@ -299,6 +299,19 @@ pub async fn execute(
}
}
// ====================================================================
// STAGE 5C: Utility-based ranking (MemRL-inspired)
// Memories that proved useful in past sessions get a retrieval boost.
// utility_score = times_useful / times_retrieved (0.0 to 1.0)
// ====================================================================
for result in &mut filtered_results {
let utility = result.node.utility_score.unwrap_or(0.0) as f32;
if utility > 0.0 {
// Utility boost: up to +15% for memories with utility_score = 1.0
result.combined_score *= 1.0 + (utility * 0.15);
}
}
// Re-sort by adjusted combined_score (descending) after all score modifications
filtered_results.sort_by(|a, b| {
b.combined_score

View file

@ -81,6 +81,11 @@ pub fn schema() -> Value {
"source": {
"type": "string",
"description": "Source reference"
},
"forceCreate": {
"type": "boolean",
"description": "Force creation of this item even if similar content exists",
"default": false
}
},
"required": ["content"]
@ -111,6 +116,7 @@ struct BatchItem {
#[serde(alias = "node_type")]
node_type: Option<String>,
source: Option<String>,
force_create: Option<bool>,
}
pub async fn execute(
@ -125,7 +131,8 @@ pub async fn execute(
// Detect mode: batch (items present) vs single (content present)
if let Some(items) = args.items {
return execute_batch(storage, cognitive, items).await;
let global_force = args.force_create.unwrap_or(false);
return execute_batch(storage, cognitive, items, global_force).await;
}
// Single mode: content is required
@ -275,6 +282,7 @@ async fn execute_batch(
storage: &Arc<Storage>,
cognitive: &Arc<Mutex<CognitiveEngine>>,
items: Vec<BatchItem>,
global_force_create: bool,
) -> Result<Value, String> {
if items.is_empty() {
return Err("Items array cannot be empty".to_string());
@ -312,6 +320,9 @@ async fn execute_batch(
continue;
}
// Extract per-item force_create before consuming other fields
let item_force_create = item.force_create.unwrap_or(false);
// ================================================================
// COGNITIVE PRE-INGEST (per item)
// ================================================================
@ -352,6 +363,39 @@ async fn execute_batch(
// INGEST (storage lock per item)
// ================================================================
// Check force_create: global flag OR per-item flag.
// A forced item bypasses smart_ingest's similarity/merge logic entirely
// and goes straight through plain ingest, guaranteeing a new node even
// when near-duplicate content already exists.
let item_force = global_force_create || item_force_create;
if item_force {
    match storage.ingest(input) {
        Ok(node) => {
            // Clone out the fields we report/post-process so `node` itself
            // is not held across the cognitive post-ingest call.
            let node_id = node.id.clone();
            let node_content = node.content.clone();
            let node_type = node.node_type.clone();
            created += 1;
            // Same post-ingest hook as the non-forced path — presumably
            // runs tagging/activation on the new node; confirm against
            // run_post_ingest's definition.
            run_post_ingest(cognitive, &node_id, &node_content, &node_type, importance_composite);
            results.push(serde_json::json!({
                "index": i,
                "status": "saved",
                "decision": "create",
                "nodeId": node_id,
                "importanceScore": importance_composite,
                "reason": "Forced creation - skipped similarity check"
            }));
        }
        Err(e) => {
            // Record the failure per-item; the batch keeps going so one
            // bad item does not abort the remaining ones.
            errors += 1;
            results.push(serde_json::json!({
                "index": i,
                "status": "error",
                "reason": e.to_string()
            }));
        }
    }
    // Forced items are fully handled here — skip the smart_ingest path below.
    continue;
}
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
{
match storage.smart_ingest(input) {
@ -863,6 +907,62 @@ mod tests {
assert!(results[0]["importanceScore"].is_number());
}
#[tokio::test]
async fn test_batch_force_create_global() {
    let (storage, _dir) = test_storage().await;

    // Near-duplicate contents would normally be merged by smart ingest;
    // the top-level forceCreate flag must override that for every item.
    let args = serde_json::json!({
        "forceCreate": true,
        "items": [
            { "content": "Physics question about quantum mechanics and wave functions" },
            { "content": "Physics question about quantum mechanics and wave equations" },
            { "content": "Physics question about quantum mechanics and wave behavior" }
        ]
    });

    let value = execute(&storage, &test_cognitive(), Some(args))
        .await
        .expect("forced batch ingest should succeed");

    assert_eq!(value["mode"], "batch");

    // No merging happened: each item became its own node.
    assert_eq!(value["summary"]["created"], 3);
    assert_eq!(value["summary"]["updated"], 0);

    // Every per-item result must report a forced create.
    for entry in value["results"].as_array().unwrap() {
        assert_eq!(entry["decision"], "create");
        assert!(entry["reason"].as_str().unwrap().contains("Forced"));
    }
}
#[tokio::test]
async fn test_batch_force_create_per_item() {
    let (storage, _dir) = test_storage().await;
    // Mix of forced and non-forced items: the per-item forceCreate flag
    // must apply only to the item that sets it.
    let result = execute(
        &storage, &test_cognitive(),
        Some(serde_json::json!({
            "items": [
                { "content": "Forced item one", "forceCreate": true },
                { "content": "Normal item two" },
                { "content": "Forced item three", "forceCreate": true }
            ]
        })),
    ).await;
    assert!(result.is_ok());
    let value = result.unwrap();
    let results = value["results"].as_array().unwrap();

    // First forced item should say "Forced creation".
    assert_eq!(results[0]["decision"], "create");
    assert!(results[0]["reason"].as_str().unwrap().contains("Forced"));

    // Non-forced item gets normal processing and, critically, must NOT
    // carry the forced-creation marker — this catches a per-item flag
    // leaking into neighbouring items (the original test never checked this).
    assert_eq!(results[1]["status"], "saved");
    assert!(
        !results[1]["reason"].as_str().unwrap_or("").contains("Forced"),
        "per-item forceCreate leaked to a non-forced item"
    );

    // Second forced item.
    assert_eq!(results[2]["decision"], "create");
    assert!(results[2]["reason"].as_str().unwrap().contains("Forced"));
}
#[tokio::test]
async fn test_no_content_no_items_fails() {
let (storage, _dir) = test_storage().await;