feat(storage): swap async-trait for trait_variant + dyn adapter (0001a)

Replaces #[async_trait::async_trait] on the storage trait with a
trait_variant-driven layout plus a hand-written dyn-compatible adapter.

- memory_store.rs: LocalMemoryStore is the source trait declared with
  native async-fn-in-trait. #[trait_variant::make(MemoryStoreSend: Send)]
  derives the Send-bounded variant that backends actually implement (the
  blanket impl in 0.1.x goes variant -> source). A hand-written
  MemoryStore trait wraps every method in
  Pin<Box<dyn Future<Output = MemoryStoreResult<T>> + Send + 'a>> with
  a BoxedStoreFuture<'a, T> alias, and a blanket impl<T: MemoryStoreSend>
  MemoryStore for T adapts every Send-variant implementation. This keeps
  Arc<dyn MemoryStore> dyn-safe for Phase 1 cognitive-module tests --
  trait_variant 0.1 alone does NOT produce a dyn-safe variant (RPITIT),
  so the hand-written adapter is required and supersedes the plan claim
  that trait_variant gives dyn-compat for free.
- sqlite.rs: drop the #[async_trait::async_trait] attribute on the impl
  block and retarget it to MemoryStoreSend. Two pre-existing clippy
  issues that the macro had been masking are fixed in the same body
  (return Ok(out) tail expression in vector_search; DomainRow tuple
  alias in get_domain).
- mod.rs: export MemoryStoreSend alongside the existing LocalMemoryStore
  and MemoryStore re-exports.

Verification: cargo test -p vestige-core --features embeddings,vector-search
passes (428 lib tests). All five Phase 1 integration test binaries pass
(trait_round_trip, send_bound_variant including
arc_dyn_memory_store_moves_across_tokio_tasks, cognitive_module_isolation,
embedding_model_registry, domain_column_migration). cargo test --workspace
green across every test binary. cargo build --workspace --release green.
cargo clippy --workspace --features embeddings,vector-search -- -D warnings
clean. grep -rn async_trait crates/vestige-core/src/storage/ returns
zero hits.

Supersedes plan claim in docs/plans/0001a-trait-rewrite.md about
trait_variant emitting a dyn-compatible Send variant; option (c) from
the design conversation (hand-written dyn adapter) was selected
explicitly because trait_variant 0.1.2 does not.
This commit is contained in:
Jan De Landtsheer 2026-05-27 15:40:04 +02:00 committed by Sam Valladares
parent 5715f585fd
commit a4a6e877c5
3 changed files with 215 additions and 18 deletions

View file

@ -4,6 +4,8 @@
//! intentionally flat: one trait, ~25 methods, no sub-traits.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@ -182,17 +184,20 @@ pub struct ModelSignature {
// TRAIT
// ----------------------------------------------------------------------------
/// The single storage abstraction.
/// Internal source trait declared with native async-fn-in-trait.
///
/// `#[async_trait::async_trait]` makes every `async fn` return a
/// `Pin<Box<dyn Future + Send>>`, which is required for `Arc<dyn MemoryStore>`
/// to be movable across `tokio::spawn` boundaries.
/// `#[trait_variant::make(MemoryStoreSend: Send)]` derives a Send-bounded
/// variant whose returned futures are `Send`. In trait_variant 0.1.x the
/// macro emits the blanket `impl<T: MemoryStoreSend> LocalMemoryStore for T`,
/// so backends implement `MemoryStoreSend` (the Send variant) and get
/// `LocalMemoryStore` (the non-Send variant) for free.
///
/// `LocalMemoryStore` is a type alias kept for source compatibility with code
/// that refers to the non-send variant. In Phase 1 both names refer to the same
/// (dyn-compatible, Send-safe) trait.
#[async_trait::async_trait]
pub trait MemoryStore: Send + Sync + 'static {
/// Most callers should reach for the dyn-compatible `MemoryStore` trait
/// declared below, which adapts `MemoryStoreSend` into a boxed-future surface
/// and is the public storage abstraction for cognitive modules and tests
/// that want `Arc<dyn MemoryStore>`.
#[trait_variant::make(MemoryStoreSend: Send)]
pub trait LocalMemoryStore: Sync + 'static {
// --- Lifecycle ---
async fn init(&self) -> MemoryStoreResult<()>;
async fn health_check(&self) -> MemoryStoreResult<HealthStatus>;
@ -254,9 +259,201 @@ pub trait MemoryStore: Send + Sync + 'static {
async fn vacuum(&self) -> MemoryStoreResult<()>;
}
/// Type alias kept for source compatibility. Both names refer to the same
/// `async_trait`-annotated trait that is dyn-compatible and `Send + Sync`.
pub use MemoryStore as LocalMemoryStore;
// ----------------------------------------------------------------------------
// DYN-COMPATIBLE STORAGE TRAIT
// ----------------------------------------------------------------------------
/// Boxed Send future returning a `MemoryStoreResult<T>`, bound to the lifetime
/// of the borrows captured by the call (typically `&self` plus any reference
/// arguments). Used as the return type of every method on the dyn-compatible
/// `MemoryStore` trait below.
pub type BoxedStoreFuture<'a, T> =
Pin<Box<dyn Future<Output = MemoryStoreResult<T>> + Send + 'a>>;
/// Dyn-compatible storage trait.
///
/// `MemoryStoreSend` above is the trait users implement; it uses native
/// async-fn-in-trait return types (RPITIT), which gives zero-allocation
/// static dispatch but is not dyn-safe. This trait wraps every method in
/// `Pin<Box<dyn Future + Send + '_>>` so `Arc<dyn MemoryStore>` works for
/// the cognitive module surface and the Phase 1 integration tests.
///
/// Implementations should not target this trait directly; the blanket
/// `impl<T: MemoryStoreSend> MemoryStore for T` adapts every Send-variant
/// implementation automatically. Each call boxes the returned future
/// exactly once, identical to the cost of the previous design.
pub trait MemoryStore: Send + Sync + 'static {
fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()>;
fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus>;
fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option<ModelSignature>>;
fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()>;
fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid>;
fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option<MemoryRecord>>;
fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()>;
fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()>;
fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn fts_search<'a>(
&'a self,
text: &'a str,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn vector_search<'a>(
&'a self,
embedding: &'a [f32],
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>>;
fn get_scheduling<'a>(
&'a self,
memory_id: Uuid,
) -> BoxedStoreFuture<'a, Option<SchedulingState>>;
fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()>;
fn get_due_memories<'a>(
&'a self,
before: DateTime<Utc>,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>>;
fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()>;
fn get_edges<'a>(
&'a self,
node_id: Uuid,
edge_type: Option<&'a str>,
) -> BoxedStoreFuture<'a, Vec<MemoryEdge>>;
fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()>;
fn get_neighbors<'a>(
&'a self,
node_id: Uuid,
depth: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>>;
fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec<Domain>>;
fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option<Domain>>;
fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()>;
fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()>;
fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>>;
fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize>;
fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats>;
fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()>;
}
impl<T> MemoryStore for T
where
T: MemoryStoreSend,
{
fn init<'a>(&'a self) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::init(self))
}
fn health_check<'a>(&'a self) -> BoxedStoreFuture<'a, HealthStatus> {
Box::pin(<T as MemoryStoreSend>::health_check(self))
}
fn registered_model<'a>(&'a self) -> BoxedStoreFuture<'a, Option<ModelSignature>> {
Box::pin(<T as MemoryStoreSend>::registered_model(self))
}
fn register_model<'a>(&'a self, sig: &'a ModelSignature) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::register_model(self, sig))
}
fn insert<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, Uuid> {
Box::pin(<T as MemoryStoreSend>::insert(self, record))
}
fn get<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, Option<MemoryRecord>> {
Box::pin(<T as MemoryStoreSend>::get(self, id))
}
fn update<'a>(&'a self, record: &'a MemoryRecord) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::update(self, record))
}
fn delete<'a>(&'a self, id: Uuid) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::delete(self, id))
}
fn search<'a>(&'a self, query: &'a SearchQuery) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::search(self, query))
}
fn fts_search<'a>(
&'a self,
text: &'a str,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::fts_search(self, text, limit))
}
fn vector_search<'a>(
&'a self,
embedding: &'a [f32],
limit: usize,
) -> BoxedStoreFuture<'a, Vec<SearchResult>> {
Box::pin(<T as MemoryStoreSend>::vector_search(self, embedding, limit))
}
fn get_scheduling<'a>(
&'a self,
memory_id: Uuid,
) -> BoxedStoreFuture<'a, Option<SchedulingState>> {
Box::pin(<T as MemoryStoreSend>::get_scheduling(self, memory_id))
}
fn update_scheduling<'a>(&'a self, state: &'a SchedulingState) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::update_scheduling(self, state))
}
fn get_due_memories<'a>(
&'a self,
before: DateTime<Utc>,
limit: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, SchedulingState)>> {
Box::pin(<T as MemoryStoreSend>::get_due_memories(self, before, limit))
}
fn add_edge<'a>(&'a self, edge: &'a MemoryEdge) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::add_edge(self, edge))
}
fn get_edges<'a>(
&'a self,
node_id: Uuid,
edge_type: Option<&'a str>,
) -> BoxedStoreFuture<'a, Vec<MemoryEdge>> {
Box::pin(<T as MemoryStoreSend>::get_edges(self, node_id, edge_type))
}
fn remove_edge<'a>(&'a self, source: Uuid, target: Uuid) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::remove_edge(self, source, target))
}
fn get_neighbors<'a>(
&'a self,
node_id: Uuid,
depth: usize,
) -> BoxedStoreFuture<'a, Vec<(MemoryRecord, f64)>> {
Box::pin(<T as MemoryStoreSend>::get_neighbors(self, node_id, depth))
}
fn list_domains<'a>(&'a self) -> BoxedStoreFuture<'a, Vec<Domain>> {
Box::pin(<T as MemoryStoreSend>::list_domains(self))
}
fn get_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, Option<Domain>> {
Box::pin(<T as MemoryStoreSend>::get_domain(self, id))
}
fn upsert_domain<'a>(&'a self, domain: &'a Domain) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::upsert_domain(self, domain))
}
fn delete_domain<'a>(&'a self, id: &'a str) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::delete_domain(self, id))
}
fn classify<'a>(&'a self, embedding: &'a [f32]) -> BoxedStoreFuture<'a, Vec<(String, f64)>> {
Box::pin(<T as MemoryStoreSend>::classify(self, embedding))
}
fn count<'a>(&'a self) -> BoxedStoreFuture<'a, usize> {
Box::pin(<T as MemoryStoreSend>::count(self))
}
fn get_stats<'a>(&'a self) -> BoxedStoreFuture<'a, StoreStats> {
Box::pin(<T as MemoryStoreSend>::get_stats(self))
}
fn vacuum<'a>(&'a self) -> BoxedStoreFuture<'a, ()> {
Box::pin(<T as MemoryStoreSend>::vacuum(self))
}
}
// ----------------------------------------------------------------------------
// UNIT TESTS

View file

@ -9,8 +9,8 @@ mod sqlite;
pub use memory_store::{
ClassificationResult, Domain, HealthStatus, LocalMemoryStore, MemoryEdge, MemoryRecord,
MemoryStore, MemoryStoreError, MemoryStoreResult, ModelSignature, SchedulingState, SearchQuery,
SearchResult, StoreStats,
MemoryStore, MemoryStoreError, MemoryStoreResult, MemoryStoreSend, ModelSignature,
SchedulingState, SearchQuery, SearchResult, StoreStats,
};
pub use migrations::MIGRATIONS;
pub use portable::{

View file

@ -8441,8 +8441,7 @@ impl SqliteMemoryStore {
}
}
#[async_trait::async_trait]
impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore {
impl crate::storage::memory_store::MemoryStoreSend for SqliteMemoryStore {
async fn init(&self) -> crate::storage::memory_store::MemoryStoreResult<()> {
// Migrations run in `new`; this is a no-op for the SQLite backend.
Ok(())
@ -8797,7 +8796,7 @@ impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore {
})
})
.collect();
return Ok(out);
Ok(out)
}
#[cfg(not(all(feature = "embeddings", feature = "vector-search")))]
{
@ -9120,11 +9119,12 @@ impl crate::storage::memory_store::LocalMemoryStore for SqliteMemoryStore {
) -> crate::storage::memory_store::MemoryStoreResult<Option<crate::storage::memory_store::Domain>>
{
use crate::storage::memory_store::{Domain, MemoryStoreError};
type DomainRow = (String, String, Option<Vec<u8>>, String, i64, String);
let reader = self
.reader
.lock()
.map_err(|_| MemoryStoreError::Init("Reader lock poisoned".into()))?;
let result: Option<(String, String, Option<Vec<u8>>, String, i64, String)> = reader
let result: Option<DomainRow> = reader
.query_row(
"SELECT id, label, centroid, top_terms, memory_count, created_at FROM domains WHERE id = ?1",
rusqlite::params![id],