diff --git a/crates/omnigraph-server/src/registry.rs b/crates/omnigraph-server/src/registry.rs index 918d033..1bc21a1 100644 --- a/crates/omnigraph-server/src/registry.rs +++ b/crates/omnigraph-server/src/registry.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arc_swap::ArcSwap; use omnigraph::db::Omnigraph; +#[cfg(test)] use tokio::sync::Mutex; use crate::identity::GraphKey; @@ -107,6 +108,11 @@ pub enum InsertError { pub struct GraphRegistry { snapshot: ArcSwap, + /// Serializes runtime mutations through [`GraphRegistry::insert`]. + /// Gated with `insert` because they share a single contract — if + /// the consumer goes away, so does the lock. Re-introducing one + /// requires re-introducing the other. + #[cfg(test)] mutate: Mutex<()>, } @@ -115,6 +121,7 @@ impl GraphRegistry { pub fn new() -> Self { Self { snapshot: ArcSwap::from_pointee(RegistrySnapshot::default()), + #[cfg(test)] mutate: Mutex::new(()), } } @@ -136,6 +143,7 @@ impl GraphRegistry { } Ok(Self { snapshot: ArcSwap::from_pointee(RegistrySnapshot::new(graphs)), + #[cfg(test)] mutate: Mutex::new(()), }) } @@ -179,13 +187,19 @@ impl GraphRegistry { /// Add a new handle. Async because the mutex is `tokio::sync::Mutex` /// (a future managed-catalog flow may hold it across `.await` points /// during atomic registry mutations). Rejects duplicate `GraphKey` - /// and duplicate `uri`. Currently unused at runtime — only construction - /// via `from_handles` runs at startup — but kept for the tests that - /// pin its concurrency contract. + /// and duplicate `uri`. /// - /// Race semantics (pinned by `concurrent_insert_same_key_one_succeeds_one_errors`): - /// under two concurrent calls with the same key, exactly one returns - /// `Ok(())` and the other returns `Err(InsertError::DuplicateKey(_))`. + /// **Test-only surface.** No production code reaches this — startup + /// uses `from_handles`, and runtime add/remove is deferred. The + /// race-contract tests below pin the mutex linearization point so + /// that when a real consumer ships (managed cluster catalog), the + /// concurrency contract is already proven. Ungate by removing + /// `#[cfg(test)]` once that consumer is in scope. + /// + /// Race semantics (pinned by `concurrent_insert_same_key_exactly_one_succeeds`): + /// under N concurrent calls with the same key, exactly one returns + /// `Ok(())` and the rest return `Err(InsertError::DuplicateKey(_))`. + #[cfg(test)] pub async fn insert(&self, handle: Arc) -> Result<(), InsertError> { let _guard = self.mutate.lock().await; let current = self.snapshot.load();