From a4a6e877c548cecca8064579fd62879a6025a4c3 Mon Sep 17 00:00:00 2001 From: Jan De Landtsheer Date: Wed, 27 May 2026 15:40:04 +0200 Subject: [PATCH] feat(storage): swap async-trait for trait_variant + dyn adapter (0001a) Replaces #[async_trait::async_trait] on the storage trait with a trait_variant-driven layout plus a hand-written dyn-compatible adapter. - memory_store.rs: LocalMemoryStore is the source trait declared with native async-fn-in-trait. #[trait_variant::make(MemoryStoreSend: Send)] derives the Send-bounded variant that backends actually implement (the blanket impl in 0.1.x goes variant -> source). A hand-written MemoryStore trait wraps every method in Pin> + Send + 'a>> with a BoxedStoreFuture<'a, T> alias, and a blanket impl MemoryStore for T adapts every Send-variant implementation. This keeps Arc dyn-safe for Phase 1 cognitive-module tests -- trait_variant 0.1 alone does NOT produce a dyn-safe variant (RPITIT), so the hand-written adapter is required and supersedes the plan claim that trait_variant gives dyn-compat for free. - sqlite.rs: drop the #[async_trait::async_trait] attribute on the impl block and retarget it to MemoryStoreSend. Two pre-existing clippy issues that the macro had been masking are fixed in the same body (return Ok(out) tail expression in vector_search; DomainRow tuple alias in get_domain). - mod.rs: export MemoryStoreSend alongside the existing LocalMemoryStore and MemoryStore re-exports. Verification: cargo test -p vestige-core --features embeddings,vector-search passes (428 lib tests). All five Phase 1 integration test binaries pass (trait_round_trip, send_bound_variant including arc_dyn_memory_store_moves_across_tokio_tasks, cognitive_module_isolation, embedding_model_registry, domain_column_migration). cargo test --workspace green across every test binary. cargo build --workspace --release green. cargo clippy --workspace --features embeddings,vector-search -- -D warnings clean. grep -rn async_trait crates/vestige-core/src/storage/ returns zero hits. Supersedes plan claim in docs/plans/0001a-trait-rewrite.md about trait_variant emitting a dyn-compatible Send variant; option (c) from the design conversation (hand-written dyn adapter) was selected explicitly because trait_variant 0.1.2 does not. --- .../vestige-core/src/storage/memory_store.rs | 221 +++++++++++++++++- crates/vestige-core/src/storage/mod.rs | 4 +- crates/vestige-core/src/storage/sqlite.rs | 8 +- 3 files changed, 215 insertions(+), 18 deletions(-) diff --git a/crates/vestige-core/src/storage/memory_store.rs b/crates/vestige-core/src/storage/memory_store.rs index 2bc3137..2869a4e 100644 --- a/crates/vestige-core/src/storage/memory_store.rs +++ b/crates/vestige-core/src/storage/memory_store.rs @@ -4,6 +4,8 @@ //! intentionally flat: one trait, ~25 methods, no sub-traits. use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -182,17 +184,20 @@ pub struct ModelSignature { // TRAIT // ---------------------------------------------------------------------------- -/// The single storage abstraction. +/// Internal source trait declared with native async-fn-in-trait. /// -/// `#[async_trait::async_trait]` makes every `async fn` return a -/// `Pin>`, which is required for `Arc` -/// to be movable across `tokio::spawn` boundaries. +/// `#[trait_variant::make(MemoryStoreSend: Send)]` derives a Send-bounded +/// variant whose returned futures are `Send`. In trait_variant 0.1.x the +/// macro emits the blanket `impl LocalMemoryStore for T`, +/// so backends implement `MemoryStoreSend` (the Send variant) and get +/// `LocalMemoryStore` (the non-Send variant) for free. /// -/// `LocalMemoryStore` is a type alias kept for source compatibility with code -/// that refers to the non-send variant. In Phase 1 both names refer to the same -/// (dyn-compatible, Send-safe) trait. -#[async_trait::async_trait] -pub trait MemoryStore: Send + Sync + 'static { +/// Most callers should reach for the dyn-compatible `MemoryStore` trait +/// declared below, which adapts `MemoryStoreSend` into a boxed-future surface +/// and is the public storage abstraction for cognitive modules and tests +/// that want `Arc`. +#[trait_variant::make(MemoryStoreSend: Send)] +pub trait LocalMemoryStore: Sync + 'static { // --- Lifecycle --- async fn init(&self) -> MemoryStoreResult<()>; async fn health_check(&self) -> MemoryStoreResult; @@ -254,9 +259,201 @@ pub trait MemoryStore: Send + Sync + 'static { async fn vacuum(&self) -> MemoryStoreResult<()>; } -/// Type alias kept for source compatibility. Both names refer to the same -/// `async_trait`-annotated trait that is dyn-compatible and `Send + Sync`. -pub use MemoryStore as LocalMemoryStore; +// ---------------------------------------------------------------------------- +// DYN-COMPATIBLE STORAGE TRAIT +// ---------------------------------------------------------------------------- + +/// Boxed Send future returning a `MemoryStoreResult`, bound to the lifetime +/// of the borrows captured by the call (typically `&self` plus any reference +/// arguments). Used as the return type of every method on the dyn-compatible +/// `MemoryStore` trait below. +pub type BoxedStoreFuture<'a, T> = + Pin> + Send + 'a>>; + +/// Dyn-compatible storage trait. +/// +/// `MemoryStoreSend` above is the trait users implement; it uses native +/// async-fn-in-trait return types (RPITIT), which gives zero-allocation +/// static dispatch but is not dyn-safe. This trait wraps every method in +/// `Pin>` so `Arc` works for +/// the cognitive module surface and the Phase 1 integration tests. +/// +/// Implementations should not target this trait directly; the blanket +/// `impl MemoryStore for T` adapts every Send-variant +/// implementation automatically. Each call boxes the returned future +/// exactly once, identical to the cost of the previous design. +pub trait MemoryStore: Send + Sync + 'static { + fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()>; + fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus>; + + fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option>; + fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()>; + + fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid>; + fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option>; + fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()>; + fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()>; + + fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec>; + fn fts_search<'a>( + &'a self, + text: &'a str, + limit: usize, + ) -> BoxedStoreFuture<'a, Vec>; + fn vector_search<'a>( + &'a self, + embedding: &'a [f32], + limit: usize, + ) -> BoxedStoreFuture<'a, Vec>; + + fn get_scheduling<'a>( + &'a self, + memory_id: Uuid, + ) -> BoxedStoreFuture<'a, Option>; + fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()>; + fn get_due_memories<'a>( + &'a self, + before: DateTime, + limit: usize, + ) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>>; + + fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()>; + fn get_edges<'a>( + &'a self, + node_id: Uuid, + edge_type: Option<&'a str>, + ) -> BoxedStoreFuture<'a, Vec>; + fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()>; + fn get_neighbors<'a>( + &'a self, + node_id: Uuid, + depth: usize, + ) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>>; + + fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec>; + fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option>; + fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()>; + fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()>; + fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>>; + + fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize>; + fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats>; + fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()>; +} + +impl MemoryStore for T +where + T: MemoryStoreSend, +{ + fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()> { + Box::pin(::init(self)) + } + fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus> { + Box::pin(::health_check(self)) + } + + fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option> { + Box::pin(::registered_model(self)) + } + fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()> { + Box::pin(::register_model(self, sig)) + } + + fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid> { + Box::pin(::insert(self, record)) + } + fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option> { + Box::pin(::get(self, id)) + } + fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()> { + Box::pin(::update(self, record)) + } + fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()> { + Box::pin(::delete(self, id)) + } + + fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec> { + Box::pin(::search(self, query)) + } + fn fts_search<'a>( + &'a self, + text: &'a str, + limit: usize, + ) -> BoxedStoreFuture<'a, Vec> { + Box::pin(::fts_search(self, text, limit)) + } + fn vector_search<'a>( + &'a self, + embedding: &'a [f32], + limit: usize, + ) -> BoxedStoreFuture<'a, Vec> { + Box::pin(::vector_search(self, embedding, limit)) + } + + fn get_scheduling<'a>( + &'a self, + memory_id: Uuid, + ) -> BoxedStoreFuture<'a, Option> { + Box::pin(::get_scheduling(self, memory_id)) + } + fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()> { + Box::pin(::update_scheduling(self, state)) + } + fn get_due_memories<'a>( + &'a self, + before: DateTime, + limit: usize, + ) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>> { + Box::pin(::get_due_memories(self, before, limit)) + } + + fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()> { + Box::pin(::add_edge(self, edge)) + } + fn get_edges<'a>( + &'a self, + node_id: Uuid, + edge_type: Option<&'a str>, + ) -> BoxedStoreFuture<'a, Vec> { + Box::pin(::get_edges(self, node_id, edge_type)) + } + fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()> { + Box::pin(::remove_edge(self, source, target)) + } + fn get_neighbors<'a>( + &'a self, + node_id: Uuid, + depth: usize, + ) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>> { + Box::pin(::get_neighbors(self, node_id, depth)) + } + + fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec> { + Box::pin(::list_domains(self)) + } + fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option> { + Box::pin(::get_domain(self, id)) + } + fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()> { + Box::pin(::upsert_domain(self, domain)) + } + fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()> { + Box::pin(::delete_domain(self, id)) + } + fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>> { + Box::pin(::classify(self, embedding)) + } + + fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize> { + Box::pin(::count(self)) + } + fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats> { + Box::pin(::get_stats(self)) + } + fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()> { + Box::pin(::vacuum(self)) + } +} // ---------------------------------------------------------------------------- // UNIT TESTS diff --git a/crates/vestige-core/src/storage/mod.rs b/crates/vestige-core/src/storage/mod.rs index 6926385..5f0a54c 100644 --- a/crates/vestige-core/src/storage/mod.rs +++ b/crates/vestige-core/src/storage/mod.rs @@ -9,8 +9,8 @@ mod sqlite; pub use memory_store::{ ClassificationResult, Domain, HealthStatus, LocalMemoryStore, MemoryEdge, MemoryRecord, - MemoryStore, MemoryStoreError, MemoryStoreResult, ModelSignature, SchedulingState, SearchQuery, - SearchResult, StoreStats, + MemoryStore, MemoryStoreError, MemoryStoreResult, MemoryStoreSend, ModelSignature, + SchedulingState, SearchQuery, SearchResult, StoreStats, }; pub use migrations::MIGRATIONS; pub use portable::{ diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index 57eaa86..abc17af 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -8441,8 +8441,7 @@ impl SqliteMemoryStore { } } -#[async_trait::async_trait] -impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore { +impl crate::storage::memory_store::MemoryStoreSend for SqliteMemoryStore { async fn init(&self) -> crate::storage::memory_store::MemoryStoreResult<()> { // Migrations run in `new`; this is a no-op for the SQLite backend. Ok(()) @@ -8797,7 +8796,7 @@ impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore { }) }) .collect(); - return Ok(out); + Ok(out) } #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] { @@ -9120,11 +9119,12 @@ impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore { ) -> crate::storage::memory_store::MemoryStoreResult> { use crate::storage::memory_store::{Domain, MemoryStoreError}; + type DomainRow = (String, String, Option>, String, i64, String); let reader = self .reader .lock() .map_err(|_| MemoryStoreError::Init("Reader lock poisoned".into()))?; - let result: Option<(String, String, Option>, String, i64, String)> = reader + let result: Option = reader .query_row( "SELECT id, label, centroid, top_terms, memory_count, created_at FROM domains WHERE id = ?1", rusqlite::params![id],