mr-686: Phase 2 — op-kind-aware version check + coord Mutex → RwLock

Fix A: op-kind-aware ensure_expected_version. Insert/Merge skip the
strict pre-stage check; Update/Delete/SchemaRewrite keep it. New
MutationOpKind enum threaded through open_for_mutation_on_branch /
open_owned_dataset_for_branch_write / reopen_for_mutation and all
callers (execute_insert/update/delete_node/delete_edge,
branch_merge::publish_rewritten_merge_table, schema_apply,
ensure_indices_for_branch, loader Append/Merge/Overwrite). Eliminates the
77% rejection rate on same-key concurrent inserts.
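
Sketch of the Fix A gating, for illustration only. The enum and its policy method mirror the ones added in this commit; `ensure_expected_version` and `pre_stage_check` below are simplified, hypothetical stand-ins that compare two plain version numbers rather than touching a Lance dataset.

```rust
/// Mirrors the `MutationOpKind` enum introduced by this commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MutationOpKind {
    Insert,
    Merge,
    Update,
    Delete,
    SchemaRewrite,
}

impl MutationOpKind {
    /// Insert/Merge skip the strict pre-stage check; the rest keep it.
    /// The exhaustive match forces a decision when a variant is added.
    pub fn strict_pre_stage_version_check(self) -> bool {
        match self {
            MutationOpKind::Insert | MutationOpKind::Merge => false,
            MutationOpKind::Update
            | MutationOpKind::Delete
            | MutationOpKind::SchemaRewrite => true,
        }
    }
}

/// Hypothetical stand-in for the real check: fails when the dataset
/// HEAD no longer matches the version recorded in the manifest entry.
pub fn ensure_expected_version(head: u64, expected: u64) -> Result<(), String> {
    if head == expected {
        Ok(())
    } else {
        Err(format!("expected version {}, found {}", expected, head))
    }
}

/// Hypothetical open path: run the strict check only for op kinds
/// whose policy asks for it.
pub fn pre_stage_check(kind: MutationOpKind, head: u64, expected: u64) -> Result<(), String> {
    if kind.strict_pre_stage_version_check() {
        ensure_expected_version(head, expected)?;
    }
    Ok(())
}
```

With HEAD one version ahead of the manifest entry (a peer insert landed first), `pre_stage_check` lets Insert and Merge through and rejects Update, Delete and SchemaRewrite.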

Fix B: coordinator Mutex -> RwLock. Reads parallelize via .read();
writes serialize via .write(). Atomic-commit invariant preserved by
the single .write() covering commit_manifest_updates +
record_graph_commit.
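
Sketch of the Fix B locking shape, for illustration only. It uses `std::sync::RwLock` instead of `tokio::sync::RwLock` to stay dependency-free, and `Coordinator` / `Engine` are hypothetical, heavily simplified stand-ins for `GraphCoordinator` / `Omnigraph`. The point it shows: one exclusive guard spans both commit steps, so no reader can see the manifest advance without the matching graph commit.

```rust
use std::sync::RwLock;

/// Hypothetical, heavily simplified coordinator state.
#[derive(Default)]
pub struct Coordinator {
    manifest_version: u64,
    /// Manifest version recorded by each graph commit.
    commit_log: Vec<u64>,
}

impl Coordinator {
    fn commit_manifest_updates(&mut self) -> u64 {
        self.manifest_version += 1;
        self.manifest_version
    }

    fn record_graph_commit(&mut self, manifest_version: u64) {
        self.commit_log.push(manifest_version);
    }
}

#[derive(Default)]
pub struct Engine {
    coordinator: RwLock<Coordinator>,
}

impl Engine {
    /// Read path: shared guard, so many readers can hold it at once.
    pub fn version(&self) -> u64 {
        self.coordinator.read().unwrap().manifest_version
    }

    /// Read path: both fields observed under one shared guard.
    pub fn observe(&self) -> (u64, Option<u64>) {
        let coord = self.coordinator.read().unwrap();
        (coord.manifest_version, coord.commit_log.last().copied())
    }

    /// Write path: a single exclusive guard covers both steps.
    pub fn commit(&self) -> u64 {
        let mut coord = self.coordinator.write().unwrap();
        let version = coord.commit_manifest_updates();
        coord.record_graph_commit(version);
        version
    }
}
```

Taking `.write()` twice (once per step) would compile just as well but would open a window in which a reader sees a manifest version with no graph commit behind it.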

Bench-as-test change_concurrent_inserts_same_key_serialize_without_409
(server.rs:2180) spawns 12 concurrent /change inserts on a single
(table, branch); asserts every request returns 200. Was failing
pre-Phase-2; passes post-Phase-2.
change_conflict_returns_manifest_conflict_409 (cross-process drift
sentinel) and branch_merge_conflict_response_includes_structured_conflicts
both still pass.

Bench (after-pr2-phase2):
- single-actor 1x1: 14.9 ops/s, p50 68ms (baseline 12.3, +22%)
- disjoint 8x8:    7.04 ops/s, p50 1023ms (baseline 6.24, +13%)
- same-key 8x1:    2.62 ops/s, 0 errors (after-pr2: 77% errors)

Disjoint stayed at +13% — Fix B's RwLock helped read paths but the
publisher's .write() critical section still serializes graph-wide.
Splitting GraphCoordinator into per-concern primitives (manifest in
ArcSwap, commit_graph in RwLock, atomic-commit serializer) is the
deferred next step.
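
One possible shape for that deferred split, as a sketch only and not the planned implementation. Everything here is hypothetical: `SplitCoordinator` and `Manifest` are invented names, `RwLock<Arc<Manifest>>` stands in for an `ArcSwap` to avoid an external crate, and the commit graph is reduced to a list of version numbers.

```rust
use std::sync::{Arc, Mutex, RwLock};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Manifest {
    pub version: u64,
}

pub struct SplitCoordinator {
    /// Stand-in for `ArcSwap<Manifest>`: readers clone the Arc and
    /// release the lock immediately.
    manifest: RwLock<Arc<Manifest>>,
    /// Commit graph behind its own lock.
    commit_graph: RwLock<Vec<u64>>,
    /// Serializes the two-step atomic commit without blocking readers.
    commit_serializer: Mutex<()>,
}

impl SplitCoordinator {
    pub fn new() -> Self {
        Self {
            manifest: RwLock::new(Arc::new(Manifest { version: 0 })),
            commit_graph: RwLock::new(Vec::new()),
            commit_serializer: Mutex::new(()),
        }
    }

    /// Cheap snapshot: an Arc clone, never held across a commit.
    pub fn snapshot(&self) -> Arc<Manifest> {
        self.manifest.read().unwrap().clone()
    }

    pub fn commit_count(&self) -> usize {
        self.commit_graph.read().unwrap().len()
    }

    /// Only committers contend on `commit_serializer`; each inner lock
    /// is held just long enough to swap or push.
    pub fn commit(&self) -> u64 {
        let _serial = self.commit_serializer.lock().unwrap();
        let next = Arc::new(Manifest {
            version: self.snapshot().version + 1,
        });
        self.commit_graph.write().unwrap().push(next.version);
        *self.manifest.write().unwrap() = Arc::clone(&next);
        next.version
    }
}
```

A snapshot taken before a commit keeps pointing at the old manifest, which is the property that lets reads proceed while a publisher is in its critical section.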

102 lib + 30 branching + 24 runs + 16 staged_writes + 63 end_to_end
+ 40 server tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ragnor Comerford 2026-05-08 12:42:26 +02:00
parent b93a130b40
commit f925ad1739
10 changed files with 350 additions and 93 deletions


@ -2177,6 +2177,77 @@ async fn change_conflict_returns_manifest_conflict_409() {
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_concurrent_inserts_same_key_serialize_without_409() {
// PR 2 Phase 2 (MR-686): pin the design fix for the same-key
// concurrency hazard. Pre-fix, in-process concurrent inserts on
// the same `(table, branch)` rejected with 409 manifest_conflict
// because `ensure_expected_version` fired before the per-table
// queue was acquired and saw Lance HEAD already advanced by a
// peer writer. Post-fix, Insert/Merge skip the strict pre-stage
// check (see `MutationOpKind::strict_pre_stage_version_check`);
// the queue serializes commit_staged; Lance's natural rebase
// handles the in-flight stage; the publisher's CAS on a fresh
// per-branch snapshot under the queue catches genuine cross-
// process drift.
//
// This test spawns N concurrent /change inserts on a single
// node type and asserts that every request returns 200 (no 409).
// The final row count is not checked here.
let temp = init_loaded_repo().await;
let repo = repo_path(temp.path());
let state = AppState::open(repo.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
const N: usize = 12;
let mut handles = Vec::with_capacity(N);
for i in 0..N {
let app = app.clone();
handles.push(tokio::spawn(async move {
let body = serde_json::to_vec(&ChangeRequest {
query_source: MUTATION_QUERIES.to_string(),
query_name: Some("insert_person".to_string()),
params: Some(json!({ "name": format!("racer-{i}"), "age": i as i32 })),
branch: Some("main".to_string()),
})
.unwrap();
let req = Request::builder()
.uri("/change")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = app.oneshot(req).await.unwrap();
response.status()
}));
}
let mut statuses = Vec::with_capacity(N);
for h in handles {
statuses.push(h.await.unwrap());
}
let bad: Vec<_> = statuses
.iter()
.enumerate()
.filter(|(_, s)| **s != StatusCode::OK)
.collect();
assert!(
bad.is_empty(),
"expected every concurrent insert to return 200, got non-200 for: {:?}",
bad
);
// The status assertion above is the load-bearing pin: every
// concurrent insert succeeded, serialized by the per-(table, branch)
// queue, with the publisher CAS at the end. None produced
// 409 manifest_conflict (which is what `ensure_expected_version`
// would have done pre-Phase-2).
}
#[tokio::test(flavor = "multi_thread")]
async fn oversized_request_body_returns_payload_too_large() {
let (_temp, app) = app_for_loaded_repo().await;
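
The behaviour the concurrent-insert test above pins can be sketched without the server, using threads from the standard library. This is a toy model under stated assumptions: `WriterQueues` and `run_racers` are hypothetical names, a `Mutex` per `(table, branch)` key plays the role of the writer queue, and the "table" is just a `Vec<String>` whose length doubles as its version.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type QueueKey = (String, String);
type Table = Arc<Mutex<Vec<String>>>;

/// Hypothetical per-(table, branch) writer queues.
#[derive(Default)]
pub struct WriterQueues {
    queues: Mutex<HashMap<QueueKey, Table>>,
}

impl WriterQueues {
    fn queue_for(&self, table: &str, branch: &str) -> Table {
        let mut queues = self.queues.lock().unwrap();
        queues
            .entry((table.to_string(), branch.to_string()))
            .or_default()
            .clone()
    }

    /// Append a row under the queue and return the new table version.
    /// No pre-stage version check: writers simply take turns.
    pub fn insert(&self, table: &str, branch: &str, row: String) -> u64 {
        let queue = self.queue_for(table, branch);
        let mut rows = queue.lock().unwrap();
        rows.push(row);
        rows.len() as u64
    }

    pub fn row_count(&self, table: &str, branch: &str) -> usize {
        let queue = self.queue_for(table, branch);
        let rows = queue.lock().unwrap();
        rows.len()
    }
}

/// Spawn `n` racing inserts on one (table, branch). Returns the sorted
/// versions each writer observed and the final row count.
pub fn run_racers(n: usize) -> (Vec<u64>, usize) {
    let queues = Arc::new(WriterQueues::default());
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let queues = Arc::clone(&queues);
            thread::spawn(move || queues.insert("node:Person", "main", format!("racer-{}", i)))
        })
        .collect();
    let mut versions: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    versions.sort_unstable();
    (versions, queues.row_count("node:Person", "main"))
}
```

Every racer gets a distinct version and none is rejected, which is the in-process outcome the 200-only assertion checks for.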


@ -19,6 +19,53 @@ pub(crate) use run_registry::is_internal_run_branch;
pub(crate) const SCHEMA_APPLY_LOCK_BRANCH: &str = "__schema_apply_lock__";
/// Mutation kind, threaded through the version-check call sites so the
/// engine can apply an op-kind-aware policy:
///
/// - `Insert` / `Merge`: skip the strict pre-stage `ensure_expected_version`
/// check. Lance's `MergeInsertBuilder` rebases concurrent appends; the
/// per-(table, branch) writer queue serializes `commit_staged`; the
/// publisher's CAS (refreshed under the queue via
/// `MutationStaging::commit_all`'s `snapshot_for_branch` call) catches
/// genuine cross-process drift as `ManifestConflictDetails::ExpectedVersionMismatch`.
/// The pre-stage strict check would over-reject in-process concurrent
/// inserts, which is exactly the case PR 2 / MR-686 designed the
/// per-table queue to allow.
///
/// - `Update` / `Delete`: keep the strict check. These have read-modify-write
/// semantics; Lance moving between the read at stage time and the write
/// at commit time means the staged batch is computed against stale state.
/// The strict check guards the per-query SI invariant. SERIALIZABLE
/// opt-in (§VI.36 future seam) is the long-term answer for tighter
/// semantics; today, in-process update-update races on the same key
/// stay rejected as 409 — acceptable.
///
/// - `SchemaRewrite`: keep the strict check. Schema apply runs under the
/// graph-wide `__schema_apply_lock__` AND per-table queues; the strict
/// check is uncontested at that point.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum MutationOpKind {
Insert,
Merge,
Update,
Delete,
SchemaRewrite,
}
impl MutationOpKind {
/// Whether the strict pre-stage `ensure_expected_version` check should
/// fire for this op kind. See [`MutationOpKind`] for the rationale per
/// kind.
pub(crate) fn strict_pre_stage_version_check(self) -> bool {
match self {
MutationOpKind::Insert | MutationOpKind::Merge => false,
MutationOpKind::Update
| MutationOpKind::Delete
| MutationOpKind::SchemaRewrite => true,
}
}
}
pub(crate) fn is_schema_apply_lock_branch(name: &str) -> bool {
name.trim_start_matches('/') == SCHEMA_APPLY_LOCK_BRANCH
}
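
The `MutationOpKind` doc comment above leans on the publisher's CAS to catch genuine drift once the pre-stage check is skipped. A minimal sketch of that idea, assuming an in-memory `AtomicU64` as a stand-in for the manifest head (the real CAS runs against manifest storage, and `ManifestHead` and this `ExpectedVersionMismatch` struct are hypothetical):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical error carrying the same information as an
/// expected-version mismatch conflict.
#[derive(Debug, PartialEq, Eq)]
pub struct ExpectedVersionMismatch {
    pub expected: u64,
    pub actual: u64,
}

/// Hypothetical in-memory manifest head.
pub struct ManifestHead {
    version: AtomicU64,
}

impl ManifestHead {
    pub fn new(version: u64) -> Self {
        Self {
            version: AtomicU64::new(version),
        }
    }

    /// Fresh snapshot of the head version, taken just before publishing.
    pub fn snapshot_version(&self) -> u64 {
        self.version.load(Ordering::SeqCst)
    }

    /// Compare-and-swap publish: succeeds only if the head still equals
    /// `expected`; otherwise reports what the head actually is.
    pub fn publish(&self, expected: u64) -> Result<u64, ExpectedVersionMismatch> {
        self.version
            .compare_exchange(expected, expected + 1, Ordering::SeqCst, Ordering::SeqCst)
            .map(|_| expected + 1)
            .map_err(|actual| ExpectedVersionMismatch { expected, actual })
    }
}
```

A publisher that refreshes its snapshot under the queue always passes; one that reuses a stale snapshot after someone else published gets the mismatch, which is the case that should still surface as a 409.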


@ -74,14 +74,23 @@ pub struct SchemaApplyResult {
pub struct Omnigraph {
root_uri: String,
storage: Arc<dyn StorageAdapter>,
/// Coordinator state behind a tokio Mutex. PR 2 (MR-686) wraps this
/// so engine write APIs can be `&self` (the HTTP server's `AppState`
/// then holds `Arc<Omnigraph>` and dispatches concurrent calls
/// without a global write lock). Critical sections are short:
/// callers acquire, read or refresh, drop. Lock acquisition order:
/// always before `runtime_cache` (when both are needed in one
/// scope).
coordinator: Arc<tokio::sync::Mutex<GraphCoordinator>>,
/// Coordinator state behind a tokio `RwLock`. PR 2 (MR-686) wraps
/// this so engine write APIs can be `&self` (the HTTP server's
/// `AppState` holds `Arc<Omnigraph>` and dispatches concurrent
/// calls without a global write lock). Reads (`snapshot`, `version`,
/// `current_branch`, `branch_list`, `resolve_*`, `head_commit_id`,
/// `list_commits`, …) acquire `.read().await` and parallelize.
/// Writes (`refresh`, `branch_create`, `branch_delete`, `commit_*`,
/// `record_*`) acquire `.write().await` and serialize. The atomic
/// commit invariant — `commit_manifest_updates` followed by
/// `record_graph_commit` must be atomic — is preserved by the
/// single `.write()` covering both calls inside
/// `commit_updates_with_actor_with_expected`. PR 2 Phase 2
/// converted from `Mutex` to `RwLock` because the bench showed
/// the Mutex was the dominant serializer for disjoint-table
/// workloads. Lock acquisition order: always before `runtime_cache`
/// (when both are needed in one scope).
coordinator: Arc<tokio::sync::RwLock<GraphCoordinator>>,
table_store: TableStore,
runtime_cache: RuntimeCache,
/// Read-heavy on every query, written only by `apply_schema`. ArcSwap
@ -146,7 +155,7 @@ impl Omnigraph {
Ok(Self {
root_uri: root.clone(),
storage,
coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)),
coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
@ -232,7 +241,7 @@ impl Omnigraph {
Ok(Self {
root_uri: root.clone(),
storage,
coordinator: Arc::new(tokio::sync::Mutex::new(coordinator)),
coordinator: Arc::new(tokio::sync::RwLock::new(coordinator)),
table_store: TableStore::new(&root),
runtime_cache: RuntimeCache::default(),
catalog: Arc::new(ArcSwap::from_pointee(catalog)),
@ -347,12 +356,12 @@ impl Omnigraph {
branch: Option<&str>,
) -> Result<GraphCoordinator> {
let next = self.open_coordinator_for_branch(branch).await?;
let mut coord = self.coordinator.lock().await;
let mut coord = self.coordinator.write().await;
Ok(std::mem::replace(&mut *coord, next))
}
pub(crate) async fn restore_coordinator(&self, coordinator: GraphCoordinator) {
*self.coordinator.lock().await = coordinator;
*self.coordinator.write().await = coordinator;
}
pub(crate) async fn resolved_branch_target(
@ -362,7 +371,7 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
let requested = ReadTarget::Branch(branch.unwrap_or("main").to_string());
let normalized = normalize_branch_name(branch.unwrap_or("main"))?;
let coord = self.coordinator.lock().await;
let coord = self.coordinator.read().await;
if normalized.as_deref() == coord.current_branch() {
let snapshot_id = coord.head_commit_id().await?.unwrap_or_else(|| {
SnapshotId::synthetic(coord.current_branch(), coord.version())
@ -384,12 +393,12 @@ impl Omnigraph {
}
pub(crate) async fn version(&self) -> u64 {
self.coordinator.lock().await.version()
self.coordinator.read().await.version()
}
/// Return an immutable Snapshot from the known manifest state. No storage I/O.
pub(crate) async fn snapshot(&self) -> Snapshot {
self.coordinator.lock().await.snapshot()
self.coordinator.read().await.snapshot()
}
pub async fn snapshot_of(&self, target: impl Into<ReadTarget>) -> Result<Snapshot> {
@ -418,7 +427,7 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
let branch = normalize_branch_name(branch)?;
let next = self.open_coordinator_for_branch(branch.as_deref()).await?;
*self.coordinator.lock().await = next;
*self.coordinator.write().await = next;
self.runtime_cache.invalidate_all().await;
Ok(())
}
@ -456,7 +465,7 @@ impl Omnigraph {
/// [`refresh_coordinator_only`](Self::refresh_coordinator_only) to
/// avoid the recovery sweep racing their own sidecar.
pub async fn refresh(&self) -> Result<()> {
let mut coord = self.coordinator.lock().await;
let mut coord = self.coordinator.write().await;
coord.refresh().await?;
let schema_state_recovery = recover_schema_state_files(
&self.root_uri,
@ -484,7 +493,7 @@ impl Omnigraph {
return Ok(());
}
let current_source_ir = read_schema_ir_from_source(&schema_source)?;
let branches = self.coordinator.lock().await.branch_list().await?;
let branches = self.coordinator.read().await.branch_list().await?;
let (accepted_ir, _) = load_or_bootstrap_schema_contract(
&self.root_uri,
Arc::clone(&self.storage),
@ -507,14 +516,14 @@ impl Omnigraph {
/// RolledPastExpected, and roll it forward — racing the caller's
/// own publish path.
pub(crate) async fn refresh_coordinator_only(&self) -> Result<()> {
self.coordinator.lock().await.refresh().await?;
self.coordinator.write().await.refresh().await?;
self.runtime_cache.invalidate_all().await;
Ok(())
}
pub async fn resolve_snapshot(&self, branch: &str) -> Result<SnapshotId> {
self.ensure_schema_state_valid().await?;
self.coordinator.lock().await.resolve_snapshot_id(branch).await
self.coordinator.read().await.resolve_snapshot_id(branch).await
}
pub(crate) async fn resolved_target(
@ -522,7 +531,7 @@ impl Omnigraph {
target: impl Into<ReadTarget>,
) -> Result<ResolvedTarget> {
self.ensure_schema_state_valid().await?;
self.coordinator.lock().await.resolve_target(&target.into()).await
self.coordinator.read().await.resolve_target(&target.into()).await
}
// ─── Change detection ────────────────────────────────────────────────
@ -553,7 +562,7 @@ impl Omnigraph {
to_commit_id: &str,
filter: &crate::changes::ChangeFilter,
) -> Result<crate::changes::ChangeSet> {
let coord = self.coordinator.lock().await;
let coord = self.coordinator.read().await;
let from_commit = coord.resolve_commit(&SnapshotId::new(from_commit_id)).await?;
let to_commit = coord.resolve_commit(&SnapshotId::new(to_commit_id)).await?;
let from_snap = coord
@ -599,7 +608,7 @@ impl Omnigraph {
/// Create a Snapshot at any historical manifest version.
pub async fn snapshot_at_version(&self, version: u64) -> Result<Snapshot> {
self.ensure_schema_state_valid().await?;
self.coordinator.lock().await.snapshot_at_version(version).await
self.coordinator.read().await.snapshot_at_version(version).await
}
pub async fn export_jsonl(
@ -740,11 +749,11 @@ impl Omnigraph {
}
pub(crate) async fn active_branch(&self) -> Option<String> {
self.coordinator.lock().await.current_branch().map(str::to_string)
self.coordinator.read().await.current_branch().map(str::to_string)
}
async fn ensure_branch_delete_safe(&self, branch: &str, branches: &[String]) -> Result<()> {
let descendants = self.coordinator.lock().await.branch_descendants(branch).await?;
let descendants = self.coordinator.read().await.branch_descendants(branch).await?;
if let Some(descendant) = descendants.first() {
return Err(OmniError::manifest_conflict(format!(
"cannot delete branch '{}' because descendant branch '{}' still depends on it",
@ -800,7 +809,7 @@ impl Omnigraph {
}
async fn delete_branch_storage_only(&self, branch: &str) -> Result<()> {
let active = self.coordinator.lock().await.current_branch().map(str::to_string);
let active = self.coordinator.read().await.current_branch().map(str::to_string);
if active.as_deref() == Some(branch) {
return Err(OmniError::manifest_conflict(format!(
"cannot delete currently active branch '{}'",
@ -815,7 +824,7 @@ impl Omnigraph {
.map(|entry| (entry.table_key.clone(), entry.table_path.clone()))
.collect::<Vec<_>>();
self.coordinator.lock().await.branch_delete(branch).await?;
self.coordinator.write().await.branch_delete(branch).await?;
self.cleanup_deleted_branch_tables(branch, &owned_tables)
.await
}
@ -840,7 +849,7 @@ impl Omnigraph {
self.ensure_schema_state_valid().await?;
self.ensure_schema_apply_idle("branch_create").await?;
ensure_public_branch_ref(name, "branch_create")?;
self.coordinator.lock().await.branch_create(name).await
self.coordinator.write().await.branch_create(name).await
}
pub async fn branch_create_from(
@ -870,14 +879,14 @@ impl Omnigraph {
}
let branch = normalize_branch_name(&branch_name)?;
let previous = self.swap_coordinator_for_branch(branch.as_deref()).await?;
let result = self.coordinator.lock().await.branch_create(name).await;
let result = self.coordinator.write().await.branch_create(name).await;
self.restore_coordinator(previous).await;
result
}
pub async fn branch_list(&self) -> Result<Vec<String>> {
self.ensure_schema_state_valid().await?;
self.coordinator.lock().await.branch_list().await
self.coordinator.read().await.branch_list().await
}
pub async fn branch_delete(&self, name: &str) -> Result<()> {
@ -887,7 +896,7 @@ impl Omnigraph {
self.refresh().await?;
let branch = normalize_branch_name(name)?
.ok_or_else(|| OmniError::manifest("cannot delete branch 'main'".to_string()))?;
let branches = self.coordinator.lock().await.branch_list().await?;
let branches = self.coordinator.read().await.branch_list().await?;
if !branches.iter().any(|candidate| candidate == &branch) {
return Err(OmniError::manifest_not_found(format!(
"branch '{}' not found",
@ -901,7 +910,7 @@ impl Omnigraph {
pub async fn get_commit(&self, commit_id: &str) -> Result<GraphCommit> {
self.ensure_schema_state_valid().await?;
self.coordinator.lock().await
self.coordinator.read().await
.resolve_commit(&SnapshotId::new(commit_id))
.await
}
@ -924,16 +933,18 @@ impl Omnigraph {
pub(crate) async fn open_for_mutation(
&self,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
table_ops::open_for_mutation(self, table_key).await
table_ops::open_for_mutation(self, table_key, op_kind).await
}
pub(crate) async fn open_for_mutation_on_branch(
&self,
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
table_ops::open_for_mutation_on_branch(self, branch, table_key).await
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
}
pub(crate) async fn fork_dataset_from_entry_state(
@ -961,9 +972,17 @@ impl Omnigraph {
full_path: &str,
table_branch: Option<&str>,
expected_version: u64,
op_kind: crate::db::MutationOpKind,
) -> Result<Dataset> {
table_ops::reopen_for_mutation(self, table_key, full_path, table_branch, expected_version)
.await
table_ops::reopen_for_mutation(
self,
table_key,
full_path,
table_branch,
expected_version,
op_kind,
)
.await
}
pub(crate) async fn open_dataset_at_state(
@ -1551,7 +1570,10 @@ edge WorksAt: Person -> Company
}
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
let (mut ds, full_path, table_branch) = db.open_for_mutation("node:Person").await.unwrap();
let (mut ds, full_path, table_branch) = db
.open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
.await
.unwrap();
let schema: Arc<Schema> = Arc::new(ds.schema().into());
let columns: Vec<Arc<dyn Array>> = schema
.fields()
@ -1675,7 +1697,7 @@ edge WorksAt: Person -> Company
.await
.unwrap();
let all_branches = db.coordinator.lock().await.all_branches().await.unwrap();
let all_branches = db.coordinator.read().await.all_branches().await.unwrap();
assert!(
!all_branches.iter().any(|b| is_internal_run_branch(b)),
"run branch should be deleted after publish, got: {:?}",
@ -1731,13 +1753,16 @@ edge WorksAt: Person -> Company
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let mut db = db;
db.coordinator
.lock()
.write()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
.unwrap();
let err = db.open_for_mutation("node:Person").await.unwrap_err();
let err = db
.open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("write is unavailable while schema apply is in progress")
@ -1750,7 +1775,7 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.lock()
.write()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await
@ -1769,7 +1794,7 @@ edge WorksAt: Person -> Company
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
db.coordinator
.lock()
.write()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await


@ -16,7 +16,7 @@ pub(super) async fn entity_at(
id: &str,
version: u64,
) -> Result<Option<serde_json::Value>> {
let snap = db.coordinator.lock().await.snapshot_at_version(version).await?;
let snap = db.coordinator.read().await.snapshot_at_version(version).await?;
entity_from_snapshot(db, &snap, table_key, id).await
}


@ -31,7 +31,7 @@ pub(super) async fn apply_schema_with_lock(
desired_schema_source: &str,
) -> Result<SchemaApplyResult> {
db.ensure_schema_state_valid().await?;
let branches = db.coordinator.lock().await.all_branches().await?;
let branches = db.coordinator.read().await.all_branches().await?;
// Skip `main` and internal system branches. The schema-apply lock branch
// is excluded because it is the cluster-wide schema-apply serializer.
// `__run__*` branches are no longer created; the filter remains as
@ -475,7 +475,7 @@ pub(super) async fn apply_schema_with_lock(
_snapshot_id: _,
} = db
.coordinator
.lock()
.write()
.await
.commit_changes_with_actor(&manifest_changes, None)
.await?;
@ -500,7 +500,7 @@ pub(super) async fn apply_schema_with_lock(
db.store_catalog(desired_catalog);
db.store_schema_source(desired_schema_source.to_string());
db.coordinator.lock().await.refresh().await?;
db.coordinator.write().await.refresh().await?;
db.runtime_cache.invalidate_all().await;
if changed_edge_tables {
db.invalidate_graph_index().await;
@ -539,7 +539,7 @@ pub(super) async fn ensure_schema_apply_idle(db: &Omnigraph, operation: &str) ->
pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
db.ensure_schema_state_valid().await?;
db.refresh_coordinator_only().await?;
let branches = db.coordinator.lock().await.all_branches().await?;
let branches = db.coordinator.read().await.all_branches().await?;
if branches
.iter()
.any(|branch| is_schema_apply_lock_branch(branch))
@ -550,7 +550,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
}
db.coordinator
.lock()
.write()
.await
.branch_create(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
@ -558,7 +558,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
let blocking_branches = db
.coordinator
.lock()
.read()
.await
.all_branches()
.await?
@ -578,7 +578,7 @@ pub(super) async fn acquire_schema_apply_lock(db: &Omnigraph) -> Result<()> {
pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
db.coordinator
.lock()
.write()
.await
.branch_delete(SCHEMA_APPLY_LOCK_BRANCH)
.await?;
@ -593,7 +593,7 @@ pub(super) async fn release_schema_apply_lock(db: &Omnigraph) -> Result<()> {
pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &str) -> Result<()> {
if db
.coordinator
.lock()
.read()
.await
.all_branches()
.await?


@ -2,7 +2,7 @@ use super::*;
pub(super) async fn graph_index(db: &Omnigraph) -> Result<Arc<crate::graph_index::GraphIndex>> {
db.ensure_schema_state_valid().await?;
let coord = db.coordinator.lock().await;
let coord = db.coordinator.read().await;
let resolved = coord
.resolve_target(&ReadTarget::Branch(
coord.current_branch().unwrap_or("main").to_string(),
@ -22,7 +22,7 @@ pub(super) async fn graph_index_for_resolved(
}
pub(super) async fn ensure_indices(db: &Omnigraph) -> Result<()> {
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let current_branch = db.coordinator.read().await.current_branch().map(str::to_string);
ensure_indices_for_branch(db, current_branch.as_deref()).await
}
@ -201,6 +201,7 @@ pub(super) async fn ensure_indices_for_branch(
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
crate::db::MutationOpKind::SchemaRewrite,
)
.await?
}
@ -248,6 +249,7 @@ pub(super) async fn ensure_indices_for_branch(
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
crate::db::MutationOpKind::SchemaRewrite,
)
.await?
}
@ -399,15 +401,23 @@ async fn needs_index_work_edge(
pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
open_for_mutation_on_branch(db, current_branch.as_deref(), table_key).await
let current_branch = db.coordinator.read().await.current_branch().map(str::to_string);
open_for_mutation_on_branch(db, current_branch.as_deref(), table_key, op_kind).await
}
/// Open a sub-table for mutation. The `op_kind` selects the strict-vs-relaxed
/// pre-stage version-check policy — see [`crate::db::MutationOpKind`] for the
/// rationale per kind. Insert / Merge skip the strict
/// `ensure_expected_version` check (Lance's natural conflict resolver +
/// per-(table, branch) queue + publisher CAS handle drift); Update / Delete /
/// SchemaRewrite keep it (read-modify-write SI).
pub(super) async fn open_for_mutation_on_branch(
db: &Omnigraph,
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
db.ensure_schema_apply_not_locked("write").await?;
let resolved = db.resolved_branch_target(branch).await?;
@ -422,8 +432,10 @@ pub(super) async fn open_for_mutation_on_branch(
.table_store
.open_dataset_head_for_write(table_key, &full_path, None)
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry.table_version)?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
.ensure_expected_version(&ds, table_key, entry.table_version)?;
}
Ok((ds, full_path, None))
}
Some(active_branch) => {
@ -434,6 +446,7 @@ pub(super) async fn open_for_mutation_on_branch(
entry.table_branch.as_deref(),
entry.table_version,
active_branch,
op_kind,
)
.await?;
Ok((ds, full_path, table_branch))
@ -448,6 +461,7 @@ pub(super) async fn open_owned_dataset_for_branch_write(
entry_branch: Option<&str>,
entry_version: u64,
active_branch: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, Option<String>)> {
match entry_branch {
Some(branch) if branch == active_branch => {
@ -455,8 +469,10 @@ pub(super) async fn open_owned_dataset_for_branch_write(
.table_store
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
}
Ok((ds, Some(active_branch.to_string())))
}
source_branch => {
@ -473,8 +489,10 @@ pub(super) async fn open_owned_dataset_for_branch_write(
.table_store
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
if op_kind.strict_pre_stage_version_check() {
db.table_store
.ensure_expected_version(&ds, table_key, entry_version)?;
}
Ok((ds, Some(active_branch.to_string())))
}
}
@ -505,11 +523,27 @@ pub(super) async fn reopen_for_mutation(
full_path: &str,
table_branch: Option<&str>,
expected_version: u64,
op_kind: crate::db::MutationOpKind,
) -> Result<Dataset> {
db.ensure_schema_apply_not_locked("write").await?;
db.table_store
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
.await
if op_kind.strict_pre_stage_version_check() {
db.table_store
.reopen_for_mutation(full_path, table_branch, table_key, expected_version)
.await
} else {
// Insert / Merge: skip the strict version check. Open at HEAD —
// Lance's natural conflict resolver at commit_staged time
// (rebase append, dedupe merge_insert) handles concurrent
// writers correctly; the publisher CAS in
// `MutationStaging::commit_all` (refreshed under the
// per-(table, branch) queue via `snapshot_for_branch`) catches
// genuine cross-process drift as 409. See
// [`crate::db::MutationOpKind`] for the policy rationale.
let _ = expected_version;
db.table_store
.open_dataset_head_for_write(table_key, full_path, table_branch)
.await
}
}
pub(super) async fn open_dataset_at_state(
@ -704,12 +738,19 @@ async fn prepare_updates_for_commit(
let mut prepared_update = update.clone();
if prepared_update.row_count > 0 {
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
// Strict version check is correct here: this runs INSIDE
// the publisher commit path, after `commit_staged` already
// advanced Lance HEAD to `prepared_update.table_version`.
// The check is a defense-in-depth assertion that the
// dataset state matches what we just committed; not the
// pre-stage race the op-kind policy targets.
let mut ds = reopen_for_mutation(
db,
&prepared_update.table_key,
&full_path,
prepared_update.table_branch.as_deref(),
prepared_update.table_version,
crate::db::MutationOpKind::SchemaRewrite,
)
.await?;
build_indices_on_dataset(db, &prepared_update.table_key, &mut ds).await?;
@ -735,7 +776,7 @@ async fn commit_prepared_updates(
_snapshot_id: _,
} = db
.coordinator
.lock()
.write()
.await
.commit_updates_with_actor(updates, actor_id)
.await?;
@ -753,7 +794,7 @@ async fn commit_prepared_updates_with_expected(
_snapshot_id: _,
} = db
.coordinator
.lock()
.write()
.await
.commit_updates_with_actor_with_expected(updates, expected_table_versions, actor_id)
.await?;
@ -766,7 +807,7 @@ pub(super) async fn commit_prepared_updates_on_branch(
updates: &[crate::db::SubTableUpdate],
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let current_branch = db.coordinator.read().await.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates(db, updates, actor_id).await;
@ -794,7 +835,7 @@ pub(super) async fn commit_prepared_updates_on_branch_with_expected(
expected_table_versions: &std::collections::HashMap<String, u64>,
actor_id: Option<&str>,
) -> Result<u64> {
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let current_branch = db.coordinator.read().await.current_branch().map(str::to_string);
let requested_branch = branch.map(str::to_string);
if requested_branch == current_branch {
return commit_prepared_updates_with_expected(
@ -829,7 +870,7 @@ pub(super) async fn commit_updates(
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.ensure_schema_apply_not_locked("write commit").await?;
let current_branch = db.coordinator.lock().await.current_branch().map(str::to_string);
let current_branch = db.coordinator.read().await.current_branch().map(str::to_string);
let prepared = prepare_updates_for_commit(db, current_branch.as_deref(), updates).await?;
commit_prepared_updates(db, &prepared, None).await
}
@ -838,7 +879,7 @@ pub(super) async fn commit_manifest_updates(
db: &Omnigraph,
updates: &[crate::db::SubTableUpdate],
) -> Result<u64> {
db.coordinator.lock().await.commit_manifest_updates(updates).await
db.coordinator.write().await.commit_manifest_updates(updates).await
}
pub(super) async fn record_merge_commit(
@ -848,7 +889,7 @@ pub(super) async fn record_merge_commit(
merged_parent_commit_id: &str,
actor_id: Option<&str>,
) -> Result<String> {
db.coordinator.lock().await
db.coordinator.write().await
.record_merge_commit(
manifest_version,
parent_commit_id,
@ -882,7 +923,7 @@ pub(super) async fn commit_updates_on_branch_with_expected(
}
pub(super) async fn ensure_commit_graph_initialized(db: &Omnigraph) -> Result<()> {
db.coordinator.lock().await.ensure_commit_graph_initialized().await
db.coordinator.write().await.ensure_commit_graph_initialized().await
}
pub(super) async fn invalidate_graph_index(db: &Omnigraph) {


@ -908,7 +908,13 @@ async fn publish_rewritten_merge_table(
table_key: &str,
staged: &StagedMergeResult,
) -> Result<crate::db::SubTableUpdate> {
let (ds, full_path, table_branch) = target_db.open_for_mutation(table_key).await?;
// Branch merge's source-rewrite path is Merge-shaped (upsert from
// source onto target). The inline `delete_where` later in this
// function operates on rows the rewrite chose to remove, not
// user-facing predicates, so Merge is the correct policy here.
let (ds, full_path, table_branch) = target_db
.open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
.await?;
let mut current_ds = ds;
// Phase 1: merge_insert changed/new rows (preserves _row_created_at_version for


@ -600,6 +600,7 @@ async fn open_table_for_mutation(
staging: &mut MutationStaging,
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
) -> Result<(Dataset, String, Option<String>)> {
if let Some(prior) = staging.inline_committed.get(table_key) {
let path = staging.paths.get(table_key).ok_or_else(|| {
@@ -614,12 +615,14 @@ async fn open_table_for_mutation(
&path.full_path,
path.table_branch.as_deref(),
prior.table_version,
+op_kind,
)
.await?;
return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
}
-let (ds, full_path, table_branch) =
-db.open_for_mutation_on_branch(branch, table_key).await?;
+let (ds, full_path, table_branch) = db
+.open_for_mutation_on_branch(branch, table_key, op_kind)
+.await?;
let expected_version = ds.version().version;
staging.ensure_path(
table_key,
@@ -911,8 +914,13 @@ impl Omnigraph {
let has_key = node_type.key_property().is_some();
let table_key = format!("node:{}", type_name);
// Capture pre-write metadata on first touch (no Lance write).
+let insert_kind = if has_key {
+crate::db::MutationOpKind::Merge
+} else {
+crate::db::MutationOpKind::Insert
+};
let (_ds, _full_path, _table_branch) =
-open_table_for_mutation(self, staging, branch, &table_key).await?;
+open_table_for_mutation(self, staging, branch, &table_key, insert_kind).await?;
// Accumulate. @key inserts go into the Merge stream (so a
// later update on the same id coalesces correctly); no-key
// inserts go into the Append stream.
@@ -946,8 +954,14 @@ impl Omnigraph {
}
let table_key = format!("edge:{}", type_name);
// Capture pre-write metadata on first touch (no Lance write).
-let (ds, _full_path, _table_branch) =
-open_table_for_mutation(self, staging, branch, &table_key).await?;
+let (ds, _full_path, _table_branch) = open_table_for_mutation(
+self,
+staging,
+branch,
+&table_key,
+crate::db::MutationOpKind::Insert,
+)
+.await?;
// Accumulate the new edge row. Edge IDs are ULID-generated so
// Append mode is correct (no key-based dedup needed).
staging.append_batch(&table_key, schema, PendingMode::Append, batch.clone())?;
@@ -1008,8 +1022,14 @@ impl Omnigraph {
let blob_props = self.catalog().node_types[type_name].blob_properties.clone();
let table_key = format!("node:{}", type_name);
-let (ds, _full_path, _table_branch) =
-open_table_for_mutation(self, staging, branch, &table_key).await?;
+let (ds, _full_path, _table_branch) = open_table_for_mutation(
+self,
+staging,
+branch,
+&table_key,
+crate::db::MutationOpKind::Update,
+)
+.await?;
// Scan committed via Lance + apply the same predicate to pending
// batches via DataFusion `MemTable` (read-your-writes for prior
@@ -1130,8 +1150,14 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, false)?;
let table_key = format!("node:{}", type_name);
-let (ds, full_path, table_branch) =
-open_table_for_mutation(self, staging, branch, &table_key).await?;
+let (ds, full_path, table_branch) = open_table_for_mutation(
+self,
+staging,
+branch,
+&table_key,
+crate::db::MutationOpKind::Delete,
+)
+.await?;
let initial_version = ds.version().version;
// Scan matching IDs for cascade. Per D₂ this never overlaps with
@@ -1176,6 +1202,7 @@ impl Omnigraph {
&full_path,
table_branch.as_deref(),
initial_version,
+crate::db::MutationOpKind::Delete,
)
.await?;
let delete_state = self
@@ -1219,8 +1246,14 @@ impl Omnigraph {
let edge_table_key = format!("edge:{}", edge_name);
let cascade_filter = cascade_filters.join(" OR ");
-let (mut edge_ds, edge_full_path, edge_table_branch) =
-open_table_for_mutation(self, staging, branch, &edge_table_key).await?;
+let (mut edge_ds, edge_full_path, edge_table_branch) = open_table_for_mutation(
+self,
+staging,
+branch,
+&edge_table_key,
+crate::db::MutationOpKind::Delete,
+)
+.await?;
let edge_delete = self
.table_store()
@@ -1261,8 +1294,14 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, true)?;
let table_key = format!("edge:{}", type_name);
-let (mut ds, full_path, table_branch) =
-open_table_for_mutation(self, staging, branch, &table_key).await?;
+let (mut ds, full_path, table_branch) = open_table_for_mutation(
+self,
+staging,
+branch,
+&table_key,
+crate::db::MutationOpKind::Delete,
+)
+.await?;
let delete_state = self
.table_store()

@@ -247,15 +247,27 @@ impl MutationStaging {
))
})?;
-// Reopen at the pre-write version. Lance HEAD has not advanced
-// since `ensure_path` captured it — no prior op committed to
-// this dataset.
+// Reopen the dataset for staging. The op_kind reflects the
+// accumulated PendingTable's mode: Append-mode batches are
+// INSERT-shaped (no key-based dedup at commit_staged); Merge-
+// mode batches are MERGE-shaped (key-dedup at commit_staged).
+// Both skip the strict pre-stage version check under the
+// [`MutationOpKind`] policy: Lance's natural rebase + the
+// per-(table, branch) queue + the publisher CAS in
+// `commit_all` handle drift; the strict check would
+// over-reject in-process concurrent inserts (PR 2 / MR-686
+// Phase 2).
+let stage_kind = match table.mode {
+PendingMode::Append => crate::db::MutationOpKind::Insert,
+PendingMode::Merge => crate::db::MutationOpKind::Merge,
+};
let ds = db
.reopen_for_mutation(
&table_key,
&path.full_path,
path.table_branch.as_deref(),
expected,
+stage_kind,
)
.await?;
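
Review note: the comment in this hunk describes the policy that Insert- and Merge-shaped staging skips the strict pre-stage version check while the other kinds keep it. The sketch below restates that rule as standalone code, as an illustration only. The variant names follow the commit message; the `PendingMode` mirror, the method `requires_strict_check`, the free functions, and the `String` error type are hypothetical and are not taken from the crate.

```rust
/// Hypothetical mirror of the op kinds named in the commit message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MutationOpKind {
    Insert,
    Merge,
    Update,
    Delete,
    SchemaRewrite,
}

/// Hypothetical mirror of the staging modes matched in the hunk above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingMode {
    Append,
    Merge,
}

impl MutationOpKind {
    /// Insert and Merge tolerate version drift; every other kind does not.
    pub fn requires_strict_check(self) -> bool {
        match self {
            MutationOpKind::Insert | MutationOpKind::Merge => false,
            MutationOpKind::Update
            | MutationOpKind::Delete
            | MutationOpKind::SchemaRewrite => true,
        }
    }
}

/// Same mapping as the `stage_kind` match in the hunk above.
pub fn stage_kind(mode: PendingMode) -> MutationOpKind {
    match mode {
        PendingMode::Append => MutationOpKind::Insert,
        PendingMode::Merge => MutationOpKind::Merge,
    }
}

/// Rejects drift only for kinds that require the strict check.
pub fn ensure_expected_version(
    kind: MutationOpKind,
    expected: u64,
    actual: u64,
) -> Result<(), String> {
    if kind.requires_strict_check() && expected != actual {
        return Err(format!(
            "version drift for {:?}: expected {}, found {}",
            kind, expected, actual
        ));
    }
    Ok(())
}
```

Under this sketch, a Delete opened at version 3 against a table now at version 5 is rejected, while an Insert in the same situation proceeds and relies on the downstream mechanisms the comment names (rebase, the per-(table, branch) queue, and the publisher CAS).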

@@ -335,6 +335,16 @@ async fn load_jsonl_reader<R: BufRead>(
LoadMode::Append => PendingMode::Append,
LoadMode::Overwrite => PendingMode::Append, // unused
};
+// Map LoadMode to MutationOpKind for the version-check policy.
+// Append/Merge skip the strict pre-stage check (concurrency-safe
+// under the per-(table, branch) queue + publisher CAS); Overwrite
+// uses the strict check because it truncates and replaces the
+// dataset — concurrent advances change what "replace" means.
+let load_op_kind = match mode {
+LoadMode::Append => crate::db::MutationOpKind::Insert,
+LoadMode::Merge => crate::db::MutationOpKind::Merge,
+LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
+};
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
@@ -365,7 +375,7 @@ async fn load_jsonl_reader<R: BufRead>(
if use_staging {
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
let (ds, full_path, table_branch) = db
-.open_for_mutation_on_branch(branch, &table_key)
+.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
@@ -486,7 +496,7 @@ async fn load_jsonl_reader<R: BufRead>(
if use_staging {
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
let (ds, full_path, table_branch) = db
-.open_for_mutation_on_branch(branch, &table_key)
+.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
staging.ensure_path(
@@ -1164,8 +1174,14 @@ async fn write_batch_to_dataset(
batch: RecordBatch,
mode: LoadMode,
) -> Result<(crate::table_store::TableState, Option<String>)> {
-let (mut ds, full_path, table_branch) =
-db.open_for_mutation_on_branch(branch, table_key).await?;
+let op_kind = match mode {
+LoadMode::Append => crate::db::MutationOpKind::Insert,
+LoadMode::Merge => crate::db::MutationOpKind::Merge,
+LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
+};
+let (mut ds, full_path, table_branch) = db
+.open_for_mutation_on_branch(branch, table_key, op_kind)
+.await?;
let table_store = db.table_store();
match mode {