From f925ad17395d8dd6d41326d3b57c8b9ba185d1db Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 12:42:26 +0200 Subject: [PATCH] =?UTF-8?q?mr-686:=20Phase=202=20=E2=80=94=20op-kind-aware?= =?UTF-8?q?=20version=20check=20+=20coord=20Mutex=20=E2=86=92=20RwLock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix A: op-kind-aware ensure_expected_version. Insert/Merge skip the strict pre-stage check; Update/Delete/SchemaRewrite keep it. New MutationOpKind enum threaded through open_for_mutation_on_branch / open_owned_dataset_for_branch_write / reopen_for_mutation and all callers (execute_insert/update/delete_node/delete_edge, branch_merge::publish_rewritten_merge_table, schema_apply, ensure_indices_for_branch, loader Append/Merge/Overwrite). Closes the 77% rejection rate on same-key concurrent inserts. Fix B: coordinator Mutex -> RwLock. Reads parallelize via .read(); writes serialize via .write(). Atomic-commit invariant preserved by the single .write() covering commit_manifest_updates + record_graph_commit. Bench-as-test change_concurrent_inserts_same_key_serialize_without_409 (server.rs:2180) spawns 12 concurrent /change inserts on a single (table, branch); asserts every request returns 200. Was failing pre-Phase-2; passes post-Phase-2. change_conflict_returns_manifest_conflict_409 (cross-process drift sentinel) and branch_merge_conflict_response_includes_structured_conflicts both still pass. Bench (after-pr2-phase2): - single-actor 1x1: 14.9 ops/s, p50 68ms (baseline 12.3, +22%) - disjoint 8x8: 7.04 ops/s, p50 1023ms (baseline 6.24, +13%) - same-key 8x1: 2.62 ops/s, 0 errors (after-pr2: 77% errors) Disjoint stayed at +13% — Fix B's RwLock helped read paths but the publisher's .write() critical section still serializes graph-wide. Splitting GraphCoordinator into per-concern primitives (manifest in ArcSwap, commit_graph in RwLock, atomic-commit serializer) is the deferred next step. 
102 lib + 30 branching + 24 runs + 16 staged_writes + 63 end_to_end + 40 server tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/tests/server.rs | 71 ++++++++++++ crates/omnigraph/src/db/mod.rs | 47 ++++++++ crates/omnigraph/src/db/omnigraph.rs | 109 +++++++++++------- crates/omnigraph/src/db/omnigraph/export.rs | 2 +- .../src/db/omnigraph/schema_apply.rs | 16 +-- .../omnigraph/src/db/omnigraph/table_ops.rs | 83 +++++++++---- crates/omnigraph/src/exec/merge.rs | 8 +- crates/omnigraph/src/exec/mutation.rs | 65 ++++++++--- crates/omnigraph/src/exec/staging.rs | 18 ++- crates/omnigraph/src/loader/mod.rs | 24 +++- 10 files changed, 350 insertions(+), 93 deletions(-) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index f8d6b8d..ef4ca41 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2177,6 +2177,77 @@ async fn change_conflict_returns_manifest_conflict_409() { ); } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn change_concurrent_inserts_same_key_serialize_without_409() { + // PR 2 Phase 2 (MR-686): pin the design fix for the same-key + // concurrency hazard. Pre-fix, in-process concurrent inserts on + // the same `(table, branch)` rejected with 409 manifest_conflict + // because `ensure_expected_version` fired before the per-table + // queue was acquired and saw Lance HEAD already advanced by a + // peer writer. Post-fix, Insert/Merge skip the strict pre-stage + // check (see `MutationOpKind::strict_pre_stage_version_check`); + // the queue serializes commit_staged; Lance's natural rebase + // handles the in-flight stage; the publisher's CAS on a fresh + // per-branch snapshot under the queue catches genuine cross- + // process drift. + // + // This test spawns N concurrent /change inserts on a single + // node type and asserts that every request returns 200 (no 409). + // It does not verify the final row count. 
+ let temp = init_loaded_repo().await; + let repo = repo_path(temp.path()); + let state = AppState::open(repo.to_string_lossy().to_string()) + .await + .unwrap(); + let app = build_app(state); + + const N: usize = 12; + + let mut handles = Vec::with_capacity(N); + for i in 0..N { + let app = app.clone(); + handles.push(tokio::spawn(async move { + let body = serde_json::to_vec(&ChangeRequest { + query_source: MUTATION_QUERIES.to_string(), + query_name: Some("insert_person".to_string()), + params: Some(json!({ "name": format!("racer-{i}"), "age": i as i32 })), + branch: Some("main".to_string()), + }) + .unwrap(); + let req = Request::builder() + .uri("/change") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let response = app.oneshot(req).await.unwrap(); + response.status() + })); + } + + let mut statuses = Vec::with_capacity(N); + for h in handles { + statuses.push(h.await.unwrap()); + } + + let bad: Vec<_> = statuses + .iter() + .enumerate() + .filter(|(_, s)| **s != StatusCode::OK) + .collect(); + assert!( + bad.is_empty(), + "expected every concurrent insert to return 200, got non-200 for: {:?}", + bad + ); + + // The status assertions above are the load-bearing pin: every + // concurrent insert succeeded under the per-(table, branch) queue, + // serialized by the queue, with publisher CAS at end. None + // produced 409 manifest_conflict (which is what `ensure_expected_version` + // would have done pre-Phase-2). 
+} + #[tokio::test(flavor = "multi_thread")] async fn oversized_request_body_returns_payload_too_large() { let (_temp, app) = app_for_loaded_repo().await; diff --git a/crates/omnigraph/src/db/mod.rs b/crates/omnigraph/src/db/mod.rs index 4f292d3..b6ab0da 100644 --- a/crates/omnigraph/src/db/mod.rs +++ b/crates/omnigraph/src/db/mod.rs @@ -19,6 +19,53 @@ pub(crate) use run_registry::is_internal_run_branch; pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__"; +/// Mutation kind, threaded through the version-check call sites so the +/// engine can apply an op-kind-aware policy: +/// +/// - `Insert` / `Merge`: skip the strict pre-stage `ensure_expected_version` +/// check. Lance's `MergeInsertBuilder` rebases concurrent appends; the +/// per-(table, branch) writer queue serializes `commit_staged`; the +/// publisher's CAS (refreshed under the queue via +/// `MutationStaging::commit_all`'s `snapshot_for_branch` call) catches +/// genuine cross-process drift as `ManifestConflictDetails::ExpectedVersionMismatch`. +/// The pre-stage strict check would over-reject in-process concurrent +/// inserts, which is exactly the case PR 2 / MR-686 designed the +/// per-table queue to allow. +/// +/// - `Update` / `Delete`: keep the strict check. These have read-modify-write +/// semantics; Lance moving between the read at stage time and the write +/// at commit time means the staged batch is computed against stale state. +/// The strict check guards the per-query SI invariant. SERIALIZABLE +/// opt-in (§VI.36 future seam) is the long-term answer for tighter +/// semantics; today, in-process update-update races on the same key +/// stay rejected as 409 — acceptable. +/// +/// - `SchemaRewrite`: keep the strict check. Schema apply runs under the +/// graph-wide `__schema_apply_lock__` AND per-table queues; the strict +/// check is uncontested at that point. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum MutationOpKind { + Insert, + Merge, + Update, + Delete, + SchemaRewrite, +} + +impl MutationOpKind { + /// Whether the strict pre-stage `ensure_expected_version` check should + /// fire for this op kind. See [`MutationOpKind`] for the rationale per + /// kind. + pub(crate) fn strict_pre_stage_version_check(self) -> bool { + match self { + MutationOpKind::Insert | MutationOpKind::Merge => false, + MutationOpKind::Update + | MutationOpKind::Delete + | MutationOpKind::SchemaRewrite => true, + } + } +} + pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool { name.trim_start_matches('/') == SCHEMA_APPLY_LOCK_BRANCH } diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 7884885..71d322f 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -74,14 +74,23 @@ pub struct SchemaApplyResult { pub struct Omnigraph { root_uri: String, storage: Arc, - /// Coordinator state behind a tokio Mutex. PR 2 (MR-686) wraps this - /// so engine write APIs can be `&self` (the HTTP server's `AppState` - /// then holds `Arc` and dispatches concurrent calls - /// without a global write lock). Critical sections are short: - /// callers acquire, read or refresh, drop. Lock acquisition order: - /// always before `runtime_cache` (when both are needed in one - /// scope). - coordinator: Arc>, + /// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps + /// this so engine write APIs can be `&self` (the HTTP server's + /// `AppState` holds `Arc` and dispatches concurrent + /// calls without a global write lock). Reads (`snapshot`, `version`, + /// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`, + /// `list_commits`, …) acquire `.read().await` and parallelize. + /// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`, + /// `record_*`) acquire `.write().await` and serialize. 
The atomic + /// commit invariant — `commit_manifest_updates` followed by + /// `record_graph_commit` must be atomic — is preserved by the + /// single `.write()` covering both calls inside + /// `commit_updates_with_actor_with_expected`. PR 2 Phase 2 + /// converted from `Mutex` to `RwLock` because the bench showed + /// the Mutex was the dominant serializer for disjoint-table + /// workloads. Lock acquisition order: always before `runtime_cache` + /// (when both are needed in one scope). + coordinator: Arc>, table_store: TableStore, runtime_cache: RuntimeCache, /// Read-heavy on every query, written only by `apply_schema`. ArcSwap @@ -146,7 +155,7 @@ impl Omnigraph { Ok(Self { root_uri: root.clone(), storage, - coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)), + coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), catalog: Arc::new(ArcSwap::from_pointee(catalog)), @@ -232,7 +241,7 @@ impl Omnigraph { Ok(Self { root_uri: root.clone(), storage, - coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)), + coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)), table_store: TableStore::new(&root), runtime_cache: RuntimeCache::default(), catalog: Arc::new(ArcSwap::from_pointee(catalog)), @@ -347,12 +356,12 @@ impl Omnigraph { branch: Option<&str>, ) -> Result { let next = self.open_coordinator_for_branch(branch).await?; - let mut coord = self.coordinator.lock().await; + let mut coord = self.coordinator.write().await; Ok(std::mem::replace(&mut *coord, next)) } pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) { - *self.coordinator.lock().await = coordinator; + *self.coordinator.write().await = coordinator; } pub(crate) async fn resolved_branch_target( @@ -362,7 +371,7 @@ impl Omnigraph { self.ensure_schema_state_valid().await?; let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string()); let normalized = 
normalize_branch_name(branch.unwrap_or("main"))?; - let coord = self.coordinator.lock().await; + let coord = self.coordinator.read().await; if normalized.as_deref() == coord.current_branch() { let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| { SnapshotId::synthetic(coord.current_branch(), coord.version()) @@ -384,12 +393,12 @@ impl Omnigraph { } pub(crate) async fn version(&self) -> u64 { - self.coordinator.lock().await.version() + self.coordinator.read().await.version() } /// Return an immutable Snapshot from the known manifest state. No storage I/O. pub(crate) async fn snapshot(&self) -> Snapshot { - self.coordinator.lock().await.snapshot() + self.coordinator.read().await.snapshot() } pub async fn snapshot_of(&self, target: impl Into) -> Result { @@ -418,7 +427,7 @@ impl Omnigraph { self.ensure_schema_state_valid().await?; let branch = normalize_branch_name(branch)?; let next = self.open_coordinator_for_branch(branch.as_deref()).await?; - *self.coordinator.lock().await = next; + *self.coordinator.write().await = next; self.runtime_cache.invalidate_all().await; Ok(()) } @@ -456,7 +465,7 @@ impl Omnigraph { /// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to /// avoid the recovery sweep racing their own sidecar. pub async fn refresh(&self) -> Result<()> { - let mut coord = self.coordinator.lock().await; + let mut coord = self.coordinator.write().await; coord.refresh().await?; let schema_state_recovery = recover_schema_state_files( &self.root_uri, @@ -484,7 +493,7 @@ impl Omnigraph { return Ok(()); } let current_source_ir = read_schema_ir_from_source(&schema_source)?; - let branches = self.coordinator.lock().await.branch_list().await?; + let branches = self.coordinator.read().await.branch_list().await?; let (accepted_ir, _) = load_or_bootstrap_schema_contract( &self.root_uri, Arc::clone(&self.storage), @@ -507,14 +516,14 @@ impl Omnigraph { /// RolledPastExpected, and roll it forward — racing the caller's /// own publish path. 
pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> { - self.coordinator.lock().await.refresh().await?; + self.coordinator.write().await.refresh().await?; self.runtime_cache.invalidate_all().await; Ok(()) } pub async fn resolve_snapshot(&self, branch: &str) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.lock().await.resolve_snapshot_id(branch).await + self.coordinator.read().await.resolve_snapshot_id(branch).await } pub(crate) async fn resolved_target( @@ -522,7 +531,7 @@ impl Omnigraph { target: impl Into, ) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.lock().await.resolve_target(&target.into()).await + self.coordinator.read().await.resolve_target(&target.into()).await } // ─── Change detection ──────────────────────────────────────────────── @@ -553,7 +562,7 @@ impl Omnigraph { to_commit_id: &str, filter: &crate::changes::ChangeFilter, ) -> Result { - let coord = self.coordinator.lock().await; + let coord = self.coordinator.read().await; let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?; let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?; let from_snap = coord @@ -599,7 +608,7 @@ impl Omnigraph { /// Create a Snapshot at any historical manifest version. 
pub async fn snapshot_at_version(&self, version: u64) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.lock().await.snapshot_at_version(version).await + self.coordinator.read().await.snapshot_at_version(version).await } pub async fn export_jsonl( @@ -740,11 +749,11 @@ impl Omnigraph { } pub(crate) async fn active_branch(&self) -> Option { - self.coordinator.lock().await.current_branch().map(str::to_string) + self.coordinator.read().await.current_branch().map(str::to_string) } async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> { - let descendants = self.coordinator.lock().await.branch_descendants(branch).await?; + let descendants = self.coordinator.read().await.branch_descendants(branch).await?; if let Some(descendant) = descendants.first() { return Err(OmniError::manifest_conflict(format!( "cannot delete branch '{}' because descendant branch '{}' still depends on it", @@ -800,7 +809,7 @@ impl Omnigraph { } async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> { - let active = self.coordinator.lock().await.current_branch().map(str::to_string); + let active = self.coordinator.read().await.current_branch().map(str::to_string); if active.as_deref() == Some(branch) { return Err(OmniError::manifest_conflict(format!( "cannot delete currently active branch '{}'", @@ -815,7 +824,7 @@ impl Omnigraph { .map(|entry| (entry.table_key.clone(), entry.table_path.clone())) .collect::>(); - self.coordinator.lock().await.branch_delete(branch).await?; + self.coordinator.write().await.branch_delete(branch).await?; self.cleanup_deleted_branch_tables(branch, &owned_tables) .await } @@ -840,7 +849,7 @@ impl Omnigraph { self.ensure_schema_state_valid().await?; self.ensure_schema_apply_idle("branch_create").await?; ensure_public_branch_ref(name, "branch_create")?; - self.coordinator.lock().await.branch_create(name).await + self.coordinator.write().await.branch_create(name).await } pub async fn 
branch_create_from( @@ -870,14 +879,14 @@ impl Omnigraph { } let branch = normalize_branch_name(&branch_name)?; let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?; - let result = self.coordinator.lock().await.branch_create(name).await; + let result = self.coordinator.write().await.branch_create(name).await; self.restore_coordinator(previous).await; result } pub async fn branch_list(&self) -> Result> { self.ensure_schema_state_valid().await?; - self.coordinator.lock().await.branch_list().await + self.coordinator.read().await.branch_list().await } pub async fn branch_delete(&self, name: &str) -> Result<()> { @@ -887,7 +896,7 @@ impl Omnigraph { self.refresh().await?; let branch = normalize_branch_name(name)? .ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?; - let branches = self.coordinator.lock().await.branch_list().await?; + let branches = self.coordinator.read().await.branch_list().await?; if !branches.iter().any(|candidate| candidate == &branch) { return Err(OmniError::manifest_not_found(format!( "branch '{}' not found", @@ -901,7 +910,7 @@ impl Omnigraph { pub async fn get_commit(&self, commit_id: &str) -> Result { self.ensure_schema_state_valid().await?; - self.coordinator.lock().await + self.coordinator.read().await .resolve_commit(&SnapshotId::new(commit_id)) .await } @@ -924,16 +933,18 @@ impl Omnigraph { pub(crate) async fn open_for_mutation( &self, table_key: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, String, Option)> { - table_ops::open_for_mutation(self, table_key).await + table_ops::open_for_mutation(self, table_key, op_kind).await } pub(crate) async fn open_for_mutation_on_branch( &self, branch: Option<&str>, table_key: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, String, Option)> { - table_ops::open_for_mutation_on_branch(self, branch, table_key).await + table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await } pub(crate) async fn 
fork_dataset_from_entry_state( @@ -961,9 +972,17 @@ impl Omnigraph { full_path: &str, table_branch: Option<&str>, expected_version: u64, + op_kind: crate::db::MutationOpKind, ) -> Result { - table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version) - .await + table_ops::reopen_for_mutation( + self, + table_key, + full_path, + table_branch, + expected_version, + op_kind, + ) + .await } pub(crate) async fn open_dataset_at_state( @@ -1551,7 +1570,10 @@ edge WorksAt: Person -> Company } async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option) { - let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap(); + let (mut ds, full_path, table_branch) = db + .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert) + .await + .unwrap(); let schema: Arc = Arc::new(ds.schema().into()); let columns: Vec> = schema .fields() @@ -1675,7 +1697,7 @@ edge WorksAt: Person -> Company .await .unwrap(); - let all_branches = db.coordinator.lock().await.all_branches().await.unwrap(); + let all_branches = db.coordinator.read().await.all_branches().await.unwrap(); assert!( !all_branches.iter().any(|b| is_internal_run_branch(b)), "run branch should be deleted after publish, got: {:?}", @@ -1731,13 +1753,16 @@ edge WorksAt: Person -> Company let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); let mut db = db; db.coordinator - .lock() + .write() .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await .unwrap(); - let err = db.open_for_mutation("node:Person").await.unwrap_err(); + let err = db + .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert) + .await + .unwrap_err(); assert!( err.to_string() .contains("write is unavailable while schema apply is in progress") @@ -1750,7 +1775,7 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); db.coordinator - .lock() + .write() .await 
.branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await @@ -1769,7 +1794,7 @@ edge WorksAt: Person -> Company let uri = dir.path().to_str().unwrap(); let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); db.coordinator - .lock() + .write() .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await diff --git a/crates/omnigraph/src/db/omnigraph/export.rs b/crates/omnigraph/src/db/omnigraph/export.rs index 8fc57f2..3fcd4f4 100644 --- a/crates/omnigraph/src/db/omnigraph/export.rs +++ b/crates/omnigraph/src/db/omnigraph/export.rs @@ -16,7 +16,7 @@ pub(super) async fn entity_at( id: &str, version: u64, ) -> Result> { - let snap = db.coordinator.lock().await.snapshot_at_version(version).await?; + let snap = db.coordinator.read().await.snapshot_at_version(version).await?; entity_from_snapshot(db, &snap, table_key, id).await } diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 168b118..cdb0677 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -31,7 +31,7 @@ pub(super) async fn apply_schema_with_lock( desired_schema_source: &str, ) -> Result { db.ensure_schema_state_valid().await?; - let branches = db.coordinator.lock().await.all_branches().await?; + let branches = db.coordinator.read().await.all_branches().await?; // Skip `main` and internal system branches. The schema-apply lock branch // is excluded because it is the cluster-wide schema-apply serializer. 
// `__run__*` branches are no longer created; the filter remains as @@ -475,7 +475,7 @@ pub(super) async fn apply_schema_with_lock( _snapshot_id: _, } = db .coordinator - .lock() + .write() .await .commit_changes_with_actor(&manifest_changes, None) .await?; @@ -500,7 +500,7 @@ pub(super) async fn apply_schema_with_lock( db.store_catalog(desired_catalog); db.store_schema_source(desired_schema_source.to_string()); - db.coordinator.lock().await.refresh().await?; + db.coordinator.write().await.refresh().await?; db.runtime_cache.invalidate_all().await; if changed_edge_tables { db.invalidate_graph_index().await; @@ -539,7 +539,7 @@ pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) -> pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> { db.ensure_schema_state_valid().await?; db.refresh_coordinator_only().await?; - let branches = db.coordinator.lock().await.all_branches().await?; + let branches = db.coordinator.read().await.all_branches().await?; if branches .iter() .any(|branch| is_schema_apply_lock_branch(branch)) @@ -550,7 +550,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> { } db.coordinator - .lock() + .write() .await .branch_create(SCHEMA_APPLY_LOCK_BRANCH) .await?; @@ -558,7 +558,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> { let blocking_branches = db .coordinator - .lock() + .read() .await .all_branches() .await? @@ -578,7 +578,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> { pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> { db.coordinator - .lock() + .write() .await .branch_delete(SCHEMA_APPLY_LOCK_BRANCH) .await?; @@ -593,7 +593,7 @@ pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> { pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> { if db .coordinator - .lock() + .read() .await .all_branches() .await? 
diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 3c7dc32..717f263 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -2,7 +2,7 @@ use super::*; pub(super) async fn graph_index(db: &Omnigraph) -> Result> { db.ensure_schema_state_valid().await?; - let coord = db.coordinator.lock().await; + let coord = db.coordinator.read().await; let resolved = coord .resolve_target(&ReadTarget::Branch( coord.current_branch().unwrap_or("main").to_string(), @@ -22,7 +22,7 @@ pub(super) async fn graph_index_for_resolved( } pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<()> { - let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); + let current_branch = db.coordinator.read().await.current_branch().map(str::to_string); ensure_indices_for_branch(db, current_branch.as_deref()).await } @@ -201,6 +201,7 @@ pub(super) async fn ensure_indices_for_branch( entry.table_branch.as_deref(), entry.table_version, active_branch, + crate::db::MutationOpKind::SchemaRewrite, ) .await? } @@ -248,6 +249,7 @@ pub(super) async fn ensure_indices_for_branch( entry.table_branch.as_deref(), entry.table_version, active_branch, + crate::db::MutationOpKind::SchemaRewrite, ) .await? } @@ -399,15 +401,23 @@ async fn needs_index_work_edge( pub(super) async fn open_for_mutation( db: &Omnigraph, table_key: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, String, Option)> { - let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); - open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await + let current_branch = db.coordinator.read().await.current_branch().map(str::to_string); + open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await } +/// Open a sub-table for mutation. 
The `op_kind` selects the strict-vs-relaxed +/// pre-stage version-check policy — see [`crate::db::MutationOpKind`] for the +/// rationale per kind. Insert / Merge skip the strict +/// `ensure_expected_version` check (Lance's natural conflict resolver + +/// per-(table, branch) queue + publisher CAS handle drift); Update / Delete / +/// SchemaRewrite keep it (read-modify-write SI). pub(super) async fn open_for_mutation_on_branch( db: &Omnigraph, branch: Option<&str>, table_key: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, String, Option)> { db.ensure_schema_apply_not_locked("write").await?; let resolved = db.resolved_branch_target(branch).await?; @@ -422,8 +432,10 @@ pub(super) async fn open_for_mutation_on_branch( .table_store .open_dataset_head_for_write(table_key, &full_path, None) .await?; - db.table_store - .ensure_expected_version(&ds, table_key, entry.table_version)?; + if op_kind.strict_pre_stage_version_check() { + db.table_store + .ensure_expected_version(&ds, table_key, entry.table_version)?; + } Ok((ds, full_path, None)) } Some(active_branch) => { @@ -434,6 +446,7 @@ pub(super) async fn open_for_mutation_on_branch( entry.table_branch.as_deref(), entry.table_version, active_branch, + op_kind, ) .await?; Ok((ds, full_path, table_branch)) @@ -448,6 +461,7 @@ pub(super) async fn open_owned_dataset_for_branch_write( entry_branch: Option<&str>, entry_version: u64, active_branch: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, Option)> { match entry_branch { Some(branch) if branch == active_branch => { @@ -455,8 +469,10 @@ pub(super) async fn open_owned_dataset_for_branch_write( .table_store .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) .await?; - db.table_store - .ensure_expected_version(&ds, table_key, entry_version)?; + if op_kind.strict_pre_stage_version_check() { + db.table_store + .ensure_expected_version(&ds, table_key, entry_version)?; + } Ok((ds, Some(active_branch.to_string()))) } 
source_branch => { @@ -473,8 +489,10 @@ pub(super) async fn open_owned_dataset_for_branch_write( .table_store .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) .await?; - db.table_store - .ensure_expected_version(&ds, table_key, entry_version)?; + if op_kind.strict_pre_stage_version_check() { + db.table_store + .ensure_expected_version(&ds, table_key, entry_version)?; + } Ok((ds, Some(active_branch.to_string()))) } } @@ -505,11 +523,27 @@ pub(super) async fn reopen_for_mutation( full_path: &str, table_branch: Option<&str>, expected_version: u64, + op_kind: crate::db::MutationOpKind, ) -> Result { db.ensure_schema_apply_not_locked("write").await?; - db.table_store - .reopen_for_mutation(full_path, table_branch, table_key, expected_version) - .await + if op_kind.strict_pre_stage_version_check() { + db.table_store + .reopen_for_mutation(full_path, table_branch, table_key, expected_version) + .await + } else { + // Insert / Merge: skip the strict version check. Open at HEAD — + // Lance's natural conflict resolver at commit_staged time + // (rebase append, dedupe merge_insert) handles concurrent + // writers correctly; the publisher CAS in + // `MutationStaging::commit_all` (refreshed under the + // per-(table, branch) queue via `snapshot_for_branch`) catches + // genuine cross-process drift as 409. See + // [`crate::db::MutationOpKind`] for the policy rationale. + let _ = expected_version; + db.table_store + .open_dataset_head_for_write(table_key, full_path, table_branch) + .await + } } pub(super) async fn open_dataset_at_state( @@ -704,12 +738,19 @@ async fn prepare_updates_for_commit( let mut prepared_update = update.clone(); if prepared_update.row_count > 0 { let full_path = format!("{}/{}", db.root_uri, entry.table_path); + // Strict version check is correct here: this runs INSIDE + // the publisher commit path, after `commit_staged` already + // advanced Lance HEAD to `prepared_update.table_version`. 
+ // The check is a defense-in-depth assertion that the + // dataset state matches what we just committed; not the + // pre-stage race the op-kind policy targets. let mut ds = reopen_for_mutation( db, &prepared_update.table_key, &full_path, prepared_update.table_branch.as_deref(), prepared_update.table_version, + crate::db::MutationOpKind::SchemaRewrite, ) .await?; build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?; @@ -735,7 +776,7 @@ async fn commit_prepared_updates( _snapshot_id: _, } = db .coordinator - .lock() + .write() .await .commit_updates_with_actor(updates, actor_id) .await?; @@ -753,7 +794,7 @@ async fn commit_prepared_updates_with_expected( _snapshot_id: _, } = db .coordinator - .lock() + .write() .await .commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id) .await?; @@ -766,7 +807,7 @@ pub(super) async fn commit_prepared_updates_on_branch( updates: &[crate::db::SubTableUpdate], actor_id: Option<&str>, ) -> Result { - let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); + let current_branch = db.coordinator.read().await.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { return commit_prepared_updates(db, updates, actor_id).await; @@ -794,7 +835,7 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected( expected_table_versions: &std::collections::HashMap, actor_id: Option<&str>, ) -> Result { - let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); + let current_branch = db.coordinator.read().await.current_branch().map(str::to_string); let requested_branch = branch.map(str::to_string); if requested_branch == current_branch { return commit_prepared_updates_with_expected( @@ -829,7 +870,7 @@ pub(super) async fn commit_updates( updates: &[crate::db::SubTableUpdate], ) -> Result { db.ensure_schema_apply_not_locked("write commit").await?; - let 
current_branch = db.coordinator.lock().await.current_branch().map(str::to_string); + let current_branch = db.coordinator.read().await.current_branch().map(str::to_string); let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?; commit_prepared_updates(db, &prepared, None).await } @@ -838,7 +879,7 @@ pub(super) async fn commit_manifest_updates( db: &Omnigraph, updates: &[crate::db::SubTableUpdate], ) -> Result { - db.coordinator.lock().await.commit_manifest_updates(updates).await + db.coordinator.write().await.commit_manifest_updates(updates).await } pub(super) async fn record_merge_commit( @@ -848,7 +889,7 @@ pub(super) async fn record_merge_commit( merged_parent_commit_id: &str, actor_id: Option<&str>, ) -> Result { - db.coordinator.lock().await + db.coordinator.write().await .record_merge_commit( manifest_version, parent_commit_id, @@ -882,7 +923,7 @@ pub(super) async fn commit_updates_on_branch_with_expected( } pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()> { - db.coordinator.lock().await.ensure_commit_graph_initialized().await + db.coordinator.write().await.ensure_commit_graph_initialized().await } pub(super) async fn invalidate_graph_index(db: &Omnigraph) { diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 1115095..ec02e83 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -908,7 +908,13 @@ async fn publish_rewritten_merge_table( table_key: &str, staged: &StagedMergeResult, ) -> Result { - let (ds, full_path, table_branch) = target_db.open_for_mutation(table_key).await?; + // Branch merge's source-rewrite path is Merge-shaped (upsert from + // source onto target). The inline `delete_where` later in this + // function operates on rows the rewrite chose to remove, not + // user-facing predicates, so Merge is the correct policy here. 
+ let (ds, full_path, table_branch) = target_db + .open_for_mutation(table_key, crate::db::MutationOpKind::Merge) + .await?; let mut current_ds = ds; // Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index e9d0f73..071b35a 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -600,6 +600,7 @@ async fn open_table_for_mutation( staging: &mut MutationStaging, branch: Option<&str>, table_key: &str, + op_kind: crate::db::MutationOpKind, ) -> Result<(Dataset, String, Option)> { if let Some(prior) = staging.inline_committed.get(table_key) { let path = staging.paths.get(table_key).ok_or_else(|| { @@ -614,12 +615,14 @@ async fn open_table_for_mutation( &path.full_path, path.table_branch.as_deref(), prior.table_version, + op_kind, ) .await?; return Ok((ds, path.full_path.clone(), path.table_branch.clone())); } - let (ds, full_path, table_branch) = - db.open_for_mutation_on_branch(branch, table_key).await?; + let (ds, full_path, table_branch) = db + .open_for_mutation_on_branch(branch, table_key, op_kind) + .await?; let expected_version = ds.version().version; staging.ensure_path( table_key, @@ -911,8 +914,13 @@ impl Omnigraph { let has_key = node_type.key_property().is_some(); let table_key = format!("node:{}", type_name); // Capture pre-write metadata on first touch (no Lance write). + let insert_kind = if has_key { + crate::db::MutationOpKind::Merge + } else { + crate::db::MutationOpKind::Insert + }; let (_ds, _full_path, _table_branch) = - open_table_for_mutation(self, staging, branch, &table_key).await?; + open_table_for_mutation(self, staging, branch, &table_key, insert_kind).await?; // Accumulate. @key inserts go into the Merge stream (so a // later update on the same id coalesces correctly); no-key // inserts go into the Append stream. 
@@ -946,8 +954,14 @@ impl Omnigraph { } let table_key = format!("edge:{}", type_name); // Capture pre-write metadata on first touch (no Lance write). - let (ds, _full_path, _table_branch) = - open_table_for_mutation(self, staging, branch, &table_key).await?; + let (ds, _full_path, _table_branch) = open_table_for_mutation( + self, + staging, + branch, + &table_key, + crate::db::MutationOpKind::Insert, + ) + .await?; // Accumulate the new edge row. Edge IDs are ULID-generated so // Append mode is correct (no key-based dedup needed). staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?; @@ -1008,8 +1022,14 @@ impl Omnigraph { let blob_props = self.catalog().node_types[type_name].blob_properties.clone(); let table_key = format!("node:{}", type_name); - let (ds, _full_path, _table_branch) = - open_table_for_mutation(self, staging, branch, &table_key).await?; + let (ds, _full_path, _table_branch) = open_table_for_mutation( + self, + staging, + branch, + &table_key, + crate::db::MutationOpKind::Update, + ) + .await?; // Scan committed via Lance + apply the same predicate to pending // batches via DataFusion `MemTable` (read-your-writes for prior @@ -1130,8 +1150,14 @@ impl Omnigraph { let pred_sql = predicate_to_sql(predicate, params, false)?; let table_key = format!("node:{}", type_name); - let (ds, full_path, table_branch) = - open_table_for_mutation(self, staging, branch, &table_key).await?; + let (ds, full_path, table_branch) = open_table_for_mutation( + self, + staging, + branch, + &table_key, + crate::db::MutationOpKind::Delete, + ) + .await?; let initial_version = ds.version().version; // Scan matching IDs for cascade. 
Per D₂ this never overlaps with @@ -1176,6 +1202,7 @@ impl Omnigraph { &full_path, table_branch.as_deref(), initial_version, + crate::db::MutationOpKind::Delete, ) .await?; let delete_state = self @@ -1219,8 +1246,14 @@ impl Omnigraph { let edge_table_key = format!("edge:{}", edge_name); let cascade_filter = cascade_filters.join(" OR "); - let (mut edge_ds, edge_full_path, edge_table_branch) = - open_table_for_mutation(self, staging, branch, &edge_table_key).await?; + let (mut edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation( + self, + staging, + branch, + &edge_table_key, + crate::db::MutationOpKind::Delete, + ) + .await?; let edge_delete = self .table_store() @@ -1261,8 +1294,14 @@ impl Omnigraph { let pred_sql = predicate_to_sql(predicate, params, true)?; let table_key = format!("edge:{}", type_name); - let (mut ds, full_path, table_branch) = - open_table_for_mutation(self, staging, branch, &table_key).await?; + let (mut ds, full_path, table_branch) = open_table_for_mutation( + self, + staging, + branch, + &table_key, + crate::db::MutationOpKind::Delete, + ) + .await?; let delete_state = self .table_store() diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 4ee5d0d..eddaa6d 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -247,15 +247,27 @@ impl MutationStaging { )) })?; - // Reopen at the pre-write version. Lance HEAD has not advanced - // since `ensure_path` captured it — no prior op committed to - // this dataset. + // Reopen the dataset for staging. The op_kind reflects the + // accumulated PendingTable's mode: Append-mode batches are + // INSERT-shaped (no key-based dedup at commit_staged); Merge- + // mode batches are MERGE-shaped (key-dedup at commit_staged). 
+ // Both skip the strict pre-stage version check under the + // [`MutationOpKind`] policy: Lance's natural rebase + the + // per-(table, branch) queue + the publisher CAS in + // `commit_all` handle drift; the strict check would + // over-reject in-process concurrent inserts (PR 2 / MR-686 + // Phase 2). + let stage_kind = match table.mode { + PendingMode::Append => crate::db::MutationOpKind::Insert, + PendingMode::Merge => crate::db::MutationOpKind::Merge, + }; let ds = db .reopen_for_mutation( &table_key, &path.full_path, path.table_branch.as_deref(), expected, + stage_kind, ) .await?; diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index b63f692..40f0a12 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -335,6 +335,16 @@ async fn load_jsonl_reader( LoadMode::Append => PendingMode::Append, LoadMode::Overwrite => PendingMode::Append, // unused }; + // Map LoadMode to MutationOpKind for the version-check policy. + // Append/Merge skip the strict pre-stage check (concurrency-safe + // under the per-(table, branch) queue + publisher CAS); Overwrite + // uses the strict check because it truncates and replaces the + // dataset — concurrent advances change what "replace" means. + let load_op_kind = match mode { + LoadMode::Append => crate::db::MutationOpKind::Insert, + LoadMode::Merge => crate::db::MutationOpKind::Merge, + LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, + }; // Phase 2a: build and validate every node batch up front. Cheap and // synchronous — surfaces validation errors before any S3 traffic. 
@@ -365,7 +375,7 @@ async fn load_jsonl_reader( if use_staging { for (type_name, table_key, batch, loaded_count) in prepared_nodes { let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, &table_key) + .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; let expected_version = ds.version().version; staging.ensure_path( @@ -486,7 +496,7 @@ async fn load_jsonl_reader( if use_staging { for (edge_name, table_key, batch, loaded_count) in prepared_edges { let (ds, full_path, table_branch) = db - .open_for_mutation_on_branch(branch, &table_key) + .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; let expected_version = ds.version().version; staging.ensure_path( @@ -1164,8 +1174,14 @@ async fn write_batch_to_dataset( batch: RecordBatch, mode: LoadMode, ) -> Result<(crate::table_store::TableState, Option)> { - let (mut ds, full_path, table_branch) = - db.open_for_mutation_on_branch(branch, table_key).await?; + let op_kind = match mode { + LoadMode::Append => crate::db::MutationOpKind::Insert, + LoadMode::Merge => crate::db::MutationOpKind::Merge, + LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, + }; + let (mut ds, full_path, table_branch) = db + .open_for_mutation_on_branch(branch, table_key, op_kind) + .await?; let table_store = db.table_store(); match mode {