MR-771: demote Run to direct-publish via expected_table_versions CAS

mutate_as and load now write directly to target tables and call the
publisher once at the end with per-table expected versions; the Run
state machine, _graph_runs.lance writers, __run__ staging branches,
and server /runs/* endpoints are removed. Multi-statement mutations
remain atomic at the manifest level via an in-memory MutationStaging
accumulator that gives read-your-writes within a query and a single
publish at the end. Concurrent-writer conflicts surface as
ExpectedVersionMismatch (HTTP 409 manifest_conflict) instead of the
old DivergentUpdate merge shape. Documents one known limitation in
docs/runs.md: a multi-statement mid-query failure where op-N writes
a Lance fragment and op-N+1 fails leaves Lance HEAD ahead of the
manifest until a follow-up introduces per-table Lance branches.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 08:52:50 +02:00
parent 4e5374a85e
commit 35be20cb05
No known key found for this signature in database
28 changed files with 1188 additions and 3216 deletions

View file

@ -154,42 +154,17 @@ impl Omnigraph {
pub async fn load(&mut self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.ensure_schema_state_valid().await?;
let requested = Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
if crate::db::is_internal_run_branch(&requested) {
return self
.load_direct_on_branch(Some(requested.as_str()), data, mode)
.await;
}
let target_head_before = self.latest_branch_snapshot_id(&requested).await?;
let op = format!("load_jsonl:branch={}:mode={}", requested, mode.as_str());
let run = self.begin_run(&requested, Some(op.as_str())).await?;
let staged_result = match self
.load_direct_on_branch(Some(run.run_branch.as_str()), data, mode)
// Branch convention: `None` represents `main`. Re-normalizing to
// `Some("main")` here would route the publisher commit through a
// separate coordinator (the cross-branch path in
// `commit_prepared_updates_on_branch_with_expected`) and leave
// `self.coordinator` with a stale manifest snapshot.
let requested = Self::normalize_branch_name(branch)?;
// Direct-to-target writes: no Run state machine, no `__run__` staging
// branch. Cross-table OCC is enforced by the publisher's
// `expected_table_versions` CAS inside `load_jsonl_reader`.
self.load_direct_on_branch(requested.as_deref(), data, mode)
.await
{
Ok(result) => result,
Err(err) => {
let _ = self.fail_run(&run.run_id).await;
return Err(err);
}
};
let target_head_now = self.latest_branch_snapshot_id(&requested).await?;
if target_head_now.as_str() != target_head_before.as_str() {
let _ = self.fail_run(&run.run_id).await;
return Err(OmniError::manifest_conflict(format!(
"target branch '{}' advanced during transactional load; retry",
requested
)));
}
if let Err(err) = self.publish_run(&run.run_id).await {
let _ = self.fail_run(&run.run_id).await;
return Err(err);
}
Ok(staged_result)
}
pub async fn load_file(
@ -334,6 +309,10 @@ async fn load_jsonl_reader<R: BufRead>(
let mut updates = Vec::new();
let mut result = LoadResult::default();
let snapshot = db.snapshot_for_branch(branch).await?;
// Capture per-table manifest versions before any write so the publisher
// CAS at commit-time can detect concurrent writers landing between our
// read snapshot and our publish.
let mut expected_table_versions: HashMap<String, u64> = HashMap::new();
// Phase 2a: build and validate every node batch up front. Cheap and
// synchronous — surfaces validation errors before any S3 traffic.
@ -350,9 +329,10 @@ async fn load_jsonl_reader<R: BufRead>(
}
let loaded_count = batch.num_rows();
let table_key = format!("node:{}", type_name);
snapshot
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
prepared_nodes.push((type_name.clone(), table_key, batch, loaded_count));
}
@ -428,9 +408,10 @@ async fn load_jsonl_reader<R: BufRead>(
}
let loaded_count = batch.num_rows();
let table_key = format!("edge:{}", edge_name);
snapshot
let entry = snapshot
.entry(&table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
expected_table_versions.insert(table_key.clone(), entry.table_version);
prepared_edges.push((edge_name.clone(), table_key, batch, loaded_count));
}
@ -464,8 +445,9 @@ async fn load_jsonl_reader<R: BufRead>(
}
}
// Phase 4: Atomic manifest commit
db.commit_updates_on_branch(branch, &updates).await?;
// Phase 4: Atomic manifest commit with publisher-level OCC.
db.commit_updates_on_branch_with_expected(branch, &updates, &expected_table_versions)
.await?;
Ok(result)
}