diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index b21bea9..e9f3317 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -74,7 +74,14 @@ pub struct SchemaApplyResult { pub struct Omnigraph { root_uri: String, storage: Arc, - coordinator: GraphCoordinator, + /// Coordinator state behind a tokio Mutex. PR 2 (MR-686) wraps this + /// so engine write APIs can be `&self` (the HTTP server's `AppState` + /// then holds `Arc` and dispatches concurrent calls + /// without a global write lock). Critical sections are short: + /// callers acquire, read or refresh, drop. Lock acquisition order: + /// always before `runtime_cache` (when both are needed in one + /// scope). + coordinator: Arc>, table_store: TableStore, runtime_cache: RuntimeCache, /// Read-heavy on every query, written only by `apply_schema`. ArcSwap @@ -139,7 +146,7 @@ impl Omnigraph { Ok(Self { root_uri: root.clone(), storage, - coordinator, + coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), catalog: Arc::new(ArcSwap::from_pointee(catalog)), @@ -225,7 +232,7 @@ impl Omnigraph { Ok(Self { root_uri: root.clone(), storage, - coordinator, + coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), catalog: Arc::new(ArcSwap::from_pointee(catalog)), @@ -275,7 +282,7 @@ impl Omnigraph { schema_apply::apply_schema(self, desired_schema_source).await } - pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> { + pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> { schema_apply::ensure_schema_apply_idle(self, operation).await } @@ -336,15 +343,16 @@ impl Omnigraph { } pub(crate) async fn swap_coordinator_for_branch( - &mut self, + &self, branch: Option<&str>, ) -> Result { let next = self.open_coordinator_for_branch(branch).await?; - Ok(std::mem::replace(&mut self.coordinator, next)) + let mut coord = self.coordinator.lock().await; + Ok(std::mem::replace(&mut *coord, next)) } - pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) { - self.coordinator = coordinator; + pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) { + *self.coordinator.lock().await = coordinator; } pub(crate) async fn resolved_branch_target( @@ -354,21 +362,19 @@ impl Omnigraph { self.ensure_schema_state_valid().await?; let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string()); let normalized = normalize_branch_name(branch.unwrap_or("main"))?; - if normalized.as_deref() == self.coordinator.current_branch() { - let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| { - SnapshotId::synthetic( - self.coordinator.current_branch(), - self.coordinator.version(), - ) + let coord = self.coordinator.lock().await; + if normalized.as_deref() == coord.current_branch() { + let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| { + SnapshotId::synthetic(coord.current_branch(), coord.version()) }); return Ok(ResolvedTarget { requested, - branch: self.coordinator.current_branch().map(str::to_string), + branch: coord.current_branch().map(str::to_string), snapshot_id, - snapshot: self.coordinator.snapshot(), + snapshot: coord.snapshot(), }); } - self.coordinator.resolve_target(&requested).await + coord.resolve_target(&requested).await } pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result { @@ -377,13 +383,13 @@ impl Omnigraph { .map(|resolved| resolved.snapshot) } - pub(crate) fn version(&self) -> u64 { - self.coordinator.version() + pub(crate) async fn version(&self) -> u64 { + self.coordinator.lock().await.version() } /// Return an immutable Snapshot from the known manifest state. No storage I/O. - pub(crate) fn snapshot(&self) -> Snapshot { - self.coordinator.snapshot() + pub(crate) async fn snapshot(&self) -> Snapshot { + self.coordinator.lock().await.snapshot() } pub async fn snapshot_of(&self, target: impl Into) -> Result { @@ -408,10 +414,11 @@ impl Omnigraph { } /// Synchronize this handle's write base to the latest head of the named branch. - pub async fn sync_branch(&mut self, branch: &str) -> Result<()> { + pub async fn sync_branch(&self, branch: &str) -> Result<()> { self.ensure_schema_state_valid().await?; let branch = normalize_branch_name(branch)?; - self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?; + let next = self.open_coordinator_for_branch(branch.as_deref()).await?; + *self.coordinator.lock().await = next; self.runtime_cache.invalidate_all().await; Ok(()) } @@ -448,18 +455,19 @@ impl Omnigraph { /// (e.g. `schema_apply` mid-write) MUST use /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to /// avoid the recovery sweep racing their own sidecar. - pub async fn refresh(&mut self) -> Result<()> { - self.coordinator.refresh().await?; + pub async fn refresh(&self) -> Result<()> { + let mut coord = self.coordinator.lock().await; + coord.refresh().await?; let schema_state_recovery = recover_schema_state_files( &self.root_uri, Arc::clone(&self.storage), - &self.coordinator.snapshot(), + &coord.snapshot(), ) .await?; crate::db::manifest::recover_manifest_drift( &self.root_uri, Arc::clone(&self.storage), - &mut self.coordinator, + &mut *coord, crate::db::manifest::RecoveryMode::RollForwardOnly, schema_state_recovery, ) @@ -469,14 +477,14 @@ impl Omnigraph { Ok(()) } - async fn reload_schema_if_source_changed(&mut self) -> Result<()> { + async fn reload_schema_if_source_changed(&self) -> Result<()> { let schema_path = schema_source_uri(&self.root_uri); let schema_source = self.storage.read_text(&schema_path).await?; if schema_source == *self.schema_source.load_full() { return Ok(()); } let current_source_ir = read_schema_ir_from_source(&schema_source)?; - let branches = self.coordinator.branch_list().await?; + let branches = self.coordinator.lock().await.branch_list().await?; let (accepted_ir, _) = load_or_bootstrap_schema_contract( &self.root_uri, Arc::clone(&self.storage), @@ -498,15 +506,15 @@ impl Omnigraph { /// here would observe the caller's own sidecar, classify it as /// RolledPastExpected, and roll it forward — racing the caller's /// own publish path. - pub(crate) async fn refresh_coordinator_only(&mut self) -> Result<()> { - self.coordinator.refresh().await?; + pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> { + self.coordinator.lock().await.refresh().await?; self.runtime_cache.invalidate_all().await; Ok(()) } pub async fn resolve_snapshot(&self, branch: &str) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.resolve_snapshot_id(branch).await + self.coordinator.lock().await.resolve_snapshot_id(branch).await } pub(crate) async fn resolved_target( @@ -514,7 +522,7 @@ impl Omnigraph { target: impl Into, ) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.resolve_target(&target.into()).await + self.coordinator.lock().await.resolve_target(&target.into()).await } // ─── Change detection ──────────────────────────────────────────────── @@ -545,26 +553,20 @@ impl Omnigraph { to_commit_id: &str, filter: &crate::changes::ChangeFilter, ) -> Result { - let from_commit = self - .coordinator - .resolve_commit(&SnapshotId::new(from_commit_id)) - .await?; - let to_commit = self - .coordinator - .resolve_commit(&SnapshotId::new(to_commit_id)) - .await?; - let from_snap = self - .coordinator + let coord = self.coordinator.lock().await; + let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?; + let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?; + let from_snap = coord .resolve_target(&ReadTarget::Snapshot(SnapshotId::new( from_commit.graph_commit_id.clone(), ))) .await?; - let to_snap = self - .coordinator + let to_snap = coord .resolve_target(&ReadTarget::Snapshot(SnapshotId::new( to_commit.graph_commit_id.clone(), ))) .await?; + drop(coord); crate::changes::diff_snapshots( self.uri(), &from_snap.snapshot, @@ -597,7 +599,7 @@ impl Omnigraph { /// Create a Snapshot at any historical manifest version. pub async fn snapshot_at_version(&self, version: u64) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.snapshot_at_version(version).await + self.coordinator.lock().await.snapshot_at_version(version).await } pub async fn export_jsonl( @@ -709,7 +711,7 @@ impl Omnigraph { ))); } - let snapshot = self.snapshot(); + let snapshot = self.snapshot().await; let table_key = format!("node:{}", type_name); let ds = snapshot.open(&table_key).await?; @@ -737,12 +739,12 @@ impl Omnigraph { }) } - pub(crate) fn active_branch(&self) -> Option<&str> { - self.coordinator.current_branch() + pub(crate) async fn active_branch(&self) -> Option { + self.coordinator.lock().await.current_branch().map(str::to_string) } async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> { - let descendants = self.coordinator.branch_descendants(branch).await?; + let descendants = self.coordinator.lock().await.branch_descendants(branch).await?; if let Some(descendant) = descendants.first() { return Err(OmniError::manifest_conflict(format!( "cannot delete branch '{}' because descendant branch '{}' still depends on it", @@ -797,8 +799,9 @@ impl Omnigraph { Ok(()) } - async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> { - if self.coordinator.current_branch() == Some(branch) { + async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> { + let active = self.coordinator.lock().await.current_branch().map(str::to_string); + if active.as_deref() == Some(branch) { return Err(OmniError::manifest_conflict(format!( "cannot delete currently active branch '{}'", branch @@ -812,7 +815,7 @@ impl Omnigraph { .map(|entry| (entry.table_key.clone(), entry.table_path.clone())) .collect::>(); - self.coordinator.branch_delete(branch).await?; + self.coordinator.lock().await.branch_delete(branch).await?; self.cleanup_deleted_branch_tables(branch, &owned_tables) .await } @@ -833,15 +836,15 @@ impl Omnigraph { .map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string())) } - pub async fn branch_create(&mut self, name: &str) -> Result<()> { + pub async fn branch_create(&self, name: &str) -> Result<()> { self.ensure_schema_state_valid().await?; self.ensure_schema_apply_idle("branch_create").await?; ensure_public_branch_ref(name, "branch_create")?; - self.coordinator.branch_create(name).await + self.coordinator.lock().await.branch_create(name).await } pub async fn branch_create_from( - &mut self, + &self, from: impl Into, name: &str, ) -> Result<()> { @@ -850,7 +853,7 @@ impl Omnigraph { } async fn branch_create_from_impl( - &mut self, + &self, from: impl Into, name: &str, allow_internal_refs: bool, @@ -867,24 +870,24 @@ impl Omnigraph { } let branch = normalize_branch_name(&branch_name)?; let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?; - let result = self.coordinator.branch_create(name).await; - self.restore_coordinator(previous); + let result = self.coordinator.lock().await.branch_create(name).await; + self.restore_coordinator(previous).await; result } pub async fn branch_list(&self) -> Result> { self.ensure_schema_state_valid().await?; - self.coordinator.branch_list().await + self.coordinator.lock().await.branch_list().await } - pub async fn branch_delete(&mut self, name: &str) -> Result<()> { + pub async fn branch_delete(&self, name: &str) -> Result<()> { self.ensure_schema_state_valid().await?; self.ensure_schema_apply_idle("branch_delete").await?; ensure_public_branch_ref(name, "branch_delete")?; self.refresh().await?; let branch = normalize_branch_name(name)? .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?; - let branches = self.coordinator.branch_list().await?; + let branches = self.coordinator.lock().await.branch_list().await?; if !branches.iter().any(|candidate| candidate == &branch) { return Err(OmniError::manifest_not_found(format!( "branch '{}' not found", @@ -898,7 +901,7 @@ impl Omnigraph { pub async fn get_commit(&self, commit_id: &str) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator + self.coordinator.lock().await .resolve_commit(&SnapshotId::new(commit_id)) .await } @@ -1534,7 +1537,7 @@ edge WorksAt: Person -> Company } async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec { - let snapshot = db.snapshot(); + let snapshot = db.snapshot().await; let ds = snapshot.open(table_key).await.unwrap(); let batches = db.table_store().scan_batches(&ds).await.unwrap(); batches @@ -1631,7 +1634,7 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); seed_person_row(&mut db, "Alice", Some(30)).await; - let before_version = db.snapshot().version(); + let before_version = db.snapshot().await.version(); let desired = TEST_SCHEMA .replace("node Person {\n", "node Human @rename_from(\"Person\") {\n") @@ -1642,7 +1645,7 @@ edge WorksAt: Person -> Company ); db.apply_schema(&desired).await.unwrap(); - let head = db.snapshot(); + let head = db.snapshot().await; assert!(head.entry("node:Person").is_none()); assert!(head.entry("node:Human").is_some()); let historical = ManifestCoordinator::snapshot_at(uri, None, before_version) @@ -1672,7 +1675,7 @@ edge WorksAt: Person -> Company .await .unwrap(); - let all_branches = db.coordinator.all_branches().await.unwrap(); + let all_branches = db.coordinator.lock().await.all_branches().await.unwrap(); assert!( !all_branches.iter().any(|b| is_internal_run_branch(b)), "run branch should be deleted after publish, got: {:?}", @@ -1696,7 +1699,7 @@ edge WorksAt: Person -> Company let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index"); db.apply_schema(&desired).await.unwrap(); - let snapshot = db.snapshot(); + let snapshot = db.snapshot().await; let ds = snapshot.open("node:Person").await.unwrap(); assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); } @@ -1715,7 +1718,7 @@ edge WorksAt: Person -> Company ); db.apply_schema(&desired).await.unwrap(); - let snapshot = db.snapshot(); + let snapshot = db.snapshot().await; let ds = snapshot.open("node:Person").await.unwrap(); assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap()); assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap()); @@ -1728,6 +1731,8 @@ edge WorksAt: Person -> Company let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let mut db = db; db.coordinator + .lock() + .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await .unwrap(); @@ -1745,6 +1750,8 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); db.coordinator + .lock() + .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await .unwrap(); @@ -1762,6 +1769,8 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); db.coordinator + .lock() + .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await .unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph/export.rs b/crates/omnigraph/src/db/omnigraph/export.rs index ad5560e..8fc57f2 100644 --- a/crates/omnigraph/src/db/omnigraph/export.rs +++ b/crates/omnigraph/src/db/omnigraph/export.rs @@ -16,7 +16,7 @@ pub(super) async fn entity_at( id: &str, version: u64, ) -> Result> { - let snap = db.coordinator.snapshot_at_version(version).await?; + let snap = db.coordinator.lock().await.snapshot_at_version(version).await?; entity_from_snapshot(db, &snap, table_key, id).await } diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index ad6aadc..a2475d5 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -31,7 +31,7 @@ pub(super) async fn apply_schema_with_lock( desired_schema_source: &str, ) -> Result { db.ensure_schema_state_valid().await?; - let branches = db.coordinator.all_branches().await?; + let branches = db.coordinator.lock().await.all_branches().await?; // Skip `main` and internal system branches. The schema-apply lock branch // is excluded because it is the cluster-wide schema-apply serializer. // `__run__*` branches are no longer created; the filter remains as @@ -67,7 +67,7 @@ pub(super) async fn apply_schema_with_lock( return Ok(SchemaApplyResult { supported: true, applied: false, - manifest_version: db.version(), + manifest_version: db.version().await, steps: plan.steps, }); } @@ -75,7 +75,7 @@ pub(super) async fn apply_schema_with_lock( let mut desired_catalog = build_catalog_from_ir(&desired_ir)?; fixup_blob_schemas(&mut desired_catalog); - let snapshot = db.snapshot(); + let snapshot = db.snapshot().await; let base_manifest_version = snapshot.version(); let mut added_tables = BTreeSet::new(); let mut renamed_tables = HashMap::new(); @@ -443,11 +443,11 @@ pub(super) async fn apply_schema_with_lock( } db.refresh_coordinator_only().await?; - if db.version() != base_manifest_version { + if db.version().await != base_manifest_version { return Err(OmniError::manifest_conflict(format!( "schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress", base_manifest_version, - db.version() + db.version().await ))); } @@ -475,6 +475,8 @@ pub(super) async fn apply_schema_with_lock( _snapshot_id: _, } = db .coordinator + .lock() + .await .commit_changes_with_actor(&manifest_changes, None) .await?; @@ -498,7 +500,7 @@ pub(super) async fn apply_schema_with_lock( db.store_catalog(desired_catalog); db.store_schema_source(desired_schema_source.to_string()); - db.coordinator.refresh().await?; + db.coordinator.lock().await.refresh().await?; db.runtime_cache.invalidate_all().await; if changed_edge_tables { db.invalidate_graph_index().await; @@ -529,7 +531,7 @@ pub(super) async fn apply_schema_with_lock( }) } -pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> { +pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> { db.refresh_coordinator_only().await?; ensure_schema_apply_not_locked(db, operation).await } @@ -537,7 +539,7 @@ pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> { db.ensure_schema_state_valid().await?; db.refresh_coordinator_only().await?; - let branches = db.coordinator.all_branches().await?; + let branches = db.coordinator.lock().await.all_branches().await?; if branches .iter() .any(|branch| is_schema_apply_lock_branch(branch)) @@ -548,12 +550,16 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> } db.coordinator + .lock() + .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await?; db.refresh_coordinator_only().await?; let blocking_branches = db .coordinator + .lock() + .await .all_branches() .await? .into_iter() @@ -572,6 +578,8 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> { db.coordinator + .lock() + .await .branch_delete(SCHEMA_APPLY_LOCK_BRANCH) .await?; // Use refresh_coordinator_only — the full Omnigraph::refresh would @@ -585,6 +593,8 @@ pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> { if db .coordinator + .lock() + .await .all_branches() .await? .iter() diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 57549d1..08c9108 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -2,15 +2,13 @@ use super::*; pub(super) async fn graph_index(db: &Omnigraph) -> Result> { db.ensure_schema_state_valid().await?; - let resolved = db - .coordinator + let coord = db.coordinator.lock().await; + let resolved = coord .resolve_target(&ReadTarget::Branch( - db.coordinator - .current_branch() - .unwrap_or("main") - .to_string(), + coord.current_branch().unwrap_or("main").to_string(), )) .await?; + drop(coord); let catalog = db.catalog(); db.runtime_cache.graph_index(&resolved, &catalog).await } @@ -24,7 +22,7 @@ pub(super) async fn graph_index_for_resolved( } pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> { - let current_branch = db.coordinator.current_branch().map(str::to_string); + let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); ensure_indices_for_branch(db, current_branch.as_deref()).await } @@ -402,7 +400,7 @@ pub(super) async fn open_for_mutation( db: &Omnigraph, table_key: &str, ) -> Result<(Dataset, String, Option)> { - let current_branch = db.coordinator.current_branch().map(str::to_string); + let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await } @@ -737,6 +735,8 @@ async fn commit_prepared_updates( _snapshot_id: _, } = db .coordinator + .lock() + .await .commit_updates_with_actor(updates, actor_id) .await?; Ok(manifest_version) @@ -753,6 +753,8 @@ async fn commit_prepared_updates_with_expected( _snapshot_id: _, } = db .coordinator + .lock() + .await .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id) .await?; Ok(manifest_version) @@ -764,7 +766,7 @@ pub(super) async fn commit_prepared_updates_on_branch( updates: &[crate::db::SubTableUpdate], actor_id: Option<&str>, ) -> Result { - let current_branch = db.coordinator.current_branch().map(str::to_string); + let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { return commit_prepared_updates(db, updates, actor_id).await; @@ -792,7 +794,7 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected( expected_table_versions: &std::collections::HashMap, actor_id: Option<&str>, ) -> Result { - let current_branch = db.coordinator.current_branch().map(str::to_string); + let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { return commit_prepared_updates_with_expected( @@ -827,7 +829,7 @@ pub(super) async fn commit_updates( updates: &[crate::db::SubTableUpdate], ) -> Result { db.ensure_schema_apply_not_locked("write commit").await?; - let current_branch = db.coordinator.current_branch().map(str::to_string); + let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?; commit_prepared_updates(db, &prepared, None).await } @@ -836,7 +838,7 @@ pub(super) async fn commit_manifest_updates( db: &mut Omnigraph, updates: &[crate::db::SubTableUpdate], ) -> Result { - db.coordinator.commit_manifest_updates(updates).await + db.coordinator.lock().await.commit_manifest_updates(updates).await } pub(super) async fn record_merge_commit( @@ -846,7 +848,7 @@ pub(super) async fn record_merge_commit( merged_parent_commit_id: &str, actor_id: Option<&str>, ) -> Result { - db.coordinator + db.coordinator.lock().await .record_merge_commit( manifest_version, parent_commit_id, @@ -880,7 +882,7 @@ pub(super) async fn commit_updates_on_branch_with_expected( } pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> { - db.coordinator.ensure_commit_graph_initialized().await + db.coordinator.lock().await.ensure_commit_graph_initialized().await } pub(super) async fn invalidate_graph_index(db: &Omnigraph) { diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index f2284f0..b8aa27a 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -817,8 +817,9 @@ async fn publish_adopted_source_state( .ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?; let target_entry = target_snapshot.entry(table_key); + let target_active = target_db.active_branch().await; match ( - target_db.active_branch(), + target_active.as_deref(), source_entry.table_branch.as_deref(), ) { // Both on main — pointer switch is safe (same lineage, version columns valid) @@ -1076,7 +1077,7 @@ impl Omnigraph { )) .await? .snapshot; - let previous_branch = self.active_branch().map(str::to_string); + let previous_branch = self.active_branch().await; let previous = self .swap_coordinator_for_branch(target_branch.as_deref()) .await?; @@ -1090,7 +1091,7 @@ impl Omnigraph { actor_id, ) .await; - self.restore_coordinator(previous); + self.restore_coordinator(previous).await; if merge_result.is_ok() && previous_branch == target_branch { self.refresh().await?; @@ -1109,7 +1110,7 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { self.ensure_commit_graph_initialized().await?; - let target_snapshot = self.snapshot(); + let target_snapshot = self.snapshot().await; let mut table_keys = HashSet::new(); for entry in base_snapshot.entries() { @@ -1203,6 +1204,7 @@ impl Omnigraph { // commit + record_merge_commit calls below. Under PR 1b's // intermediate state (global server RwLock still in place), // this acquisition is uncontended. + let active_branch_for_keys = self.active_branch().await; let merge_queue_keys: Vec<(String, Option)> = ordered_table_keys .iter() .filter(|table_key| { @@ -1211,7 +1213,7 @@ impl Omnigraph { Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState) ) }) - .map(|table_key| (table_key.clone(), self.active_branch().map(str::to_string))) + .map(|table_key| (table_key.clone(), active_branch_for_keys.clone())) .collect(); let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await; @@ -1240,7 +1242,7 @@ impl Omnigraph { // the orphaned post-Phase-B HEAD on the target ref. // Same rationale as table_ops.rs:115-120 in // ensure_indices_for_branch. - table_branch: self.active_branch().map(str::to_string), + table_branch: active_branch_for_keys.clone(), }) }) .collect(); @@ -1256,7 +1258,7 @@ impl Omnigraph { // `branch_merge` calls `swap_coordinator_for_branch(target_branch)` // before invoking this function, so `self.active_branch()` // is the target. - let target_branch = self.active_branch().map(str::to_string); + let target_branch = active_branch_for_keys.clone(); let mut sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::BranchMerge, target_branch, @@ -1314,7 +1316,7 @@ impl Omnigraph { crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?; let manifest_version = if updates.is_empty() { - self.version() + self.version().await } else { self.commit_manifest_updates(&updates).await? }; diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 3e971ca..a813508 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -59,14 +59,14 @@ pub enum LoadMode { /// Load JSONL data into an Omnigraph database. pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result { - let current_branch = db.active_branch().map(str::to_string); + let current_branch = db.active_branch().await; let branch = current_branch.as_deref().unwrap_or("main"); db.load(branch, data, mode).await } /// Load JSONL data from a file path. pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result { - let current_branch = db.active_branch().map(str::to_string); + let current_branch = db.active_branch().await; let branch = current_branch.as_deref().unwrap_or("main"); db.load_file(branch, path, mode).await } @@ -1830,7 +1830,7 @@ edge WorksAt: Person -> Company .unwrap(); // Read back via snapshot - let snap = db.snapshot(); + let snap = db.snapshot().await; let person_ds = snap.open("node:Person").await.unwrap(); assert_eq!(person_ds.count_rows(None).await.unwrap(), 2); @@ -1867,7 +1867,7 @@ edge WorksAt: Person -> Company .await .unwrap(); - let snap = db.snapshot(); + let snap = db.snapshot().await; let knows_ds = snap.open("edge:Knows").await.unwrap(); let batches: Vec = knows_ds @@ -1902,13 +1902,13 @@ edge WorksAt: Person -> Company let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); - let v1 = db.version(); + let v1 = db.version().await; load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite) .await .unwrap(); - assert!(db.version() > v1); + assert!(db.version().await > v1); } #[tokio::test] @@ -1925,7 +1925,7 @@ edge WorksAt: Person -> Company .unwrap(); load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap(); - let snap = db.snapshot(); + let snap = db.snapshot().await; let person_ds = snap.open("node:Person").await.unwrap(); assert_eq!(person_ds.count_rows(None).await.unwrap(), 2); }