engine: wrap coordinator in tokio Mutex (PR 2 Step B continued)

Wraps the GraphCoordinator field in `Arc<tokio::sync::Mutex<...>>` so
engine APIs can move from `&mut self` to `&self` without giving up the
coordinator's mutating refresh path. Lock acquisition order: always
before runtime_cache (when both are needed in one scope). Critical
sections stay short — load+clone for snapshot/version/current_branch,
single-method delegations elsewhere.

Public API changes:
- `Omnigraph::version()` and `Omnigraph::snapshot()` (pub(crate))
  become async; callers add `.await`.
- `Omnigraph::active_branch()` returns `Option<String>` (cloned) instead
  of `Option<&str>` borrowed from the coordinator. Callers either
  `.await` the result + use `.as_deref()`, or hoist into a binding.

`&self`-converted methods this round (tied to the coordinator wrap, not
the Step C surface refactor):
- `swap_coordinator_for_branch`
- `restore_coordinator` (now async; was sync)
- `sync_branch`
- `refresh`
- `refresh_coordinator_only`
- `reload_schema_if_source_changed`
- `branch_create`, `branch_create_from`, `branch_delete`, `branch_list`
- `delete_branch_storage_only`
- `ensure_branch_delete_safe`
- `ensure_schema_apply_idle`
- `ensure_schema_apply_idle` helper in schema_apply.rs (matches signature)

Caller updates: branch_create_from_impl threads `restore_coordinator`'s
new async signature; schema_apply, table_ops, exec/merge wrap every
direct `db.coordinator.X()` in `db.coordinator.lock().await.X()`;
exec/merge hoists `active_branch_for_keys` once outside the per-table
closure that builds queue keys + sidecar pins.

All 102 lib tests + 30 branching + 24 runs + 10 lifecycle + 16
staged_writes + 63 end_to_end pass workspace-wide. Zero test
regressions; the only behavior change is on the `Omnigraph` API
surface (sync -> async on the three accessors above).

Step C (engine API conversion: apply_schema, mutate_as, ingest_as,
branch_merge_as &mut self -> &self) follows in a subsequent commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-07 16:38:48 +02:00
parent fcb47620d3
commit 011f9b9610
No known key found for this signature in database
6 changed files with 130 additions and 107 deletions

View file

@ -74,7 +74,14 @@ pub struct SchemaApplyResult {
pub struct Omnigraph {
root_uri: String,
storage: Arc<dyn StorageAdapter>,
coordinator: GraphCoordinator,
/// Coordinator state behind a tokio Mutex. PR 2 (MR-686) wraps this
/// so engine write APIs can be `&self` (the HTTP server's `AppState`
/// then holds `Arc<Omnigraph>` and dispatches concurrent calls
/// without a global write lock). Critical sections are short:
/// callers acquire, read or refresh, drop. Lock acquisition order:
/// always before `runtime_cache` (when both are needed in one
/// scope).
coordinator: Arc<tokio::sync::Mutex<GraphCoordinator>>,
table_store: TableStore,
runtime_cache: RuntimeCache,
/// Read-heavy on every query, written only by `apply_schema`. ArcSwap
@ -139,7 +146,7 @@ impl Omnigraph {
Ok(Self {
root_uri: root.clone(),
storage,
coordinator,
coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
@ -225,7 +232,7 @@ impl Omnigraph {
Ok(Self {
root_uri: root.clone(),
storage,
coordinator,
coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
@ -275,7 +282,7 @@ impl Omnigraph {
schema_apply::apply_schema(self, desired_schema_source).await
}
pub(crate) async fn ensure_schema_apply_idle(&mut self, operation: &str) -> Result<()> {
pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {
schema_apply::ensure_schema_apply_idle(self, operation).await
}
@ -336,15 +343,16 @@ impl Omnigraph {
}
pub(crate) async fn swap_coordinator_for_branch(
&mut self,
&self,
branch: Option<&str>,
) -> Result<GraphCoordinator> {
let next = self.open_coordinator_for_branch(branch).await?;
Ok(std::mem::replace(&mut self.coordinator, next))
let mut coord = self.coordinator.lock().await;
Ok(std::mem::replace(&mut *coord, next))
}
pub(crate) fn restore_coordinator(&mut self, coordinator: GraphCoordinator) {
self.coordinator = coordinator;
pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
*self.coordinator.lock().await = coordinator;
}
pub(crate) async fn resolved_branch_target(
@ -354,21 +362,19 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
if normalized.as_deref() == self.coordinator.current_branch() {
let snapshot_id = self.coordinator.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(
self.coordinator.current_branch(),
self.coordinator.version(),
)
let coord = self.coordinator.lock().await;
if normalized.as_deref() == coord.current_branch() {
let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(coord.current_branch(), coord.version())
});
return Ok(ResolvedTarget {
requested,
branch: self.coordinator.current_branch().map(str::to_string),
branch: coord.current_branch().map(str::to_string),
snapshot_id,
snapshot: self.coordinator.snapshot(),
snapshot: coord.snapshot(),
});
}
self.coordinator.resolve_target(&requested).await
coord.resolve_target(&requested).await
}
pub(crate) async fn snapshot_for_branch(&self, branch: Option<&str>) -> Result<Snapshot> {
@ -377,13 +383,13 @@ impl Omnigraph {
.map(|resolved| resolved.snapshot)
}
pub(crate) fn version(&self) -> u64 {
self.coordinator.version()
pub(crate) async fn version(&self) -> u64 {
self.coordinator.lock().await.version()
}
/// Return an immutable Snapshot from the known manifest state. No storage I/O.
pub(crate) fn snapshot(&self) -> Snapshot {
self.coordinator.snapshot()
pub(crate) async fn snapshot(&self) -> Snapshot {
self.coordinator.lock().await.snapshot()
}
pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
@ -408,10 +414,11 @@ impl Omnigraph {
}
/// Synchronize this handle's write base to the latest head of the named branch.
pub async fn sync_branch(&mut self, branch: &str) -> Result<()> {
pub async fn sync_branch(&self, branch: &str) -> Result<()> {
self.ensure_schema_state_valid().await?;
let branch = normalize_branch_name(branch)?;
self.coordinator = self.open_coordinator_for_branch(branch.as_deref()).await?;
let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
*self.coordinator.lock().await = next;
self.runtime_cache.invalidate_all().await;
Ok(())
}
@ -448,18 +455,19 @@ impl Omnigraph {
/// (e.g. `schema_apply` mid-write) MUST use
/// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
/// avoid the recovery sweep racing their own sidecar.
pub async fn refresh(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
pub async fn refresh(&self) -> Result<()> {
let mut coord = self.coordinator.lock().await;
coord.refresh().await?;
let schema_state_recovery = recover_schema_state_files(
&self.root_uri,
Arc::clone(&self.storage),
&self.coordinator.snapshot(),
&coord.snapshot(),
)
.await?;
crate::db::manifest::recover_manifest_drift(
&self.root_uri,
Arc::clone(&self.storage),
&mut self.coordinator,
&mut *coord,
crate::db::manifest::RecoveryMode::RollForwardOnly,
schema_state_recovery,
)
@ -469,14 +477,14 @@ impl Omnigraph {
Ok(())
}
async fn reload_schema_if_source_changed(&mut self) -> Result<()> {
async fn reload_schema_if_source_changed(&self) -> Result<()> {
let schema_path = schema_source_uri(&self.root_uri);
let schema_source = self.storage.read_text(&schema_path).await?;
if schema_source == *self.schema_source.load_full() {
return Ok(());
}
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let branches = self.coordinator.branch_list().await?;
let branches = self.coordinator.lock().await.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
&self.root_uri,
Arc::clone(&self.storage),
@ -498,15 +506,15 @@ impl Omnigraph {
/// here would observe the caller's own sidecar, classify it as
/// RolledPastExpected, and roll it forward — racing the caller's
/// own publish path.
pub(crate) async fn refresh_coordinator_only(&mut self) -> Result<()> {
self.coordinator.refresh().await?;
pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
self.coordinator.lock().await.refresh().await?;
self.runtime_cache.invalidate_all().await;
Ok(())
}
pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
self.ensure_schema_state_valid().await?;
self.coordinator.resolve_snapshot_id(branch).await
self.coordinator.lock().await.resolve_snapshot_id(branch).await
}
pub(crate) async fn resolved_target(
@ -514,7 +522,7 @@ impl Omnigraph {
target: impl Into<ReadTarget>,
) -> Result<ResolvedTarget> {
self.ensure_schema_state_valid().await?;
self.coordinator.resolve_target(&target.into()).await
self.coordinator.lock().await.resolve_target(&target.into()).await
}
// ─── Change detection ────────────────────────────────────────────────
@ -545,26 +553,20 @@ impl Omnigraph {
to_commit_id: &str,
filter: &crate::changes::ChangeFilter,
) -> Result<crate::changes::ChangeSet> {
let from_commit = self
.coordinator
.resolve_commit(&SnapshotId::new(from_commit_id))
.await?;
let to_commit = self
.coordinator
.resolve_commit(&SnapshotId::new(to_commit_id))
.await?;
let from_snap = self
.coordinator
let coord = self.coordinator.lock().await;
let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
let from_snap = coord
.resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
from_commit.graph_commit_id.clone(),
)))
.await?;
let to_snap = self
.coordinator
let to_snap = coord
.resolve_target(&ReadTarget::Snapshot(SnapshotId::new(
to_commit.graph_commit_id.clone(),
)))
.await?;
drop(coord);
crate::changes::diff_snapshots(
self.uri(),
&from_snap.snapshot,
@ -597,7 +599,7 @@ impl Omnigraph {
/// Create a Snapshot at any historical manifest version.
pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
self.ensure_schema_state_valid().await?;
self.coordinator.snapshot_at_version(version).await
self.coordinator.lock().await.snapshot_at_version(version).await
}
pub async fn export_jsonl(
@ -709,7 +711,7 @@ impl Omnigraph {
)));
}
let snapshot = self.snapshot();
let snapshot = self.snapshot().await;
let table_key = format!("node:{}", type_name);
let ds = snapshot.open(&table_key).await?;
@ -737,12 +739,12 @@ impl Omnigraph {
})
}
pub(crate) fn active_branch(&self) -> Option<&str> {
self.coordinator.current_branch()
pub(crate) async fn active_branch(&self) -> Option<String> {
self.coordinator.lock().await.current_branch().map(str::to_string)
}
async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
let descendants = self.coordinator.branch_descendants(branch).await?;
let descendants = self.coordinator.lock().await.branch_descendants(branch).await?;
if let Some(descendant) = descendants.first() {
return Err(OmniError::manifest_conflict(format!(
"cannot delete branch '{}' because descendant branch '{}' still depends on it",
@ -797,8 +799,9 @@ impl Omnigraph {
Ok(())
}
async fn delete_branch_storage_only(&mut self, branch: &str) -> Result<()> {
if self.coordinator.current_branch() == Some(branch) {
async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
let active = self.coordinator.lock().await.current_branch().map(str::to_string);
if active.as_deref() == Some(branch) {
return Err(OmniError::manifest_conflict(format!(
"cannot delete currently active branch '{}'",
branch
@ -812,7 +815,7 @@ impl Omnigraph {
.map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
.collect::<Vec<_>>();
self.coordinator.branch_delete(branch).await?;
self.coordinator.lock().await.branch_delete(branch).await?;
self.cleanup_deleted_branch_tables(branch, &owned_tables)
.await
}
@ -833,15 +836,15 @@ impl Omnigraph {
.map(|id| id.map(|snapshot_id| snapshot_id.as_str().to_string()))
}
pub async fn branch_create(&mut self, name: &str) -> Result<()> {
pub async fn branch_create(&self, name: &str) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("branch_create").await?;
ensure_public_branch_ref(name, "branch_create")?;
self.coordinator.branch_create(name).await
self.coordinator.lock().await.branch_create(name).await
}
pub async fn branch_create_from(
&mut self,
&self,
from: impl Into<ReadTarget>,
name: &str,
) -> Result<()> {
@ -850,7 +853,7 @@ impl Omnigraph {
}
async fn branch_create_from_impl(
&mut self,
&self,
from: impl Into<ReadTarget>,
name: &str,
allow_internal_refs: bool,
@ -867,24 +870,24 @@ impl Omnigraph {
}
let branch = normalize_branch_name(&branch_name)?;
let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
let result = self.coordinator.branch_create(name).await;
self.restore_coordinator(previous);
let result = self.coordinator.lock().await.branch_create(name).await;
self.restore_coordinator(previous).await;
result
}
pub async fn branch_list(&self) -> Result<Vec<String>> {
self.ensure_schema_state_valid().await?;
self.coordinator.branch_list().await
self.coordinator.lock().await.branch_list().await
}
pub async fn branch_delete(&mut self, name: &str) -> Result<()> {
pub async fn branch_delete(&self, name: &str) -> Result<()> {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("branch_delete").await?;
ensure_public_branch_ref(name, "branch_delete")?;
self.refresh().await?;
let branch = normalize_branch_name(name)?
.ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
let branches = self.coordinator.branch_list().await?;
let branches = self.coordinator.lock().await.branch_list().await?;
if !branches.iter().any(|candidate| candidate == &branch) {
return Err(OmniError::manifest_not_found(format!(
"branch '{}' not found",
@ -898,7 +901,7 @@ impl Omnigraph {
pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
self.ensure_schema_state_valid().await?;
self.coordinator
self.coordinator.lock().await
.resolve_commit(&SnapshotId::new(commit_id))
.await
}
@ -1534,7 +1537,7 @@ edge WorksAt: Person -> Company
}
async fn table_rows_json(db: &Omnigraph, table_key: &str) -> Vec<Value> {
let snapshot = db.snapshot();
let snapshot = db.snapshot().await;
let ds = snapshot.open(table_key).await.unwrap();
let batches = db.table_store().scan_batches(&ds).await.unwrap();
batches
@ -1631,7 +1634,7 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
seed_person_row(&mut db, "Alice", Some(30)).await;
let before_version = db.snapshot().version();
let before_version = db.snapshot().await.version();
let desired = TEST_SCHEMA
.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
@ -1642,7 +1645,7 @@ edge WorksAt: Person -> Company
);
db.apply_schema(&desired).await.unwrap();
let head = db.snapshot();
let head = db.snapshot().await;
assert!(head.entry("node:Person").is_none());
assert!(head.entry("node:Human").is_some());
let historical = ManifestCoordinator::snapshot_at(uri, None, before_version)
@ -1672,7 +1675,7 @@ edge WorksAt: Person -> Company
.await
.unwrap();
let all_branches = db.coordinator.all_branches().await.unwrap();
let all_branches = db.coordinator.lock().await.all_branches().await.unwrap();
assert!(
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"run branch should be deleted after publish, got: {:?}",
@ -1696,7 +1699,7 @@ edge WorksAt: Person -> Company
let desired = TEST_SCHEMA.replace("name: String @key", "name: String @key @index");
db.apply_schema(&desired).await.unwrap();
let snapshot = db.snapshot();
let snapshot = db.snapshot().await;
let ds = snapshot.open("node:Person").await.unwrap();
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
}
@ -1715,7 +1718,7 @@ edge WorksAt: Person -> Company
);
db.apply_schema(&desired).await.unwrap();
let snapshot = db.snapshot();
let snapshot = db.snapshot().await;
let ds = snapshot.open("node:Person").await.unwrap();
assert!(db.table_store().has_btree_index(&ds, "id").await.unwrap());
assert!(db.table_store().has_fts_index(&ds, "name").await.unwrap());
@ -1728,6 +1731,8 @@ edge WorksAt: Person -> Company
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let mut db = db;
db.coordinator
.lock()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
@ -1745,6 +1750,8 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.lock()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
@ -1762,6 +1769,8 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.lock()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();

View file

@ -16,7 +16,7 @@ pub(super) async fn entity_at(
id: &str,
version: u64,
) -> Result<Option<serde_json::Value>> {
let snap = db.coordinator.snapshot_at_version(version).await?;
let snap = db.coordinator.lock().await.snapshot_at_version(version).await?;
entity_from_snapshot(db, &snap, table_key, id).await
}

View file

@ -31,7 +31,7 @@ pub(super) async fn apply_schema_with_lock(
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
db.ensure_schema_state_valid().await?;
let branches = db.coordinator.all_branches().await?;
let branches = db.coordinator.lock().await.all_branches().await?;
// Skip `main` and internal system branches. The schema-apply lock branch
// is excluded because it is the cluster-wide schema-apply serializer.
// `__run__*` branches are no longer created; the filter remains as
@ -67,7 +67,7 @@ pub(super) async fn apply_schema_with_lock(
return Ok(SchemaApplyResult {
supported: true,
applied: false,
manifest_version: db.version(),
manifest_version: db.version().await,
steps: plan.steps,
});
}
@ -75,7 +75,7 @@ pub(super) async fn apply_schema_with_lock(
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
fixup_blob_schemas(&mut desired_catalog);
let snapshot = db.snapshot();
let snapshot = db.snapshot().await;
let base_manifest_version = snapshot.version();
let mut added_tables = BTreeSet::new();
let mut renamed_tables = HashMap::new();
@ -443,11 +443,11 @@ pub(super) async fn apply_schema_with_lock(
}
db.refresh_coordinator_only().await?;
if db.version() != base_manifest_version {
if db.version().await != base_manifest_version {
return Err(OmniError::manifest_conflict(format!(
"schema apply lost its write lease: main advanced from v{} to v{} while schema apply was in progress",
base_manifest_version,
db.version()
db.version().await
)));
}
@ -475,6 +475,8 @@ pub(super) async fn apply_schema_with_lock(
_snapshot_id: _,
} = db
.coordinator
.lock()
.await
.commit_changes_with_actor(&manifest_changes, None)
.await?;
@ -498,7 +500,7 @@ pub(super) async fn apply_schema_with_lock(
db.store_catalog(desired_catalog);
db.store_schema_source(desired_schema_source.to_string());
db.coordinator.refresh().await?;
db.coordinator.lock().await.refresh().await?;
db.runtime_cache.invalidate_all().await;
if changed_edge_tables {
db.invalidate_graph_index().await;
@ -529,7 +531,7 @@ pub(super) async fn apply_schema_with_lock(
})
}
pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str) -> Result<()> {
pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> Result<()> {
db.refresh_coordinator_only().await?;
ensure_schema_apply_not_locked(db, operation).await
}
@ -537,7 +539,7 @@ pub(super) async fn ensure_schema_apply_idle(db: &mut Omnigraph, operation: &str
pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh_coordinator_only().await?;
let branches = db.coordinator.all_branches().await?;
let branches = db.coordinator.lock().await.all_branches().await?;
if branches
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
@ -548,12 +550,16 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
}
db.coordinator
.lock()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
db.refresh_coordinator_only().await?;
let blocking_branches = db
.coordinator
.lock()
.await
.all_branches()
.await?
.into_iter()
@ -572,6 +578,8 @@ pub(super) async fn acquire_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()> {
db.coordinator
.lock()
.await
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
// Use refresh_coordinator_only — the full Omnigraph::refresh would
@ -585,6 +593,8 @@ pub(super) async fn release_schema_apply_lock(db: &mut Omnigraph) -> Result<()>
pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
if db
.coordinator
.lock()
.await
.all_branches()
.await?
.iter()

View file

@ -2,15 +2,13 @@ use super::*;
pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index::GraphIndex>> {
db.ensure_schema_state_valid().await?;
let resolved = db
.coordinator
let coord = db.coordinator.lock().await;
let resolved = coord
.resolve_target(&ReadTarget::Branch(
db.coordinator
.current_branch()
.unwrap_or("main")
.to_string(),
coord.current_branch().unwrap_or("main").to_string(),
))
.await?;
drop(coord);
let catalog = db.catalog();
db.runtime_cache.graph_index(&resolved, &catalog).await
}
@ -24,7 +22,7 @@ pub(super) async fn graph_index_for_resolved(
}
pub(super) async fn ensure_indices(db: &mut Omnigraph) -> Result<()> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
ensure_indices_for_branch(db, current_branch.as_deref()).await
}
@ -402,7 +400,7 @@ pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
) -> Result<(Dataset, String, Option<String>)> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await
}
@ -737,6 +735,8 @@ async fn commit_prepared_updates(
_snapshot_id: _,
} = db
.coordinator
.lock()
.await
.commit_updates_with_actor(updates, actor_id)
.await?;
Ok(manifest_version)
@ -753,6 +753,8 @@ async fn commit_prepared_updates_with_expected(
_snapshot_id: _,
} = db
.coordinator
.lock()
.await
.commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
.await?;
Ok(manifest_version)
@ -764,7 +766,7 @@ pub(super) async fn commit_prepared_updates_on_branch(
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates(db, updates, actor_id).await;
@ -792,7 +794,7 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected(
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.current_branch().map(str::to_string);
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates_with_expected(
@ -827,7 +829,7 @@ pub(super) async fn commit_updates(
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.ensure_schema_apply_not_locked("write commit").await?;
let current_branch = db.coordinator.current_branch().map(str::to_string);
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
commit_prepared_updates(db, &prepared, None).await
}
@ -836,7 +838,7 @@ pub(super) async fn commit_manifest_updates(
db: &mut Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.coordinator.commit_manifest_updates(updates).await
db.coordinator.lock().await.commit_manifest_updates(updates).await
}
pub(super) async fn record_merge_commit(
@ -846,7 +848,7 @@ pub(super) async fn record_merge_commit(
merged_parent_commit_id: &str,
actor_id: Option<&str>,
) -> Result<String> {
db.coordinator
db.coordinator.lock().await
.record_merge_commit(
manifest_version,
parent_commit_id,
@ -880,7 +882,7 @@ pub(super) async fn commit_updates_on_branch_with_expected(
}
pub(super) async fn ensure_commit_graph_initialized(db: &mut Omnigraph) -> Result<()> {
db.coordinator.ensure_commit_graph_initialized().await
db.coordinator.lock().await.ensure_commit_graph_initialized().await
}
pub(super) async fn invalidate_graph_index(db: &Omnigraph) {

View file

@ -817,8 +817,9 @@ async fn publish_adopted_source_state(
.ok_or_else(|| OmniError::manifest(format!("missing source entry for {}", table_key)))?;
let target_entry = target_snapshot.entry(table_key);
let target_active = target_db.active_branch().await;
match (
target_db.active_branch(),
target_active.as_deref(),
source_entry.table_branch.as_deref(),
) {
// Both on main — pointer switch is safe (same lineage, version columns valid)
@ -1076,7 +1077,7 @@ impl Omnigraph {
))
.await?
.snapshot;
let previous_branch = self.active_branch().map(str::to_string);
let previous_branch = self.active_branch().await;
let previous = self
.swap_coordinator_for_branch(target_branch.as_deref())
.await?;
@ -1090,7 +1091,7 @@ impl Omnigraph {
actor_id,
)
.await;
self.restore_coordinator(previous);
self.restore_coordinator(previous).await;
if merge_result.is_ok() && previous_branch == target_branch {
self.refresh().await?;
@ -1109,7 +1110,7 @@ impl Omnigraph {
actor_id: Option<&str>,
) -> Result<MergeOutcome> {
self.ensure_commit_graph_initialized().await?;
let target_snapshot = self.snapshot();
let target_snapshot = self.snapshot().await;
let mut table_keys = HashSet::new();
for entry in base_snapshot.entries() {
@ -1203,6 +1204,7 @@ impl Omnigraph {
// commit + record_merge_commit calls below. Under PR 1b's
// intermediate state (global server RwLock still in place),
// this acquisition is uncontended.
let active_branch_for_keys = self.active_branch().await;
let merge_queue_keys: Vec<(String, Option<String>)> = ordered_table_keys
.iter()
.filter(|table_key| {
@ -1211,7 +1213,7 @@ impl Omnigraph {
Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState)
)
})
.map(|table_key| (table_key.clone(), self.active_branch().map(str::to_string)))
.map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
.collect();
let _merge_queue_guards = self.write_queue().acquire_many(&merge_queue_keys).await;
@ -1240,7 +1242,7 @@ impl Omnigraph {
// the orphaned post-Phase-B HEAD on the target ref.
// Same rationale as table_ops.rs:115-120 in
// ensure_indices_for_branch.
table_branch: self.active_branch().map(str::to_string),
table_branch: active_branch_for_keys.clone(),
})
})
.collect();
@ -1256,7 +1258,7 @@ impl Omnigraph {
// `branch_merge` calls `swap_coordinator_for_branch(target_branch)`
// before invoking this function, so `self.active_branch()`
// is the target.
let target_branch = self.active_branch().map(str::to_string);
let target_branch = active_branch_for_keys.clone();
let mut sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::BranchMerge,
target_branch,
@ -1314,7 +1316,7 @@ impl Omnigraph {
crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
let manifest_version = if updates.is_empty() {
self.version()
self.version().await
} else {
self.commit_manifest_updates(&updates).await?
};

View file

@ -59,14 +59,14 @@ pub enum LoadMode {
/// Load JSONL data into an Omnigraph database.
pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
let current_branch = db.active_branch().map(str::to_string);
let current_branch = db.active_branch().await;
let branch = current_branch.as_deref().unwrap_or("main");
db.load(branch, data, mode).await
}
/// Load JSONL data from a file path.
pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
let current_branch = db.active_branch().map(str::to_string);
let current_branch = db.active_branch().await;
let branch = current_branch.as_deref().unwrap_or("main");
db.load_file(branch, path, mode).await
}
@ -1830,7 +1830,7 @@ edge WorksAt: Person -> Company
.unwrap();
// Read back via snapshot
let snap = db.snapshot();
let snap = db.snapshot().await;
let person_ds = snap.open("node:Person").await.unwrap();
assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
@ -1867,7 +1867,7 @@ edge WorksAt: Person -> Company
.await
.unwrap();
let snap = db.snapshot();
let snap = db.snapshot().await;
let knows_ds = snap.open("edge:Knows").await.unwrap();
let batches: Vec<RecordBatch> = knows_ds
@ -1902,13 +1902,13 @@ edge WorksAt: Person -> Company
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let v1 = db.version();
let v1 = db.version().await;
load_jsonl(&mut db, TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
assert!(db.version() > v1);
assert!(db.version().await > v1);
}
#[tokio::test]
@ -1925,7 +1925,7 @@ edge WorksAt: Person -> Company
.unwrap();
load_jsonl(&mut db, batch2, LoadMode::Append).await.unwrap();
let snap = db.snapshot();
let snap = db.snapshot().await;
let person_ds = snap.open("node:Person").await.unwrap();
assert_eq!(person_ds.count_rows(None).await.unwrap(), 2);
}